❄️ Frostvakt – Modellutvärdering (fast 70/30-split) – Körs i Colab

In [None]:

# Step 1: Install packages (Colab)
# The leading "!" is notebook shell-escape syntax, so this line only runs
# inside an IPython/Colab cell, not as plain Python.
!pip -q install scikit-learn pandas numpy


In [None]:

# Steg 2: Ladda upp SQLite-databas (t.ex. weather_history_forcast.db)
from google.colab import files
import os, shutil

def upload_database():
    """Ask for a file upload and return the stored path of the ``.db`` file.

    Every uploaded file is moved into ``/content/data``. If several names end
    in ``.db``, the path of the last one iterated is returned.

    Returns:
        The destination path of the uploaded ``.db`` file.

    Raises:
        RuntimeError: If no uploaded file name ends in ``.db``.
    """
    target_dir = '/content/data'
    print("Klicka på 'Välj filer' och välj din .db-fil (ex. weather_history_forcast.db)")
    received = files.upload()
    os.makedirs(target_dir, exist_ok=True)

    database_paths = []
    for filename in received:
        destination = f"{target_dir}/{filename}"
        # assumes files.upload() left each file in the working directory
        # under its own name -- TODO confirm against the Colab docs
        shutil.move(filename, destination)
        print("Fil uppladdad till:", destination)
        if filename.endswith(".db"):
            database_paths.append(destination)

    if not database_paths:
        raise RuntimeError("Hittade ingen .db-fil. Försök igen.")
    return database_paths[-1]

# Run the upload and keep the resulting database path for the later cells.
DB_PATH = upload_database()
DB_PATH  # bare expression, so the notebook displays the path as cell output


In [None]:

# Steg 3: Funktioner
import sqlite3
import numpy as np
import pandas as pd
from typing import Tuple
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

# Data
def fetch_historical_df(db_path: str) -> pd.DataFrame:
    """Load historical weather rows from SQLite and add a frost label.

    Reads ``weather_historical``, skipping rows where ``temperature_2m`` or
    ``wind_speed_10m`` is NULL, ordered by ``valid_time``.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        A DataFrame with the selected weather columns, ``valid_time`` parsed
        to datetimes, integer ``year``/``month``/``day``/``hour`` columns and
        ``actual_frost`` (1 when ``temperature_2m <= 0``, otherwise 0).
    """
    q = """
    SELECT valid_time, temperature_2m, wind_speed_10m, 
           relative_humidity_2m, dew_point_2m, cloud_cover, pressure_msl,
           strftime('%Y', valid_time) as year,
           strftime('%m', valid_time) as month,
           strftime('%d', valid_time) as day,
           strftime('%H', valid_time) as hour
    FROM weather_historical
    WHERE temperature_2m IS NOT NULL AND wind_speed_10m IS NOT NULL
    ORDER BY valid_time
    """
    con = sqlite3.connect(db_path)
    try:
        df = pd.read_sql(q, con)
    finally:
        # Close even when the query fails (e.g. missing table); otherwise the
        # connection would stay open until garbage collection.
        con.close()

    df['valid_time'] = pd.to_datetime(df['valid_time'])
    # strftime() returns text, so the calendar parts need an explicit cast.
    for c in ['year', 'month', 'day', 'hour']:
        df[c] = df[c].astype(int)
    df['actual_frost'] = (df['temperature_2m'] <= 0).astype(int)
    return df

def _chronological_rolling_mean(frame: pd.DataFrame, window: int) -> np.ndarray:
    """Rolling mean of ``temperature_2m`` taken in ``valid_time`` order.

    The values are returned in the row order of *frame*. When *frame* has no
    ``valid_time`` column the existing row order is used as-is.
    """
    temps = frame['temperature_2m'].to_numpy(dtype=float)
    if 'valid_time' in frame.columns:
        order = np.argsort(frame['valid_time'].to_numpy(), kind='stable')
    else:
        order = np.arange(len(frame))
    rolled = pd.Series(temps[order]).rolling(window, min_periods=1).mean().to_numpy()
    # Scatter back by position so a shuffled or duplicated index cannot
    # misalign the result.
    result = np.empty(len(frame), dtype=float)
    result[order] = rolled
    return result


