<a href="https://colab.research.google.com/github/annabellarinaldi/HMMWeatherModel/blob/main/Modeling_Weather_with_HMMs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
outputs = [0, 0, 2, 2, 1] # X_1 .. X_N
transition_prob = [[0.55, 0.25, 0.2],\
                   [0.15, 0.7, 0.15],\
                   [0.2, 0.3, 0.4]]
emission_prob = [[0.1, 0.3, 0.6],\
                 [0.2, 0.5, 0.3],\
                 [0.6, 0.3, 0.1]]
start_prob = [0.3, 0.3, 0.4]

In [None]:
import math

class HMM:
  def __init__(self, transition_prob: list[list[int]], emission_prob: list[list[int]], start: list[int]):
    # P(Y_(t+1) | Y_t)
    self.transition_prob = transition_prob

    # P(X_t | Y_t)
    self.emission_prob = emission_prob

    self.start = start

  def joint_prob(self, outputs: list[int], hiddens: list[int]) -> list[float]:
    prob = self.start[hiddens[0]] * self.emission_prob[hiddens[0]][outputs[0]]
    for i in range(1, len(hiddens)):
      prev_y = hiddens[i - 1]
      curr_y = hiddens[i]
      x = outputs[i]
      prob *= self.transition_prob[prev_y][curr_y] * self.emission_prob[curr_y][x]
    return prob

  def conditional_weights(self, outputs: list[int], hiddens: list[int], t: int) -> float:
    weights = []
    for y in [0, 1, 2]:
      weight = self.emission_prob[y][outputs[t]]
      if t == 0:
        weight *= self.start[y]
      else:
        weight *= self.transition_prob[hiddens[t - 1]][y]
      if t < len(hiddens) - 1:
        weight *= self.transition_prob[y][hiddens[t + 1]]
      weights.append(weight)
    return weights

In [None]:
hmm = HMM(transition_prob, emission_prob, start_prob)

In [None]:
# Test cases
print(hmm.joint_prob([0,0,2,2,1], [1,1,1,1,1]))
# about 0.000129654
print(hmm.joint_prob([0,0,2,2,1], [2,2,0,0,1]))
# 0.00028512

0.00012965399999999996
0.00028512


In [None]:
import itertools

def get_most_likely_hidden_states(hmm: HMM, outputs: list[int]):
  joint_probs = {}
  total_joint_prob = 0.0

  # iterate over possible hidden states
  for hiddens in itertools.product([0,1,2],repeat=5):
    hiddens = list(hiddens) # convert to list
    jp = hmm.joint_prob(outputs, hiddens)
    joint_probs[tuple(hiddens)] = jp
    total_joint_prob += jp
  cond_probs = {
        seq: joint / total_joint_prob for seq, joint in joint_probs.items()
  }
  top_10 = sorted(cond_probs.items(), key=lambda x: x[1], reverse=True)[:10]
  return top_10


In [None]:
get_most_likely_hidden_states(hmm, outputs)

[((2, 2, 1, 1, 1), 0.10142053368734852),
 ((2, 2, 0, 0, 0), 0.10017864960138101),
 ((2, 2, 0, 0, 1), 0.07589291636468258),
 ((2, 1, 1, 1, 1), 0.059161977984286636),
 ((2, 2, 0, 1, 1), 0.048295492232070726),
 ((2, 2, 0, 0, 2), 0.036428599855047636),
 ((1, 1, 1, 1, 1), 0.0345111538241672),
 ((2, 0, 0, 0, 0), 0.022957607200316487),
 ((2, 2, 1, 0, 0), 0.020491087418464293),
 ((2, 2, 2, 1, 1), 0.019318196892828297)]

# Gibbs Sampling

In [None]:
# Test cases
print(hmm.conditional_weights([0,0,2,2,1], [1,1,1,1,1], 2))
# about [0.0225, 0.147, 0.0045]
print(hmm.conditional_weights([0,0,2,2,1], [2,2,0,0,1], 2))
# about [0.066, 0.0135, 0.00800000000000000]

[0.0225, 0.147, 0.0045]
[0.066, 0.0135, 0.008000000000000002]


In [None]:
# Perform a single Gibbs sample Y_1 .. Y_N ~ P(Y_1 .. Y_N | X_1 .. X_N)
import random

def gibbs_sample(hmm: HMM, outputs: list[int]) -> list[list[int]]:
  n = len(outputs)
  hiddens = [random.choice([0, 1, 2]) for _ in range(n)]

  for t in range(n):
    weights = hmm.conditional_weights(outputs, hiddens, t)
    total = sum(weights)
    probs = [w / total for w in weights]
    hiddens[t] = random.choices([0, 1, 2], weights=probs, k=1)[0]

  return hiddens

In [None]:
import collections
def estimate_likely_hidden_states(hmm: HMM, outputs: list[int]):
  sample_counts = {}
  num_iterations = 5000
  burn_in = 1000

  hiddens = [random.choice([0, 1, 2]) for _ in range(len(outputs))]

  for _ in range(burn_in):
    for t in range(len(outputs)):
      weights = hmm.conditional_weights(outputs, hiddens, t)
      total = sum(weights)
      probs = [w / total for w in weights]
      hiddens[t] = random.choices([0, 1, 2], weights=probs, k=1)[0]

  for _ in range(num_iterations):
    for t in range(len(outputs)):
      weights = hmm.conditional_weights(outputs, hiddens, t)
      total = sum(weights)
      probs = [w / total for w in weights]
      hiddens[t] = random.choices([0, 1, 2], weights=probs, k=1)[0]

    sample_tuple = tuple(hiddens.copy())
    sample_counts[sample_tuple] = sample_counts.get(sample_tuple, 0) + 1

  total_samples = sum(sample_counts.values())
  estimated_probs = {
      seq: count / total_samples for seq, count in sample_counts.items()
  }
  top_10 = sorted(estimated_probs.items(), key=lambda x: x[1], reverse=True)[:10]
  return top_10

In [None]:
estimate_likely_hidden_states(hmm, outputs)

[((2, 2, 1, 1, 1), 0.1014),
 ((2, 2, 0, 0, 0), 0.1008),
 ((2, 2, 0, 0, 1), 0.0816),
 ((2, 1, 1, 1, 1), 0.0604),
 ((2, 2, 0, 1, 1), 0.0468),
 ((2, 2, 0, 0, 2), 0.0342),
 ((1, 1, 1, 1, 1), 0.0338),
 ((2, 0, 0, 0, 0), 0.0286),
 ((2, 2, 1, 0, 0), 0.0212),
 ((2, 2, 2, 1, 1), 0.021)]