# Import required libraries


In [None]:
import pandas as pd
import numpy as np
from sklearn import preprocessing

In [None]:
cd drive/MyDrive/EECS738-Project2/notebooks/

/content/drive/MyDrive/EECS738-Project2/notebooks


# Preprocess data

In [None]:
df = pd.read_csv("../data/Shakespeare_data.csv")
df.head()

Unnamed: 0,Dataline,Play,PlayerLinenumber,ActSceneLine,Player,PlayerLine
0,1,Henry IV,,,,ACT I
1,2,Henry IV,,,,SCENE I. London. The palace.
2,3,Henry IV,,,,"Enter KING HENRY, LORD JOHN OF LANCASTER, the ..."
3,4,Henry IV,1.0,1.1.1,KING HENRY IV,"So shaken as we are, so wan with care,"
4,5,Henry IV,1.0,1.1.2,KING HENRY IV,"Find we a time for frighted peace to pant,"


In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 111396 entries, 0 to 111395
Data columns (total 6 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   Dataline          111396 non-null  int64  
 1   Play              111396 non-null  object 
 2   PlayerLinenumber  111393 non-null  float64
 3   ActSceneLine      105153 non-null  object 
 4   Player            111389 non-null  object 
 5   PlayerLine        111396 non-null  object 
dtypes: float64(1), int64(1), object(4)
memory usage: 5.1+ MB


***Let's look deeper at the datasets given***<br>
*column_name (type of column) 
    - description*

The shakespeare dataset contains the following columns :
- Dataline (int) ***seems useless***
    - A unique id for each line.
- Play (str) 
    - Name of the play where the lines are from
- PlayerLineNumber (int) 
    - The real line number that are being spoken 
    - Difference with ActSceneLine's Line
        - PlayerLineNumber is one whole long sentence and the ActSceneLine is the small cut part of PlayerLineNumber
        - PlayerLinenNumber resets in new scene
- ActSceneLine (str) 
    - Comes in 1.1.1 (Act is 1, Scene is 1, Line is 1)
- Player (str)
    - The character name in the play
- PlayerLine (str)
    - The line spoken by the player 
<br><br>


In [None]:
df = df.dropna()
df.reset_index(inplace=True,drop=True)
df.drop(["Dataline","Play","PlayerLinenumber","ActSceneLine","Player"], axis=1, inplace=True)
df.head()

Unnamed: 0,PlayerLine
0,"So shaken as we are, so wan with care,"
1,"Find we a time for frighted peace to pant,"
2,And breathe short-winded accents of new broils
3,To be commenced in strands afar remote.
4,No more the thirsty entrance of this soil


In [None]:
df["PlayerLine"] = df["PlayerLine"].str.replace(r'[^\w\s]', '').str.lower()
df.head()

Unnamed: 0,PlayerLine
0,so shaken as we are so wan with care
1,find we a time for frighted peace to pant
2,and breathe shortwinded accents of new broils
3,to be commenced in strands afar remote
4,no more the thirsty entrance of this soil


# Build our Hidden Markov Model
- Our emission will be the word from the text
- Our hidden state will be the starting letter of the word. (A better hidden state would be Noun, Verb, etc. Meaningful hidden state will helpful in building a better HMM model)

In [None]:
list_of_token = []
list_of_token_starting_letter = []
for line in df["PlayerLine"]:
  for word in line.split():
    list_of_token.append(word)
    list_of_token_starting_letter.append(word[0])

For simplicity only taking the first 100 words

In [None]:
# list_of_token = ["this", "is", "a", "apple", "a"]
# list_of_token_starting_letter = ["t", "i", "a", "a", "a" ]

list_of_token = list_of_token[:100]
list_of_token_starting_letter = list_of_token_starting_letter[:100]

Change strings to numbers for easier usage

In [None]:
LE_1 = preprocessing.LabelEncoder()
list_of_token = LE_1.fit_transform(list_of_token)

LE_2 = preprocessing.LabelEncoder()
list_of_token_starting_letter = LE_2.fit_transform(list_of_token_starting_letter)

## Calculation of initial probability, transition probability, emission probabilty

In [None]:
initial_state_matrix = []
length= len(list_of_token)

for i in sorted(set(list_of_token)):
  number_of_token =  len(np.where(list_of_token==i)[0])
  initial_state_matrix.append(number_of_token/length)

In [None]:
initial_hidden_state_matrix = []
length= len(list_of_token_starting_letter)

for i in sorted(set(list_of_token_starting_letter)):
  number_of_token =  len(np.where(list_of_token_starting_letter==i)[0])
  initial_hidden_state_matrix.append(number_of_token/length)

In [None]:
# Transition matrix
transition_matrix = []
number_of_hidden_states = 1 + int(max(list_of_token_starting_letter))
temp_matrix = np.zeros(shape=(number_of_hidden_states,number_of_hidden_states))

temp = True
for i in range(0,number_of_hidden_states):
    list_of_index = list(np.where(list_of_token_starting_letter==i)[0])
    for index in list_of_index:
        if((index+1) != len(list_of_token_starting_letter)):
            temp_matrix[i,list_of_token_starting_letter[index+1]] += 1/len(list_of_index)

transition_matrix = temp_matrix

In [None]:
# Emission Matrix
emission_matrix = []
number_of_states = 1 + int(max(list_of_token))
temp_matrix = np.zeros(shape=(number_of_hidden_states,number_of_states))

temp = True
for i in range(0,number_of_states):
    list_of_index = list(np.where(list_of_token==i)[0])
    for index in list_of_index:
      word = LE_1.inverse_transform([list_of_token[index]])[0]
      l_idx = LE_2.transform([word[0]])[0]
      
      temp_matrix[l_idx, list_of_token[index]] += 1

temp_matrix /= np.expand_dims(temp_matrix.sum(1), axis=1)

emission_matrix = temp_matrix

## We will use laplace smoothing 
- Laplace smoothing is a smoothing technique that handles the problem of zero probability in Naïve Bayes. 

In [None]:
def laplace_smoothing(matrix, count):
  matrix[matrix!=0] = matrix[matrix!=0] + (1/count)
  matrix[matrix==0] = 1/count
  return matrix

In [None]:
transition_matrix = laplace_smoothing(transition_matrix, len(list_of_token_starting_letter))
emission_matrix = laplace_smoothing(emission_matrix, len(list_of_token))

## Forward algorithm
Calculate the total probability of all the observations (from t_1 ) up to time t

In [None]:
def forward(observations, transition_matrix, emission_matrix, initial_state_matrix):
  T = len(observations)
  M = len(transition_matrix)
  alpha = np.zeros((T, M))
  alpha[0, :] = [initial_state_matrix[i] * emission_matrix[i][0] for i in range(len(emission_matrix))]
  for i in range(1, T):
    for j in range(M):
        alpha[i, j] = alpha[i-1].dot(transition_matrix[:][ j]) * emission_matrix[j][ observations[i]]

  total_alpha = sum(max(alpha[i]) for i in range(T))
  return total_alpha

## Backward algorithm
Similarly calculate total probability of all the observations from final time (T) to t.

In [None]:
def backward(observations, transition_matrix, emission_matrix):
  T = len(observations)
  M = len(transition_matrix)

  beta = np.zeros((T,M ))
  beta[T - 1] = np.ones((M))

  for i in range(T - 2, -1, -1): # backward from t-1 not t-2
      for j in range(M): 
        beta[i, j] = (beta[i+1] * emission_matrix[:, list_of_token[i+1]]).dot(transition_matrix[j, :])
      
  total_beta = sum(max(beta[i]) for i in range(T))
  return total_beta

## Viterbi Decoding
- parsing a sequence into the optimal series of hidden states
- GIVEN  a sequence (list_of_token) and HMM model with transition_matrix, emission_matrix, initial_hidden_state_matrix
- OUTPUT the sequence hidden states that maximizes probability
- Viterbi algorithm, dynamic programming, max score over all paths, trace pointers find path

In [None]:
def viterbi_decoding(list_of_token, transition_matrix=transition_matrix, emission_matrix=emission_matrix, initial_hidden_state_matrix=initial_hidden_state_matrix):
  T = len(list_of_token)
  M = transition_matrix.shape[0]

  omega = np.zeros((T, M))
  omega[0, :] = np.log(initial_hidden_state_matrix * emission_matrix[:, list_of_token[0]])

  #forward
  prev = np.zeros((T - 1, M))
  for i in range(1, T):
    for j in range(M):
      probability = omega[i - 1] + np.log(transition_matrix[:, j]) + np.log(emission_matrix[j, list_of_token[i]])
      prev[i-1, j] = np.argmax(probability)
      omega[i, j] = np.max(probability)

  S = np.zeros(T)
  last_state = np.argmax(omega[T - 1, :])

  #backward
  S[0] = last_state
  backtrack_index = 1
  for i in range(T - 2, -1, -1):
      S[backtrack_index] = prev[i, int(last_state)]
      last_state = prev[i, int(last_state)]
      backtrack_index += 1
      
  S = np.flip(S, axis=0)
  S = S.astype(int)
  hidden_state_list = LE_2.inverse_transform(S)
  
  return hidden_state_list

In [None]:
sentence = "to be "
observations = []
for word in sentence.split():
  try:
    le_class = LE_1.transform([word])[0]
    observations.append(le_class)
  except:
    raise ValueError("Cannot find the word at the corpus")

In [None]:
sentence_hidden_state = viterbi_decoding(observations)
print(f"Hidden state of '{sentence}'")
print(sentence_hidden_state)

Hidden state of 'to be '
['t' 'b']


Looks good! We will use this as a base


## Generate future sentence
With viterbi algorithm, we get the best future sentence


In [None]:
import numpy as np
def ViterbiAlgo(transition_matrix, emission_matrix, observations):
    T = len(observations)
    M = len(transition_matrix)

    probs = np.zeros(M)

    paths = np.zeros( (M, T+1 ))
    paths[:, 0] = np.arange(M)

    for obs_ind, obs_val in enumerate(observations):
        for state_ind in range(M):
            val = 0
            if obs_val< np.size(emission_matrix,1):
                val = np.log(emission_matrix[state_ind][obs_val])
            temp_probs = probs + val + np.log(transition_matrix[:][state_ind])
            best_temp_ind = np.argmax(temp_probs)
            paths[state_ind,:] = paths[best_temp_ind,:]
            paths[state_ind,(obs_ind+1)] = state_ind
            probs[state_ind] = temp_probs[best_temp_ind]
    best_path_ind = np.argmax(probs)
    
    return (paths[best_path_ind].astype(int), probs[best_path_ind])

In [None]:
generated_sequence , _ =  ViterbiAlgo(transition_matrix, emission_matrix, observations)

given_seq = LE_1.inverse_transform(observations)
print(f"Future sequence of '{' '.join(given_seq)}' is")
ans = np.append(given_seq,LE_1.inverse_transform(generated_sequence))
print(" ".join(ans))

Future sequence of 'to be' is
to be care care accents


Quite gibberish. But, I am satisfied.

## Generate Best Sequence with length 3

In [None]:
import operator
def forward_backward(states, s_pro, t_pro, e_pro, length=3):
  path = { s:[] for s in states} # init path: path[s] represents the path ends with s
  path_prob = { s:[] for s in states} 

  num_hidden_state  = len(e_pro)
  num_emission = len(e_pro[0])

  all_prob = {}
  for i in range(length):
    all_prob[i] = {}
    for emis_ind in range(num_emission):
      all_prob[i][emis_ind] = []

  # Initialize prob
  initial_prob = {} 
  for emis_ind in range(num_emission): # words (emissions)
    max_emission_prob , max_state = -1, -1
    for s in states:
      start = (s_pro[s])
      emis = e_pro[s][emis_ind]
      result = start*emis
      if max_emission_prob < result:
        max_emission_prob = result
        max_state = s
    all_prob[0][emis_ind].append((max_emission_prob , max_state))

  # Get the rest of the probability
  for i in range(1, length):
    last_pro = all_prob[i-1]
    for emis_ind in range(num_emission):
      for curr_state in states:
        # Get maximum 
        max_pro , max_last_sta = -1, None
        temp_emis_ind = 0
        for last_state in states:
          prev_prob, prev_state = last_pro[temp_emis_ind][0]
          trans = t_pro[last_state][curr_state]
          emis = e_pro[curr_state][emis_ind]
          result = prev_prob*trans*emis
          temp_emis_ind +=1
          if max_pro < result:
            max_pro = result
            max_last_sta = last_state

        all_prob[i][emis_ind].append((max_pro , max_last_sta))
  
  ans = []
  for i in range(length):
    max_prob, max_emis_index = -1 , -1
    for key in all_prob[i]:
      for prob, _ in all_prob[i][key]:
        if max_prob < prob:
          max_prob = prob
          max_emis_index = key
    ans.append(max_emis_index)
  return ans

In [None]:
generated_sequence = forward_backward(list(set(list_of_token_starting_letter)), initial_state_matrix, transition_matrix, emission_matrix  )
generated_sequence = LE_1.inverse_transform(generated_sequence)
print("Generated sequence is ")
print(" ".join(generated_sequence))

Generated sequence is 
entrance of of