def prepare_features(df: pd.DataFrame):
    """Build the model feature matrix and the frost label vector.

    Adds the temperature/dew-point spread, cyclic encodings of ``hour`` and
    ``month``, a 3-row rolling mean of the temperature (``temp_rolling_3h``)
    and the deviation from a 6-row rolling mean (``temp_trend``).

    The rolling windows are evaluated in ``valid_time`` order, so a shuffled
    frame does not average unrelated rows. If ``temp_rolling_3h`` or
    ``temp_trend`` already exist in *df* they are reused unchanged, which lets
    a caller compute them on the full series before splitting.

    Args:
        df: Frame with the raw weather columns, ``hour``, ``month`` and
            ``actual_frost``; ``valid_time`` is optional.

    Returns:
        ``(X, y)``: the 13 feature columns with missing values replaced by the
        column mean, and ``actual_frost`` as a NumPy array.
    """
    X = df.copy()
    X['temp_dewpoint_diff'] = X['temperature_2m'] - X['dew_point_2m']

    # Cyclic encoding keeps hour 23 next to hour 0 and month 12 next to month 1.
    hour_angle = 2 * np.pi * X['hour'] / 24
    month_angle = 2 * np.pi * X['month'] / 12
    X['hour_sin'] = np.sin(hour_angle)
    X['hour_cos'] = np.cos(hour_angle)
    X['month_sin'] = np.sin(month_angle)
    X['month_cos'] = np.cos(month_angle)

    if 'temp_rolling_3h' not in X.columns:
        X['temp_rolling_3h'] = _chronological_rolling_mean(X, 3)
    if 'temp_trend' not in X.columns:
        X['temp_trend'] = X['temperature_2m'] - _chronological_rolling_mean(X, 6)

    features = [
        'temperature_2m', 'wind_speed_10m', 'relative_humidity_2m', 'dew_point_2m',
        'cloud_cover', 'pressure_msl', 'temp_dewpoint_diff', 'hour_sin', 'hour_cos',
        'month_sin', 'month_cos', 'temp_rolling_3h', 'temp_trend',
    ]
    X = X[features].fillna(X.mean(numeric_only=True))
    y = df['actual_frost'].values
    return X, y

# ---- Regelalgoritmer ----
def rule_original(row):
    """Baseline rule: flag frost when ``temperature_2m`` is at or below zero."""
    temperature = row['temperature_2m']
    return temperature <= 0

def rule_daytime_filter(row):
    """Baseline rule with a daytime veto.

    Between hour 8 and 17 (inclusive) no frost is reported while the smoothed
    temperature ``temp_rolling_3h`` is above zero. The raw temperature stands
    in for the smoothed value when that key is missing. Outside the veto the
    result is ``temperature_2m <= 0``.
    """
    temperature = row['temperature_2m']
    hour = row.get('hour', None)
    smoothed = row.get('temp_rolling_3h', temperature)

    is_daytime = hour is not None and 8 <= hour <= 17
    if is_daytime and smoothed > 0:
        return False
    return temperature <= 0

def rule_cloud_and_daytime(row):
    """Daytime-filtered rule plus a cloud-dependent near-frost check.

    Returns True when ``rule_daytime_filter`` fires. Otherwise frost is
    flagged when the wind is below 4 and the smoothed temperature is at or
    below a limit that depends on cloud cover: 3.0 for cover <= 20, 2.0 for
    cover <= 50, and 1.0 for higher or missing cover.
    """
    if rule_daytime_filter(row):
        return True

    cloud = row.get('cloud_cover', np.nan)
    wind = row['wind_speed_10m']
    smoothed = row.get('temp_rolling_3h', row['temperature_2m'])

    cloud_known = pd.notna(cloud)
    if cloud_known and cloud <= 20:
        limit = 3.0
    elif cloud_known and cloud <= 50:
        limit = 2.0
    else:
        limit = 1.0
    return (smoothed <= limit) and (wind < 4)

def rule_complete(row):
    """Cloud/daytime rule plus a humid, calm and cold condition.

    Returns True when ``rule_cloud_and_daytime`` fires. Otherwise frost is
    flagged only when humidity is known and above 85, the smoothed temperature
    is at or below 2 and the wind is below 3.
    """
    if rule_cloud_and_daytime(row):
        return True

    humidity = row.get('relative_humidity_2m', np.nan)
    smoothed = row.get('temp_rolling_3h', row['temperature_2m'])
    wind = row['wind_speed_10m']

    humid_calm_cold = (
        pd.notna(humidity) and smoothed <= 2 and wind < 3 and humidity > 85
    )
    return True if humid_calm_cold else False

def rule_advanced_rolling(row):
    """Baseline rule extended with a falling-temperature check.

    Returns True when ``temperature_2m`` is at or below zero. Otherwise frost
    is flagged when the smoothed temperature is at or below 2, the wind is
    below 3 and ``temp_trend`` is negative. A missing trend counts as 0 and a
    missing smoothed temperature falls back to the raw temperature.
    """
    already_freezing = row['temperature_2m'] <= 0
    if already_freezing:
        return True

    trend = row.get('temp_trend', 0)
    wind = row['wind_speed_10m']
    smoothed = row.get('temp_rolling_3h', row['temperature_2m'])
    return (smoothed <= 2 and wind < 3 and trend < 0)

