In [2]:
# Weather Forecasting using Hidden Markov Model + Viterbi
# Assignment-ready code for Google Colab

import numpy as np
import pandas as pd
from collections import Counter
import pickle, os

# ---------------- CONFIG ----------------
DATA_PATH = 'weatherHistory.csv'  # upload file in Colab to /content
N_STATES = 3       # number of hidden states
N_ITER = 40        # Baum-Welch training iterations
TEST_DAYS = 365    # last N days for test set
SEED = 42
# ----------------------------------------

np.random.seed(SEED)

# ---- Map summaries into categories ----
def coarse_summary(s):
    s = str(s).lower()
    if 'rain' in s or 'drizzle' in s: return 'rain'
    if 'snow' in s or 'sleet' in s: return 'snow'
    if 'clear' in s or 'sun' in s: return 'clear'
    if 'cloud' in s or 'overcast' in s or 'partly' in s: return 'cloudy'
    if 'fog' in s or 'mist' in s: return 'fog'
    return 'other'

# ---- Load data ----
df = pd.read_csv(DATA_PATH)
df['Formatted Date'] = pd.to_datetime(df['Formatted Date'], errors='coerce', utc=True)
df = df.dropna(subset=['Formatted Date']).copy()
df['date'] = df['Formatted Date'].dt.date
df['obs_coarse'] = df['Summary'].apply(coarse_summary)

# daily mode
daily = df.groupby('date')['obs_coarse'].agg(lambda x: Counter(x).most_common(1)[0][0]).reset_index()
daily = daily.sort_values('date').reset_index(drop=True)

# encode observations
obs_list = sorted(daily['obs_coarse'].unique())
obs2i = {o:i for i,o in enumerate(obs_list)}
i2obs = {i:o for o,i in obs2i.items()}
sequence = daily['obs_coarse'].map(obs2i).values

print("Observation categories:", obs_list)
print("Total days:", len(sequence))

# ---- HMM helpers ----
def random_normalized(n):
    v = np.random.rand(n)
    return v / v.sum()

def initialize_hmm(n_states, n_obs):
    pi = random_normalized(n_states)
    A = np.vstack([random_normalized(n_states) for _ in range(n_states)])
    B = np.vstack([random_normalized(n_obs) for _ in range(n_states)])
    return pi, A, B

def forward(obs, pi, A, B):
    T, N = len(obs), A.shape[0]
    alpha = np.zeros((T, N))
    scale = np.zeros(T)
    alpha[0] = pi * B[:, obs[0]]
    scale[0] = alpha[0].sum(); alpha[0] /= scale[0]
    for t in range(1, T):
        alpha[t] = (alpha[t-1] @ A) * B[:, obs[t]]
        scale[t] = alpha[t].sum()
        alpha[t] /= scale[t]
    return alpha, scale

def backward(obs, A, B, scale):
    T, N = len(obs), A.shape[0]
    beta = np.zeros((T, N))
    beta[-1] = 1.0 / scale[-1]
    for t in range(T-2, -1, -1):
        beta[t] = (A @ (B[:, obs[t+1]] * beta[t+1])) / scale[t]
    return beta

def baum_welch(obs, n_states, n_obs, n_iter=20):
    pi, A, B = initialize_hmm(n_states, n_obs)
    T = len(obs)
    for _ in range(n_iter):
        alpha, scale = forward(obs, pi, A, B)
        beta = backward(obs, A, B, scale)
        # expectations
        xi = np.zeros((T-1, n_states, n_states))
        for t in range(T-1):
            numer = (alpha[t][:,None] * A * (B[:, obs[t+1]] * beta[t+1])[None,:])
            xi[t] = numer / numer.sum()
        gamma = alpha * beta
        gamma /= gamma.sum(axis=1, keepdims=True)
        # update
        pi = gamma[0]
        A = xi.sum(axis=0); A /= A.sum(axis=1, keepdims=True)
        B = np.zeros_like(B)
        for k in range(n_obs):
            mask = (obs == k)
            if mask.any(): B[:,k] = gamma[mask].sum(axis=0)
        B /= B.sum(axis=1, keepdims=True)
    return pi, A, B

def viterbi(obs, pi, A, B):
    T, N = len(obs), A.shape[0]
    delta = np.zeros((T, N)); psi = np.zeros((T, N), dtype=int)
    delta[0] = np.log(pi+1e-12) + np.log(B[:,obs[0]]+1e-12)
    for t in range(1,T):
        for j in range(N):
            seq = delta[t-1] + np.log(A[:,j]+1e-12)
            psi[t,j] = np.argmax(seq)
            delta[t,j] = seq[psi[t,j]] + np.log(B[j,obs[t]]+1e-12)
    states = np.zeros(T,dtype=int)
    states[-1] = np.argmax(delta[-1])
    for t in range(T-2,-1,-1):
        states[t] = psi[t+1,states[t+1]]
    return states

# ---- Train/Test split ----
train_seq = sequence[:-TEST_DAYS]
test_seq  = sequence[-TEST_DAYS:]

# Train HMM
pi, A, B = baum_welch(train_seq, N_STATES, len(obs_list), n_iter=N_ITER)
print("\nTrained HMM Parameters:")
print("Initial:", pi)
print("Transition:\n", A)
print("Emission:\n", B)

# Decode full sequence with Viterbi
states = viterbi(sequence, pi, A, B)
daily['state'] = states

# ---- Forecasting ----
# One-step-ahead prediction: use last state to predict tomorrow
preds, actuals = [], []
for t in range(len(train_seq), len(sequence)-1):
    st = states[t]
    next_state = np.argmax(A[st])          # most likely next state
    pred_obs = np.argmax(B[next_state])    # most likely observation
    preds.append(pred_obs)
    actuals.append(sequence[t+1])

# Accuracy
preds, actuals = np.array(preds), np.array(actuals)
acc = (preds == actuals).mean()
print(f"\nForecast Accuracy on {TEST_DAYS} test days: {acc*100:.2f}%")

# Show sample predictions
for i in range(10):
    print(f"Day {i+1} actual: {i2obs[actuals[i]]} | predicted: {i2obs[preds[i]]}")

Observation categories: ['clear', 'cloudy', 'fog', 'other', 'rain']
Total days: 4019

Trained HMM Parameters:
Initial: [1.04766438e-28 1.00000000e+00 1.85054916e-23]
Transition:
 [[0.61365763 0.3622222  0.02412017]
 [0.02587833 0.96550225 0.00861942]
 [0.00764423 0.03735513 0.95500065]]
Emission:
 [[8.14538827e-01 1.57732005e-01 2.27591836e-02 4.96998405e-03
  0.00000000e+00]
 [1.53158536e-02 9.79560880e-01 5.12326648e-03 1.68983738e-34
  0.00000000e+00]
 [1.88233302e-02 6.99144424e-01 2.82032246e-01 2.25475671e-57
  0.00000000e+00]]

Forecast Accuracy on 365 test days: 93.68%
Day 1 actual: cloudy | predicted: cloudy
Day 2 actual: cloudy | predicted: cloudy
Day 3 actual: cloudy | predicted: cloudy
Day 4 actual: fog | predicted: cloudy
Day 5 actual: fog | predicted: cloudy
Day 6 actual: fog | predicted: cloudy
Day 7 actual: cloudy | predicted: cloudy
Day 8 actual: cloudy | predicted: cloudy
Day 9 actual: cloudy | predicted: cloudy
Day 10 actual: cloudy | predicted: cloudy
