# INF552: Programming Assignment 7 [Hidden Markov Models]

Student: Jiashi Chen

USC-ID: 4684194123

## Part 1: Implementation

In [1]:
import numpy as np

### Load data

In [2]:
def get_free_node(filename):
    f = open(filename, "r")
    count = 0
    free_nodes = []
    for line in f:
        if count > 1 and count < 12:
            vailds = line.split()
            for i in range(len(vailds)):
                if vailds[i] == "1":
                    free_nodes.append((count-2, i))

        count += 1
    
    return free_nodes

In [3]:
free_nodes = get_free_node("hmm-data.txt")
# free_nodes

In [4]:
def get_tower_node(filename):
    f = open(filename, "r")
    count = 0
    tower_node = []
    for line in f:
        if count > 15 and count < 20:
            coordinate_str = line.split(":")[1].split()
            tower_node.append(tuple([int(i) for i in coordinate_str]))

        count += 1
    
    return tower_node

In [5]:
tower_nodes = get_tower_node("hmm-data.txt")
tower_nodes

[(0, 0), (0, 9), (9, 0), (9, 9)]

In [6]:
def get_noisy_distance(filename):
    f = open(filename, "r")
    count = 0
    noisy_distance = []
    for line in f:
        if count > 23 and count < 35:
            noisy_distance_str = line.split()
            noisy_distance.append([float(i) for i in noisy_distance_str])

        count += 1
    
    return noisy_distance

In [7]:
noisy_distance = get_noisy_distance("hmm-data.txt")
noisy_distance

[[6.3, 5.9, 5.5, 6.7],
 [5.6, 7.2, 4.4, 6.8],
 [7.6, 9.4, 4.3, 5.4],
 [9.5, 10.0, 3.7, 6.6],
 [6.0, 10.7, 2.8, 5.8],
 [9.3, 10.2, 2.6, 5.4],
 [8.0, 13.1, 1.9, 9.4],
 [6.4, 8.2, 3.9, 8.8],
 [5.0, 10.3, 3.6, 7.2],
 [3.8, 9.8, 4.4, 8.8],
 [3.3, 7.6, 4.3, 8.5]]

### Obtain possible node each time

In [8]:
def get_possible_nodes(noisy_distance, free_nodes, tower_nodes):
    possible_nodes = {}
    for i in range(len(noisy_distance)):
        for node in free_nodes:
            count = 0
            for j in range(len(tower_nodes)):
                distance = np.sqrt((node[0]- tower_nodes[j][0])**2 + (node[1]- tower_nodes[j][1])**2)
                if noisy_distance[i][j] > distance*0.7 and noisy_distance[i][j] < distance*1.3:
                    count += 1
                else:
                    break
            if count == 4:
                current_possible_nodes = possible_nodes.get(i, [])
                current_possible_nodes.append(node)
                possible_nodes[i] = current_possible_nodes
    return possible_nodes

In [9]:
possible_nodes = get_possible_nodes(noisy_distance, free_nodes, tower_nodes)
possible_nodes

{0: [(3, 4), (3, 5), (4, 3), (4, 4), (4, 5), (5, 3), (5, 4), (5, 5), (6, 4)],
 1: [(4, 3), (5, 1), (5, 3), (5, 4), (6, 3), (6, 4), (7, 3)],
 2: [(6, 3), (6, 4), (7, 3), (7, 4), (7, 5), (8, 4), (9, 4)],
 3: [(7, 3), (7, 4), (8, 3), (8, 4), (9, 3)],
 4: [(7, 1), (7, 2), (7, 3), (8, 2), (8, 3)],
 5: [(7, 2), (7, 3), (8, 2), (8, 3), (9, 3)],
 6: [(7, 0), (7, 1)],
 7: [(5, 0), (5, 1), (5, 3), (6, 0), (6, 1)],
 8: [(4, 1), (5, 0), (5, 1), (6, 0), (6, 1), (6, 3)],
 9: [(3, 0), (3, 1), (4, 0), (4, 1), (5, 0), (5, 1)],
 10: [(3, 0), (3, 1), (4, 0), (4, 1)]}

### Calculate the probability of evidence

In [10]:
def get_evidence_probability(node, tower_nodes):
    probability = 1
    for tower_node in tower_nodes:
        distance = np.sqrt((node[0]- tower_node[0])**2 + (node[1]- tower_node[1])**2)
        low = distance * 0.7
        high = distance * 1.3
        probability *= 1/(high - low)
    return probability

### Check whether it's neighbor and transit probability

In [11]:
def is_neighbor(node1, node2):
    diff = np.array(node1) - np.array(node2)
    total = np.sum(diff**2)
    return total == 1

In [12]:
def tansit_proba(node):
    count = 0
    x = node[0]
    y = node[1]
    if (x + 1, y) in free_nodes:
        count += 1
    if (x - 1, y) in free_nodes:
        count += 1
    if (x, y + 1) in free_nodes:
        count += 1
    if (x, y - 1) in free_nodes:
        count += 1
    return 1/count

### Viterbi algorithm

$$
Data structure: 
pp = \{time: \{node:\{probability: \delta, pre:\psi\}\}\}
$$