# Eval helpers
def tally_metrics(y_true, y_pred):
    """Compute recall, precision and F1 for the positive class (label 1).

    Both inputs are cast to integer arrays. A ratio whose denominator is zero
    is reported as 0.0 instead of raising.

    Returns:
        Dict with ``recall``, ``precision``, ``f1``, ``missade`` (false
        negatives) and ``falska`` (false positives).
    """
    truth = np.asarray(y_true).astype(int)
    guess = np.asarray(y_pred).astype(int)

    is_positive = truth == 1
    is_flagged = guess == 1
    hits = np.sum(is_positive & is_flagged)
    false_alarms = np.sum((truth == 0) & is_flagged)
    misses = np.sum(is_positive & (guess == 0))

    recall = 0.0
    if (hits + misses) > 0:
        recall = hits / (hits + misses)
    precision = 0.0
    if (hits + false_alarms) > 0:
        precision = hits / (hits + false_alarms)
    f1 = 0.0
    if (precision + recall) > 0:
        f1 = 2 * (precision * recall) / (precision + recall)

    return {
        'recall': recall,
        'precision': precision,
        'f1': f1,
        'missade': int(misses),
        'falska': int(false_alarms),
    }

def ascii_table(df: pd.DataFrame) -> str:
    """Render an evaluation frame as a left-aligned plain-text table.

    Expects the columns ``Algoritm``, ``recall``, ``precision``, ``f1``,
    ``missade`` and ``falska``. Recall and precision are shown as percentages
    with one decimal, F1 with three decimals. The input frame is not modified.

    Returns:
        Header line, dashed separator and one line per row, joined by
        newlines. Columns are separated by two spaces.
    """
    headers = ['Algoritm', 'Recall', 'Precision', 'F1', 'Missade', 'Falska']
    formatted = df.assign(
        Recall=(df['recall'] * 100).round(1).astype(str) + "%",
        Precision=(df['precision'] * 100).round(1).astype(str) + "%",
        F1=df['f1'].round(3).map(lambda value: f"{value:.3f}"),
        Missade=df['missade'].astype(int),
        Falska=df['falska'].astype(int),
    )[headers]

    body = [
        [str(value) for value in record]
        for record in formatted.itertuples(index=False, name=None)
    ]
    # Each column is as wide as its longest cell or its header, whichever is longer.
    widths = [
        max(max(len(cells[position]) for cells in body), len(title))
        for position, title in enumerate(headers)
    ]

    def render(cells):
        return "  ".join(cell.ljust(width) for cell, width in zip(cells, widths))

    header_line = render(headers)
    return "\n".join([header_line, "-" * len(header_line), *map(render, body)])

# Fast 70/30-split
def frost_stratified_split_fixed(df: pd.DataFrame, random_state=42):
    """Split frost and non-frost rows roughly 70/30 into train/test, per class.

    The frame is shuffled first, then each class (``actual_frost`` equal to 1
    and 0) contributes its first 70 % of rows (rounded) to the training set
    and the rest to the test set. Both results are shuffled once more.

    Args:
        df: Frame containing an ``actual_frost`` column.
        random_state: Seed for the shuffles; the test set uses
            ``random_state + 1`` for its final shuffle.

    Returns:
        ``(df_train, df_test)``.
    """
    shuffled = df.sample(frac=1.0, random_state=random_state).reset_index(drop=True)

    train_parts = []
    test_parts = []
    # Frost first, then non-frost: the concatenation order affects the final shuffle.
    for label in (1, 0):
        group = shuffled[shuffled['actual_frost'] == label]
        cut = int(round(len(group) * 0.70))
        train_parts.append(group.iloc[:cut])
        test_parts.append(group.iloc[cut:])

    combined_train = pd.concat(train_parts, ignore_index=True)
    combined_test = pd.concat(test_parts, ignore_index=True)
    df_train = combined_train.sample(frac=1.0, random_state=random_state)
    df_test = combined_test.sample(frac=1.0, random_state=random_state + 1)
    return df_train, df_test

# Sample weights
def make_sample_weights(y: np.ndarray, frost_weight: float = 3.0):
    """Return one float weight per label: ``frost_weight`` for 1, else 1.0."""
    labels = np.asarray(y).astype(int)
    weights = np.ones(labels.shape, dtype=float)
    weights[labels == 1] = frost_weight
    return weights

