In [1]:
import pandas as pd
import numpy as np

In [2]:
path = r"../data/weatherHistory.csv"

# 1. Load data
df = pd.read_csv(path)

# 2. Parse and sort by date
df["Formatted Date"] = pd.to_datetime(df["Formatted Date"])
df = df.sort_values("Formatted Date").reset_index(drop=True)

# Keep only needed columns for now
df = df[["Formatted Date", "Temperature (C)", "Summary", "Precip Type"]]

  df["Formatted Date"] = pd.to_datetime(df["Formatted Date"])


In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 96453 entries, 0 to 96452
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Formatted Date   96453 non-null  object 
 1   Temperature (C)  96453 non-null  float64
 2   Summary          96453 non-null  object 
 3   Precip Type      95936 non-null  object 
dtypes: float64(1), object(3)
memory usage: 2.9+ MB


In [4]:
df.head()

Unnamed: 0,Formatted Date,Temperature (C),Summary,Precip Type
0,2006-01-01 00:00:00+01:00,0.577778,Partly Cloudy,rain
1,2006-01-01 01:00:00+01:00,1.161111,Mostly Cloudy,rain
2,2006-01-01 02:00:00+01:00,1.666667,Mostly Cloudy,rain
3,2006-01-01 03:00:00+01:00,1.711111,Overcast,rain
4,2006-01-01 04:00:00+01:00,1.183333,Mostly Cloudy,rain


We will use Temperature (C) as the observation and convert it to 3 categories:

0 → Cold

1 → Mild

2 → Hot

For example:

Temp < 10°C → Cold (0)

10°C ≤ Temp < 20°C → Mild (1)

Temp ≥ 20°C → Hot (2)

In [5]:
def temp_to_obs(t):
    if t < 10:
        return 0   # Cold
    elif t < 20:
        return 1   # Mild
    else:
        return 2   # Hot

df["Obs"] = df["Temperature (C)"].apply(temp_to_obs)

# Check distribution of observations
print(df["Obs"].value_counts())


Obs
0    42436
1    33340
2    20677
Name: count, dtype: int64


In [6]:
# Observation sequence as integers
O = df["Obs"].to_numpy(dtype=int)

print("Total length of observation sequence:", len(O))
print("First 20 observations:", O[:20])


