In [None]:
import itertools
import operator
from random import random
from matplotlib import pyplot as plt
import numpy as np
import networkx as nx
import pandas as pd
import pomegranate
from pomegranate import HiddenMarkovModel, DiscreteDistribution, State
from sklearn.model_selection import train_test_split
import seaborn as sns

%matplotlib inline

## Get data

In [None]:
!wget -q -O chicago_streets.csv \
 https://data.cityofchicago.org/api/views/i6bp-fvbx/rows.csv?accessType=DOWNLOAD

In [None]:
streets_df = pd.read_csv("chicago_streets.csv")
streets_df.shape

In [None]:
streets_df.head()

In [None]:
streets_df[streets_df["Suffix Direction"].str.isalpha()].sample(5)

## Create labeled dataset

In [None]:
streets_df.columns = [x.strip() for x in streets_df.columns]

vocab = {y for x in streets_df["Full Street Name"].str.split() for y in x}
vocab = sorted(list(vocab))
word2idx = {x: i for i, x in enumerate(vocab)}

fields = ["Direction", "Street", "Suffix", "Suffix Direction"]
field_names = {"Direction": "DIR", 
               "Street": "STR", 
               "Suffix": "SUF", 
               "Suffix Direction": "SD"}
def iob2_tag(span, tagname):
    if span >= 1:
        return [f"B-{tagname}"] + [f"I-{tagname}" for i in range(span-1)]
    return []

X = []
Y = []
for _, row in streets_df.iterrows():
    row = row.to_dict()
    X.append(np.array(list(itertools.chain.from_iterable([row[x].split() for x in fields]))))
    Y.append(list(itertools.chain.from_iterable([iob2_tag(len(row[x].split()), field_names[x]) for x in fields])))

In [None]:
X[0], Y[0]

In [None]:
states = sorted(list({y for x in Y for y in x}))
num_states = len(states)
states

## Train HMM using `pomegranate`

In [None]:
hmm = HiddenMarkovModel()
hmmstates = [State(DiscreteDistribution.from_samples(vocab), name=s) for s in states]
hmm.add_states(hmmstates)
for s1 in hmmstates:
    hmm.add_transition(hmm.start, s1, 1.0/len(hmmstates))
    for s2 in hmmstates + [hmm.end]:
        hmm.add_transition(s1, s2, 1.0/(len(hmmstates)+1))
hmm.bake()

In [None]:
# hmm.fit(sequences=X, labels=Y, algorithm="labeled")

# ^ Broken
# Implement manually
state_dict = {s: i for i, s in enumerate(states)}
start_counts = [0 for j in range(num_states)]
transition_counts = [[0 for j in range(num_states + 1)] 
                     for i in range(num_states)]

# Iterate through data collecting transition counts
for i, y in enumerate(Y):
    start_counts[state_dict[y[0]]] += 1
    for j, yy in enumerate(y):
        if j + 1 == len(y):
            transition_counts[state_dict[yy]][-1] += 1
        else:
            transition_counts[state_dict[yy]][state_dict[y[j+1]]] += 1

# Calculate emission probabilities for each state
for i, s in enumerate(hmmstates):
    assoc_vocab = [X[i][j] for i, y in enumerate(Y) for j, yy in enumerate(y) if yy == s.name]
    s.distribution = DiscreteDistribution.from_samples(assoc_vocab)

# Calculate transition probabilities for each state pair
for i, s in enumerate(hmmstates):
    hmm.add_transition(hmm.start, s, start_counts[i]/sum(start_counts))
    for j, ss in enumerate(hmmstates):
        hmm.add_transition(s, ss, transition_counts[i][j]/sum(transition_counts[i]))
    hmm.add_transition(s, hmm.end, transition_counts[i][-1]/sum(transition_counts[i]))
    
hmm.bake()

### Inspect states and transition probabilities

In [None]:
hmm.states

In [None]:
def show_states(states):
    pd_data = []
    for s in states:
        if s.distribution:
            top_words = sorted(s.distribution.parameters[0].items(), 
                               key=operator.itemgetter(1), 
                               reverse=True)[:5]
            for w in top_words:
                pd_data.append({"state": s.name, "word": w[0], "prob": "{:0.4f}".format(w[1])})
        else:
            pd_data.append({"state": s.name, "word": "--", "prob": "--"})
    display(pd.DataFrame(pd_data, columns=["state", "word", "prob"]))

In [None]:
show_states(hmm.states)

In [None]:
plt.figure(figsize=(12,12))
hmm.plot()
plt.show()

In [None]:
def show_transitions(transitions, states):
    t_df = pd.DataFrame(transitions, columns=states)
    t_df["from_state"] = states
    t_df.set_index("from_state", inplace=True)
    plt.figure(figsize=(6,5))
    ax = sns.heatmap(t_df, annot=True, fmt="0.2f")
    plt.xlabel("to_state")
    plt.show()

In [None]:
show_transitions(hmm.dense_transition_matrix(), [x.name for x in hmm.states])

### Sample random street names from model

In [None]:
for samp in hmm.sample(10):
    print(" ".join(list(samp)))

### Demonstrate forward and backward probabilities

In [None]:
test_street = ["N", "CLARK", "ST"]

In [None]:
def show_fb_matrix(fbm, states):
    t_df = pd.DataFrame(fbm, columns=states)
    t_df = t_df.T
    plt.figure()
    ax = sns.heatmap(t_df, annot=True, fmt="0.2f")
    plt.xlabel("state")
    plt.show()

In [None]:
f_matrix = hmm.forward(test_street)
show_fb_matrix(f_matrix, [x.name for x in hmm.states])

In [None]:
b_matrix = hmm.backward(test_street)
show_fb_matrix(b_matrix, [x.name for x in hmm.states])

In [None]:
logprob, state_seq = hmm.viterbi(test_street)
print("Viterbi tag sequence: ")
print(f"  {' '.join([x[1].name for x in state_seq])}")
print(f"  logprob = {logprob:0.2f}")

## Train HMM unsupervised

In [None]:
# HMM_INIT = "uniform"
HMM_INIT = "random"

In [None]:
hmm = HiddenMarkovModel()
hmmstates = [State(DiscreteDistribution.from_samples(vocab), name=s) 
             for s in ["A", "B", "C", "D", "E"]]
hmm.add_states(hmmstates)

if HMM_INIT == "uniform":
    for s1 in hmmstates:
        hmm.add_transition(hmm.start, s1, 1.0/len(hmmstates))
        for s2 in hmmstates + [hmm.end]:
            hmm.add_transition(s1, s2, 1.0/(len(hmmstates)+1))
else:
    transition_matrix = np.random.random((len(hmmstates), len(hmmstates)+1))
    transition_matrix = (transition_matrix.T / transition_matrix.sum(axis=1)).T
    for i, s1 in enumerate(hmmstates):
        hmm.add_transition(hmm.start, s1, 1.0/len(hmmstates))
        for j, s2 in enumerate(hmmstates + [hmm.end]):
            hmm.add_transition(s1, s2, transition_matrix[i][j])

hmm.bake()

In [None]:
hmm.fit(sequences=X, algorithm="baum-welch", max_iterations=20)

In [None]:
plt.figure(figsize=(12,12))
hmm.plot()
plt.show()

In [None]:
for samp in hmm.sample(10):
    print(" ".join(list(samp)))

In [None]:
show_states(hmm.states)

In [None]:
show_transitions(hmm.dense_transition_matrix(), [x.name for x in hmm.states])