# ML-utvärdering med vikter
def evaluate_ml_models(X_train, y_train, X_test, y_test, frost_weight=3.0):
    """Fit three frost-weighted classifiers and score them on the test split.

    Logistic regression is trained on standardised features; the random
    forest and gradient boosting models use the raw values. Frost samples are
    up-weighted through ``sample_weight`` (logistic regression, gradient
    boosting) or ``class_weight`` (random forest).

    Args:
        X_train, X_test: Feature frames (``.values`` is read from both).
        y_train, y_test: Label vectors.
        frost_weight: Weight applied to class 1 during training.

    Returns:
        DataFrame with one row per model: ``Algoritm`` plus the
        ``tally_metrics`` fields.
    """
    weights = make_sample_weights(y_train, frost_weight=frost_weight)

    # The scaler is fitted on the training split only.
    scaler = StandardScaler()
    scaled_train = scaler.fit_transform(X_train)
    scaled_test = scaler.transform(X_test)
    raw_train = X_train.values
    raw_test = X_test.values

    results = []

    def record(name, predictions):
        results.append(dict(Algoritm=name, **tally_metrics(y_test, predictions)))

    # Logistic regression: weighted through sample_weight
    logreg = LogisticRegression(max_iter=1000, random_state=42)
    logreg.fit(scaled_train, y_train, sample_weight=weights)
    record("Logistic Regression", logreg.predict(scaled_test))

    # Random forest: weighted through class_weight
    forest = RandomForestClassifier(
        n_estimators=200,
        max_depth=None,
        random_state=42,
        n_jobs=-1,
        class_weight={0: 1.0, 1: float(frost_weight)},
    )
    forest.fit(raw_train, y_train)
    record("Random Forest", forest.predict(raw_test))

    # Gradient boosting: weighted through sample_weight
    boosting = GradientBoostingClassifier(n_estimators=200, random_state=42)
    boosting.fit(raw_train, y_train, sample_weight=weights)
    record("Gradient Boosting", boosting.predict(raw_test))

    return pd.DataFrame(results)

# Regelutvärdering
def evaluate_rules(df_test: pd.DataFrame):
    """Score every rule-based frost detector on the test split.

    Works on a copy of *df_test* whose ``hour`` column is replaced by the hour
    of ``valid_time``. Each rule is applied row by row and compared with
    ``actual_frost``.

    Returns:
        DataFrame with one row per rule: ``Algoritm`` plus the
        ``tally_metrics`` fields.
    """
    frame = df_test.copy()
    frame['hour'] = frame['valid_time'].dt.hour

    named_rules = (
        ("Original", rule_original),
        ("+ Dagtidsfilter", rule_daytime_filter),
        ("+ Moln & Dagtid", rule_cloud_and_daytime),
        ("+ Komplett (Recommended)", rule_complete),
        ("+ Advanced (Rullande)", rule_advanced_rolling),
    )
    predictions = [(label, frame.apply(rule, axis=1)) for label, rule in named_rules]

    truth = frame['actual_frost'].values
    return pd.DataFrame([
        dict(Algoritm=label, **tally_metrics(truth, flags.values))
        for label, flags in predictions
    ])


In [None]:

# === Step 4: Run ===
FROST_WEIGHT = 3.0  # weight given to frost samples during training

df_all = fetch_historical_df(DB_PATH)

# Rolling temperature features need consecutive observations, so compute them
# while df_all is still ordered by valid_time (the split below shuffles rows).
# The rule functions read these columns from df_test; without them they fall
# back to the raw temperature and a zero trend.
df_all['temp_rolling_3h'] = df_all['temperature_2m'].rolling(3, min_periods=1).mean()
df_all['temp_trend'] = (
    df_all['temperature_2m'] - df_all['temperature_2m'].rolling(6, min_periods=1).mean()
)

# Fixed 70/30 split
df_train, df_test = frost_stratified_split_fixed(df_all, random_state=42)

# Features for each split
X_train, y_train = prepare_features(df_train)
X_test,  y_test  = prepare_features(df_test)

# Evaluate rules + ML
rule_eval = evaluate_rules(df_test)
ml_eval = evaluate_ml_models(X_train, y_train, X_test, y_test, frost_weight=FROST_WEIGHT)

# Combine and sort, best F1 first
all_eval = pd.concat([rule_eval, ml_eval], ignore_index=True).sort_values('f1', ascending=False).reset_index(drop=True)

# Split summary
print("Antal observationer – Train:", len(df_train), "Test:", len(df_test))
print("Frost i Train:", df_train['actual_frost'].sum(), "Frost i Test:", df_test['actual_frost'].sum())
print("Icke-frost i Train:", (df_train['actual_frost']==0).sum(), "Icke-frost i Test:", (df_test['actual_frost']==0).sum())
all_eval


In [None]:

# === Step 5: Print the ASCII table (last lines of the output) ===
# The banner is printed before the table is built, one item per line.
print("\n" + "=" * 80, "SLUTTABELL", "=" * 80, sep="\n")
print(ascii_table(all_eval))