In [13]:
def Viterbi_algo(noisy_distance, possible_nodes, free_nodes):
    possible_path = {}
    time = 0
    total_time = len(noisy_distance)
    possible_path[time] = {}
    for possible_node in possible_nodes[time]:
        attr = {}
        attr["prob"] = 1/len(free_nodes) * get_evidence_probability(possible_node, tower_nodes)
        attr["pre"] = -1
        possible_path[time][possible_node] = attr

    for i in range(1, total_time):
        time = i
        possible_path[time] = {}
        for possible_node in possible_nodes[time]:
            for pre_node in possible_path[time-1]:
                if is_neighbor(pre_node, possible_node):
                    new_pro = possible_path[time-1][pre_node]["prob"] * tansit_proba(pre_node) * get_evidence_probability(possible_node, tower_nodes)
                    if possible_path[time].get(possible_node, False):
                        if possible_path[time][possible_node]["prob"] < new_pro:
                            possible_path[time][possible_node]["prob"] = new_pro
                            possible_path[time][possible_node]["pre"] = pre_node
                    else:
                        possible_path[time][possible_node] = {}
                        possible_path[time][possible_node]["prob"] = new_pro
                        possible_path[time][possible_node]["pre"] = pre_node
    return possible_path

In [14]:
possible_path = Viterbi_algo(noisy_distance, possible_nodes, free_nodes)
possible_path

{0: {(3, 4): {'prob': 5.401317337782166e-05, 'pre': -1},
  (3, 5): {'prob': 5.401317337782166e-05, 'pre': -1},
  (4, 3): {'prob': 5.401317337782166e-05, 'pre': -1},
  (4, 4): {'prob': 5.407940414014609e-05, 'pre': -1},
  (4, 5): {'prob': 5.407940414014609e-05, 'pre': -1},
  (5, 3): {'prob': 5.401317337782166e-05, 'pre': -1},
  (5, 4): {'prob': 5.407940414014609e-05, 'pre': -1},
  (5, 5): {'prob': 5.4079404140146074e-05, 'pre': -1},
  (6, 4): {'prob': 5.401317337782166e-05, 'pre': -1}},
 1: {(4, 3): {'prob': 8.460526405193606e-08, 'pre': (5, 3)},
  (5, 3): {'prob': 8.460526405193606e-08, 'pre': (4, 3)},
  (5, 4): {'prob': 8.481287661244423e-08, 'pre': (5, 5)},
  (6, 3): {'prob': 8.575479693425421e-08, 'pre': (5, 3)},
  (6, 4): {'prob': 6.35317550458042e-08, 'pre': (5, 4)}},
 2: {(6, 3): {'prob': 1.343247727289031e-10, 'pre': (5, 3)},
  (6, 4): {'prob': 1.343247727289031e-10, 'pre': (6, 3)},
  (7, 3): {'prob': 1.3775489344139835e-10, 'pre': (6, 3)},
  (7, 4): {'prob': 7.337351503856009e-

### Get most likely ending point

In [15]:
def most_prob_end(possible_path, noisy_distance):
    total_time = len(noisy_distance)
    possible_node = possible_path[total_time-1]
    max_pro = -1
    max_node = -1
    for node, attr in possible_node.items():
        if attr["prob"] > max_pro:
            max_pro = attr["prob"]
            max_node = node
    return max_node

In [16]:
max_node = most_prob_end(possible_path, noisy_distance)

### Follow the backtrack till the first observation

In [17]:
def most_likely_trajectory(possible_path, noisy_distance, max_node):
    total_time = len(noisy_distance)
    route = []
    route.append(max_node)
    for time in range(total_time-1, 0, -1):
        max_node = possible_path[time][max_node]["pre"]
        route.append(max_node)
        time -= 1
    return route[::-1]

In [18]:
trajectory = most_likely_trajectory(possible_path, noisy_distance, max_node)

In [19]:
print ("The most likely path is", ", ".join(map(lambda x: str(x), trajectory)))

The most likely path is (5, 3), (6, 3), (7, 3), (8, 3), (8, 2), (7, 2), (7, 1), (6, 1), (5, 1), (4, 1), (3, 1)


## Part 2: Software Familiarization

In [20]:
from hmmlearn import hmm    # Obtain necessary package

### Prepare data

In [21]:
node_to_index = {}
node_index = 0
for node in free_nodes:
    if node not in node_to_index.keys():
        node_to_index[node] = f"node {node_index}"
        node_index += 1

index_to_node = {}
for node, index in node_to_index.items():
    index_to_node[index] = node

In [22]:
states = [key for key in index_to_node.keys()]
n_states = len(states)
observations = [str(i) for i in range(len(noisy_distance))]
n_observations = len(observations)
start_probability = [1/n_states for i in range(n_states)]

In [23]:
transition_probability = []
for i in states:
    trans_i = []
    for j in states:
        if is_neighbor(index_to_node[i], index_to_node[j]):
            trans_i.append(tansit_proba(index_to_node[i]))
        else:
            trans_i.append(0)
    transition_probability.append(trans_i)

In [24]:
emission_probability = []
for s in states:
    emission_s = []
    for o in range(n_observations):
        if index_to_node[s] in possible_nodes[o]:
            emission_s.append(get_evidence_probability(index_to_node[s], tower_nodes))
        else:
            emission_s.append(0)
    
    emission_probability.append(emission_s)

### Fit the model

In [25]:
model = hmm.MultinomialHMM(n_components=n_states)   #Set the number of hidden states
model.startprob_ = np.array(start_probability)      #Initial state occupation distribution.
model.transmat_ = np.array(transition_probability)  #Set matrix of transition probabilities between states.
model.emissionprob_ = np.array(emission_probability) # Set probability of emitting a given symbol when in each state.
seen_list = [i for i in range(len(noisy_distance))]
seen = np.array([seen_list]).T   #The real observation.

In [26]:
box2 = model.predict(seen)  #Find most likely state sequence corresponding to seen.
print ("The most likely path is", ", ".join(map(lambda x: str(free_nodes[x]), box2)))

The most likely path is (5, 3), (6, 3), (7, 3), (8, 3), (8, 2), (7, 2), (7, 1), (6, 1), (5, 1), (4, 1), (3, 1)