Total length of observation sequence: 96453
First 20 observations: [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [7]:
train_ratio = 0.7
split_idx = int(train_ratio * len(O))

O_train = O[:split_idx]
O_test  = O[split_idx:]

print("Train length:", len(O_train))
print("Test length:", len(O_test))


Train length: 67517
Test length: 28936


## **2. Model Assumption And Initialization of Matrices**

### **Normalization of HMM Parameter Matrices**
#### **Ensuring π, A, and B Rows Form Valid Probability Distributions**

In [None]:
def normalize_rows(mat, eps=1e-12):
    # rows to sum to 1
    row_sums = mat.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = eps
    return mat / row_sums

### **Initialization functions**

In [None]:
def init_uniform(N, M):
    pi = np.ones(N) / N
    A = np.ones((N, N)) / N
    B = np.ones((N, M)) / M
    return pi, A, B

def init_dirichlet(N, M, alpha_pi=1.0, alpha_A=1.0, alpha_B=1.0, seed=None):
    rng = np.random.default_rng(seed)
    # pi: length N
    pi = rng.dirichlet([alpha_pi]*N)
    # A: N x N (each row a distribution)
    A = np.vstack([rng.dirichlet([alpha_A]*N) for _ in range(N)])
    # B: N x M
    B = np.vstack([rng.dirichlet([alpha_B]*M) for _ in range(N)])
    return pi, A, B

def init_with_self_bias(N, M, self_prob=0.6):
    """Initialize A with a bias to remain in the same state."""
    if not (0 <= self_prob <= 1):
        raise ValueError("self_prob must be between 0 and 1")
    off_prob = (1.0 - self_prob) / (N - 1) if N>1 else 0.0
    A = np.full((N, N), off_prob)
    np.fill_diagonal(A, self_prob)
    pi = np.ones(N) / N
    B = np.ones((N, M)) / M
    return pi, A, B

### **Initialization from Labeled Sequences**
#### **-Supervised Estimation of HMM Parameters**

In [None]:
def init_from_labeled_sequences(labeled_sequences, N, M, smoothing=1.0):
    """
    labeled_sequences: list of sequences. Each sequence is (states, obs) where
      states: list of ints in [0..N-1]
      obs: list of ints in [0..M-1]
    Returns: pi, A, B (with Laplace smoothing)
    """
    pi_counts = np.zeros(N)
    A_counts = np.zeros((N, N))
    B_counts = np.zeros((N, M))

    for states, obs in labeled_sequences:
        if len(states) == 0:
            continue
        pi_counts[states[0]] += 1
        for t in range(len(states)):
            s = states[t]
            o = obs[t]
            B_counts[s, o] += 1
            if t+1 < len(states):
                A_counts[s, states[t+1]] += 1

    # Add smoothing and normalize
    pi = (pi_counts + smoothing) / (pi_counts.sum() + smoothing * N)
    A = (A_counts + smoothing) / (A_counts.sum(axis=1, keepdims=True) + smoothing * N)
    B = (B_counts + smoothing) / (B_counts.sum(axis=1, keepdims=True) + smoothing * M)

    return pi, A, B

### **Initialization from Observations Using K-Means Clustering**
#### **-Unsupervised Approximation for Emission Probabilities**

In [None]:
def init_from_observations_kmeans(observations, N, M, kmeans_labels, smoothing=1.0):
    """
    observations: list/array of observation indices (0..M-1)
    kmeans_labels: array of length len(observations) with labels in 0..N-1
    This is a helper if you precomputed clustering (KMeans on feature vectors).
    """
    # convert sequence-like grouping if needed
    # Here we assume a single long sequence
    pi_counts = np.zeros(N)
    A_counts = np.zeros((N, N))
    B_counts = np.zeros((N, M))

    if len(kmeans_labels) == 0:
        return init_dirichlet(N, M)

    pi_counts[kmeans_labels[0]] += 1
    for t in range(len(kmeans_labels)):
        s = kmeans_labels[t]
        o = observations[t]
        B_counts[s, o] += 1
        if t+1 < len(kmeans_labels):
            A_counts[s, kmeans_labels[t+1]] += 1

    pi = (pi_counts + smoothing) / (pi_counts.sum() + smoothing * N)
    A = (A_counts + smoothing) / (A_counts.sum(axis=1, keepdims=True) + smoothing * N)
    B = (B_counts + smoothing) / (B_counts.sum(axis=1, keepdims=True) + smoothing * M)
    return pi, A, B

### **Validation function**

In [14]:
def validate_pi_A_B(pi, A, B, tol=1e-9):
    ok = True

    if not np.allclose(pi.sum(), 1.0, atol=tol):
        print(f"Warning: pi does not sum to 1 (sum={pi.sum()})")
        ok = False

    if not np.allclose(A.sum(axis=1), np.ones(A.shape[0]), atol=tol):
        print(f"Warning: some rows of A do not sum to 1 (row sums={A.sum(axis=1)})")
        ok = False

    if not np.allclose(B.sum(axis=1), np.ones(B.shape[0]), atol=tol):
        print(f"Warning: some rows of B do not sum to 1 (row sums={B.sum(axis=1)})")
        ok = False

    return ok

### **Generate & validate π, A, B**

In [16]:
N = 3   # hidden states
M = 4   # observation symbols
pi_u, A_u, B_u = init_uniform(N, M)
pi_r, A_r, B_r = init_dirichlet(N, M, alpha_pi=1.0, alpha_A=1.0, alpha_B=1.0, seed=42)
print("Uniform A:\n", A_u)
print("Uniform B:\n", B_u)
print("Dirichlet pi:\n", pi_r)
assert validate_pi_A_B(pi_r, A_r, B_r)

Uniform A:
 [[0.33333333 0.33333333 0.33333333]
 [0.33333333 0.33333333 0.33333333]
 [0.33333333 0.33333333 0.33333333]]
Uniform B:
 [[0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]
 [0.25 0.25 0.25 0.25]]
Dirichlet pi:
 [0.33742524 0.32787894 0.33469582]


## **Member3 -> GOOD LUCK :)**