# HW 3: Part-of-Speech Tagging with HMMs + Decoding Techniques (Greedy and Viterbi)

- Detravious Jamari Brinkley
- CSCI-544: Applied Natural Language Processing
- python version: 3.11.4

---

1. Part-of-Speech (POS) Tagging [a sequence labelling task: given a word in a sentence, assign its part of speech]
2. HMMs (Hidden Markov Models) [a generative model used for POS Tagging]
    1. Generative [models the joint distribution, so it assigns a probability to every combination of observations and hidden states]
    2. With POS Tagging: Given a sequence of observations (the words of a sentence), the task is to infer the most likely sequence of hidden states (POS Tags) that could have generated the observed data.
3. **Decoding Techniques:**
    1. Greedy [pick the locally best tag at each step, given the tag chosen for the previous word]
    2. Viterbi [use dynamic programming with backtracking to find the globally optimal (OPT) tag sequence over the entire search space]
4. **Notes on the data and given files:**
    - Dataset: Wall Street Journal section of the Penn Treebank
    - Folder named `data` with the following files:
        1. `train`, sentences *with* human-annotated POS Tags
        2. `dev`, sentences *with* human-annotated POS Tags
        3. `test`, sentences *without* POS Tags; the POS Tags must be predicted
    - Format: A blank line ends each sentence. Each non-blank line contains 3 items separated by a tab (`\t`). The three items are
        1. Index of the word in the sentence
        2. Word type
        3. POS Tag
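
The format notes above suggest one way to separate sentences while loading: split on blank lines. The following is a minimal sketch under that assumption; `read_sentences` is a hypothetical helper and is not part of the notebook's own loading code.

```python
from typing import Iterable, List, Tuple


def read_sentences(lines: Iterable[str]) -> List[List[Tuple[str, ...]]]:
    """Group tab-separated lines into sentences, using blank lines as separators.

    Each item is (index, word, tag) for tagged files or (index, word) for the test file.
    """
    sentences, current = [], []
    for line in lines:
        line = line.rstrip("\n")
        if not line.strip():              # a blank line closes the current sentence
            if current:
                sentences.append(current)
                current = []
            continue
        current.append(tuple(line.split("\t")))
    if current:                           # the file may not end with a blank line
        sentences.append(current)
    return sentences


sample = ["1\tPierre\tNNP\n", "2\tVinken\tNNP\n", "\n", "1\tMr.\tNNP\n"]
sentences = read_sentences(sample)
print(len(sentences))
```

Because an open file iterates line by line, the same helper could be called as `read_sentences(open('data/train'))`.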



In [1]:
import sys

import numpy as np
import pandas as pd

from tqdm import tqdm
from collections import defaultdict

# Load and Update Data
- [x] Find a way to separate sentences when loading the df.

In [2]:
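def load_data(file_path: str, file_name: str, config_index: bool = True) -> pd.DataFrame:
    """Load a tab-separated data file. `config_index` is kept for compatibility and is currently unused."""

    file = file_path + file_name
    # skip_blank_lines=False keeps the blank lines (sentence boundaries) as all-NaN rows
    open_df = pd.read_table(file, sep="\t", names=['Index', 'Word', 'POS Tag'], skip_blank_lines=False)

    return open_df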

In [3]:
def update_df_columns(df: pd.DataFrame, new_columns_name: list, about: str) -> pd.DataFrame:
    """Prepend a dummy start row, rename the columns, and mark blank (sentence-boundary) rows as "dummy"."""

    print(about, "has", len(df.columns), "columns")
    # Start row so that the first sentence also has a "dummy" previous state
    dummy_row = pd.DataFrame([['0.0', ' ', 'dummy']], columns=df.columns)
    df = pd.concat([dummy_row, df], ignore_index=True)
    df.columns = new_columns_name
    # Blank lines were loaded as NaN rows; fill them so "dummy" marks each sentence boundary
    df = df.fillna("dummy")

    print("Update complete\n")

    return df

In [4]:
train_df = load_data('data/', 'train')
dev_df = load_data('data/', 'dev')
test_df = load_data('data/', 'test')

train_df

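|        | Index | Word      | POS Tag |
|--------|-------|-----------|---------|
| 0      | 1.0   | Pierre    | NNP     |
| 1      | 2.0   | Vinken    | NNP     |
| 2      | 3.0   | ,         | ,       |
| 3      | 4.0   | 61        | CD      |
| 4      | 5.0   | years     | NNS     |
| ...    | ...   | ...       | ...     |
| 950307 | 22.0  | to        | TO      |
| 950308 | 23.0  | San       | NNP     |
| 950309 | 24.0  | Francisco | NNP     |
| 950310 | 25.0  | instead   | RB      |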


In [5]:
all_pos_tags = train_df['POS Tag'].unique()
all_pos_tags

array(['NNP', ',', 'CD', 'NNS', 'JJ', 'MD', 'VB', 'DT', 'NN', 'IN', '.',
       nan, 'VBZ', 'VBG', 'CC', 'VBD', 'VBN', 'RB', 'TO', 'PRP', 'RBR',
       'WDT', 'VBP', 'RP', 'PRP$', 'JJS', 'POS', '``', 'EX', "''", 'WP',
       ':', 'JJR', 'WRB', '$', 'NNPS', 'WP$', '-LRB-', '-RRB-', 'PDT',
       'RBS', 'FW', 'UH', 'SYM', 'LS', '#'], dtype=object)

In [6]:
updated_test_df = test_df.drop(['POS Tag'], axis=1)
updated_test_df

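|        | Index | Word        |
|--------|-------|-------------|
| 0      | 1.0   | Influential |
| 1      | 2.0   | members     |
| 2      | 3.0   | of          |
| 3      | 4.0   | the         |
| 4      | 5.0   | House       |
| ...    | ...   | ...         |
| 135110 | 26.0  | them        |
| 135111 | 27.0  | here        |
| 135112 | 28.0  | with        |
| 135113 | 29.0  | us          |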


In [7]:
columns_name = ['Index', 'Word', 'POS Tag']

updated_train_df = update_df_columns(train_df, columns_name, "Train data")
updated_dev_df = update_df_columns(dev_df, columns_name, "Dev data")
# The test data has no POS Tags, so it keeps the two-column frame built above (updated_test_df)

Train data has 3 columns
Update complete

Dev data has 3 columns
Update complete



In [8]:
updated_test_df.head(33)
# updated_train_df.tail(5)
# updated_train_df

# updated_train_df[updated_train_df['POS Tag'] == "dummy"].shape

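|   | Index | Word        |
|---|-------|-------------|
| 0 | 1.0   | Influential |
| 1 | 2.0   | members     |
| 2 | 3.0   | of          |
| 3 | 4.0   | the         |
| 4 | 5.0   | House       |
| 5 | 6.0   | Ways        |
| 6 | 7.0   | and         |
| 7 | 8.0   | Means       |
| 8 | 9.0   | Committee   |
| 9 | 10.0  | introduced  |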


# Outline of Tasks

1. Vocabulary Creation
2. Model Learning
3. Greedy Decoding with HMM
4. Viterbi Decoding with HMM


# 1. Vocabulary Creation

- **Problem:** Creating a vocabulary that can handle unknown words.
    - **Solution:** Replace rare words, whose occurrences are less than a threshold (e.g., 3), with a special token `<unk>`

---

1. [ ] Create a vocabulary using the training data in the file train
2. [ ] Output the vocabulary into a txt file named `vocab.txt`
    - [ ] See PDF on how to properly format vocabulary file
3. [ ] Questions
    1. [ ] What is the selected threshold for unknown words replacement?
    2. [ ] What is the total size of your vocabulary?
    3. [ ] What is the total number of occurrences of the special token `<unk>` after replacement?
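
The replacement step can be sketched on a toy word list. This is a minimal sketch, assuming a word counts as rare when it occurs fewer than `threshold` times and that each output line holds word, index, and count separated by tabs (the column order used in this notebook; the required file format is in the assignment PDF). `build_vocab` and `UNK` are hypothetical names.

```python
from collections import Counter

UNK = "<unk>"


def build_vocab(words, threshold=3):
    """Return (rows, unk_count); rows are (word, index, count) with <unk> first."""
    counts = Counter(words)
    # Words seen at least `threshold` times stay in the vocabulary, most frequent first
    frequent = [(w, c) for w, c in counts.most_common() if c >= threshold]
    # Total occurrences of <unk>: every token of every rare word
    unk_count = sum(c for c in counts.values() if c < threshold)
    rows = [(UNK, 0, unk_count)]
    rows += [(w, i, c) for i, (w, c) in enumerate(frequent, start=1)]
    return rows, unk_count


words = ["the", "the", "the", "cat", "cat", "cat", "sat", "mat", "mat"]
rows, unk_count = build_vocab(words, threshold=3)
for word, index, count in rows:
    print(f"{word}\t{index}\t{count}")
```

Here the `<unk>` count sums token occurrences of the rare words, which is what question 3 asks for; the vocabulary size is `len(rows)`.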

In [9]:
# siddhant
# shivam

In [10]:
true_false_series = updated_train_df['Word'].value_counts()
print(true_false_series)

Word
,           46476
the         39533
dummy       38234
.           37452
of          22104
            ...  
Birthday        1
Happy           1
Bertie          1
crouched        1
Huricane        1
Name: count, Length: 43193, dtype: int64


In [11]:
vocab_df = pd.DataFrame(true_false_series)
vocab_df.reset_index(inplace = True)

In [12]:
# Words seen more than 3 times stay in the vocabulary; the rest are treated as rare
is_frequent = vocab_df['count'] > 3

updated_vocab_df = vocab_df.loc[is_frequent]
# .copy() so the assignment below edits a new frame instead of a slice of vocab_df
updated_false_vocab_df = vocab_df.loc[~is_frequent].copy()
updated_false_vocab_df['Word'] = '<unk>'

# NOTE: this is the number of distinct rare words. The total occurrences of <unk>
# after replacement would be updated_false_vocab_df['count'].sum().
N_updated_false_vocab_df = len(updated_false_vocab_df)
new_row = pd.DataFrame([['<unk>', N_updated_false_vocab_df]], columns=updated_vocab_df.columns)
df = pd.concat([new_row, updated_vocab_df], ignore_index=True)

df['index'] = range(0, len(updated_vocab_df) + 1)
df = df.reindex(columns=['Word', 'index', 'count'])
df
# df.to_csv('vocab.txt', header=None, index=None, sep='\t')



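|       | Word        | index | count |
|-------|-------------|-------|-------|
| 0     | <unk>       | 0     | 29443 |
| 1     | ,           | 1     | 46476 |
| 2     | the         | 2     | 39533 |
| 3     | dummy       | 3     | 38234 |
| 4     | .           | 4     | 37452 |
| ...   | ...         | ...   | ...   |
| 13746 | trafficking | 13746 | 4     |
| 13747 | 7.62        | 13747 | 4     |
| 13748 | gut         | 13748 | 4     |
| 13749 | 17.3        | 13749 | 4     |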



In [14]:
# df[df[word]

# 2. Model Learning

- Learn an HMM from the training data
- **HMM Parameters:**
  <div style="text-align: center;">

    $
    \text{Transition Probability (} t \text{)}: \quad t(s' \mid s) = \frac{\text{count}(s \rightarrow s')}{\text{count}(s)}
    $

    $
    \text{Emission Probability (} e \text{)}: \quad e(x \mid s) = \frac{\text{count}(s \rightarrow x)}{\text{count}(s)}
    $

  </div>

---
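
The two count ratios above can be checked by hand on a toy corpus. This is a minimal sketch, assuming sentences are given as lists of (word, tag) pairs and that a `"dummy"` start state precedes each sentence, as in this notebook; `estimate` is a hypothetical helper.

```python
from collections import Counter


def estimate(sentences, start="dummy"):
    """Maximum-likelihood estimates of t(s'|s) and e(x|s) from (word, tag) sentences."""
    transition_counts, emission_counts, state_counts = Counter(), Counter(), Counter()
    for sentence in sentences:
        previous = start
        state_counts[start] += 1          # one start state per sentence
        for word, tag in sentence:
            transition_counts[(previous, tag)] += 1
            emission_counts[(tag, word)] += 1
            state_counts[tag] += 1
            previous = tag
    transition = {k: c / state_counts[k[0]] for k, c in transition_counts.items()}
    emission = {k: c / state_counts[k[0]] for k, c in emission_counts.items()}
    return transition, emission


sentences = [
    [("the", "DT"), ("dog", "NN"), ("barks", "VBZ")],
    [("the", "DT"), ("cat", "NN")],
]
transition, emission = estimate(sentences)
print(transition[("DT", "NN")])   # count(DT -> NN) / count(DT) = 2 / 2
print(emission[("NN", "dog")])    # count(NN -> dog) / count(NN) = 1 / 2
```
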

1. [x] Learn a model using the training data in the file train
2. [ ] Output the learned model into a model file in JSON format, named `hmm.json`. The model file should contain two dictionaries, for the emission and transition parameters respectively.
    1. [ ] 1st dictionary: Named `transition`, contains items with pairs of (s, s′) as key and t(s′|s) as value.
    2. [ ] 2nd dictionary: Named `emission`, contains items with pairs of (s, x) as key and e(x|s) as value.
3. Question
    1. [ ] How many transition and emission parameters are in your HMM?
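
JSON object keys must be strings, so tuple-keyed dictionaries cannot be passed to `json.dump` directly. The sketch below shows one possible encoding; the `"(s, s')"` key format and the helper name `to_json_model` are assumptions, and the assignment PDF may require a different layout.

```python
import json


def to_json_model(transition: dict, emission: dict) -> str:
    """Serialize tuple-keyed probability dictionaries with string keys."""
    model = {
        "transition": {f"({s}, {s_next})": p for (s, s_next), p in transition.items()},
        "emission": {f"({s}, {x})": p for (s, x), p in emission.items()},
    }
    return json.dumps(model, indent=2)


# Toy values for illustration only
transition = {("dummy", "NNP"): 0.2, ("NNP", "NNP"): 0.4}
emission = {("NNP", "Pierre"): 0.001}

text = to_json_model(transition, emission)
loaded = json.loads(text)
print(len(loaded["transition"]), len(loaded["emission"]))
```

The two lengths printed at the end are the parameter counts the question asks for. Keys built this way are ambiguous when a tag or word is itself a comma, so a different separator may be safer if the file needs to be parsed back into pairs.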


In [15]:
updated_train_df.head(20)

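|   | Index | Word   | POS Tag |
|---|-------|--------|---------|
| 0 | 0.0   |        | dummy   |
| 1 | 1.0   | Pierre | NNP     |
| 2 | 2.0   | Vinken | NNP     |
| 3 | 3.0   | ,      | ,       |
| 4 | 4.0   | 61     | CD      |
| 5 | 5.0   | years  | NNS     |
| 6 | 6.0   | old    | JJ      |
| 7 | 7.0   | ,      | ,       |
| 8 | 8.0   | will   | MD      |
| 9 | 9.0   | join   | VB      |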


In [16]:
def hmm(df):
    """Count emissions (tag, word), transitions (previous tag, tag), and tag occurrences in the training data."""
    transition_states = defaultdict(int)
    emission_state_word = defaultdict(int)
    N_state = defaultdict(int)

    # previous state for transition probabilities (NaN for the first row); note this adds a column to df
    df['Previous_POS Tag'] = df['POS Tag'].shift(1)

    # iterate through every row (word) of the training data
    for index, row in tqdm(df.iterrows(), total=df.shape[0]):

        # emission count + 1
        emission_state_word[(row["POS Tag"], row["Word"])] += 1
        # transition count + 1, skipping the first row, which has no previous state
        if pd.notnull(row['Previous_POS Tag']):
            transition_states[(row["Previous_POS Tag"], row['POS Tag'])] += 1

        # increment the tag count each time the tag is seen
        N_state[(row["POS Tag"])] += 1

    return emission_state_word, transition_states, N_state

In [17]:
emissions, transitions, N_states = hmm(updated_train_df)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 950313/950313 [00:51<00:00, 18578.04it/s]


In [18]:
# N_states

In [19]:
def calculate_t_prob(transitions, N_states):
    """t(s' | s) = count(s -> s') / count(s)"""
    transition_probs = {}
    for key, value in transitions.items():  # key is (s, s'), value is count(s -> s')

        curr_state = key[0]
        # how many times (s -> s') was seen / how many times the current state s was seen
        transition_probs[key] = value / N_states[curr_state]

    return transition_probs

# Calculate emission probabilities

In [20]:
t_probs = calculate_t_prob(transitions, N_states)
# t_probs

In [21]:
list(t_probs.items())[:3]

[(('dummy', 'NNP'), 0.19789104610393007),
 (('NNP', 'NNP'), 0.3782645420509543),
 (('NNP', ','), 0.13846908958086018)]

In [22]:
list(emissions.items())[:3]

[(('dummy', ' '), 1), (('NNP', 'Pierre'), 6), (('NNP', 'Vinken'), 2)]

In [23]:
def calculate_e_prob(emissions, N_states):
    """e(x | s) = count(s -> x) / count(s)"""
    emissions_probs = {}
    for key, value in emissions.items():  # key is (s, x), value is count(s -> x)
        curr_state = key[0]
        # how many times state s emitted word x / how many times state s was seen
        emissions_probs[key] = value / N_states[curr_state]

    return emissions_probs

In [24]:
e_probs = calculate_e_prob(emissions, N_states)
# e_probs

In [25]:
list(e_probs.items())[:3]

[(('dummy', ' '), 2.6165681092678842e-05),
 (('NNP', 'Pierre'), 6.84868961738654e-05),
 (('NNP', 'Vinken'), 2.2828965391288468e-05)]

# 3. Greedy Decoding with HMM

1. [ ] Implement the greedy decoding algorithm
2. [ ] Evaluate it on the development data
3. [ ] Predict the POS Tags of the sentences in the test data
4. [ ] Output the predictions in a file named `greedy.out`, in the same format as the training data
5. [ ] Evaluate the results of the model with `eval.py` in the terminal: `python eval.py -p {predicted file} -g {gold-standard file}`
6. [ ] Question
    1. [ ] What is the accuracy on the dev data? 
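
Steps 4 to 6 above can be sketched as two small helpers. This is a minimal sketch, assuming the output format is index, word, and tag separated by tabs with a blank line between sentences, and assuming accuracy means the fraction of tokens whose predicted tag matches the gold tag (the exact definition used by `eval.py` is not shown here). `format_predictions` and `accuracy` are hypothetical names.

```python
from typing import List, Tuple


def format_predictions(sentences: List[List[Tuple[str, str]]]) -> str:
    """Render sentences of (word, tag) pairs in the training-file layout."""
    blocks = []
    for sentence in sentences:
        lines = [f"{i}\t{word}\t{tag}" for i, (word, tag) in enumerate(sentence, start=1)]
        blocks.append("\n".join(lines))
    # Sentences are separated by one blank line
    return "\n\n".join(blocks) + "\n"


def accuracy(predicted: List[str], gold: List[str]) -> float:
    """Fraction of positions where the predicted tag equals the gold tag."""
    if len(predicted) != len(gold):
        raise ValueError("predicted and gold must have the same length")
    if not gold:
        return 0.0
    return sum(p == g for p, g in zip(predicted, gold)) / len(gold)


sentences = [[("The", "DT"), ("dog", "NN")], [("Hi", "UH")]]
output = format_predictions(sentences)
print(output)
score = accuracy(["DT", "NN", "NN"], ["DT", "NN", "UH"])
print(score)
```
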

In [259]:
# updated_dev_df.head(40)

In [255]:
def greedy_decoding(dev_df: pd.DataFrame, t_probs: dict, e_probs: dict, N_pos_tags: np.ndarray) -> list:
    """Implement greedy decoding on the development file (words only) using the transition probability and emission probability.
    The POS Tags of the development file are never read; only POS Tags from the training data are scored.

    Parameters
    ----------
    dev_df: `pd.DataFrame`
        Dev file

    t_probs: `py dict`
        Transition probabilities, keyed by (previous POS Tag, POS Tag)

    e_probs: `py dict`
        Emission probabilities, keyed by (POS Tag, Word)

    N_pos_tags: `np.ndarray`
        All POS Tags found in the training file

    Return
    ------
    predicted_pos_tags: `py list`
        One predicted POS Tag per row of `dev_df` ("dummy" for sentence-boundary rows)
    """

    previous_pos_tag = "dummy"
    predicted_pos_tags = []

    for index, row in tqdm(dev_df.iterrows(), total=dev_df.shape[0]):
        e_word = row['Word']

        # Assumption: rows whose word is "dummy" (or " " for the first row) are the sentence
        # boundaries inserted by update_df_columns, so decoding restarts from the "dummy" state.
        if e_word in ("dummy", " "):
            previous_pos_tag = "dummy"
            predicted_pos_tags.append("dummy")
            continue

        store_scores = []
        for current_pos_tag in N_pos_tags:
            # Keys follow the dictionaries built by hmm(): (previous tag, tag) and (tag, word)
            t_key = (previous_pos_tag, current_pos_tag)
            e_key = (current_pos_tag, e_word)

            # Not all pairs will be found. If both are found, score = t * e; otherwise the score is 0.0.
            if t_key in t_probs and e_key in e_probs:
                score = t_probs[t_key] * e_probs[e_key]
            else:
                score = 0.0

            store_scores.append(score)

        # use argmax to get the index of the max score, then use that index to find the POS Tag
        # NOTE: if every score is 0.0 (e.g. an unseen word), argmax falls back to index 0
        max_score_idx = np.argmax(np.array(store_scores))
        previous_pos_tag = N_pos_tags[max_score_idx]  # the chosen tag conditions the next word
        predicted_pos_tags.append(previous_pos_tag)

    return predicted_pos_tags

In [256]:
greedy_predictions = greedy_decoding(updated_dev_df, t_probs, e_probs, all_pos_tags)

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 137295/137295 [00:23<00:00, 5852.38it/s]


# 4. Viterbi Decoding with HMM

1. [ ] Implement the Viterbi decoding algorithm
2. [ ] Evaluate it on the development data
3. [ ] Predict the POS Tags of the sentences in the test data
4. [ ] Output the predictions in a file named `viterbi.out`, in the same format of training data
5. [ ] Question
    1. [ ] What is the accuracy on the dev data?
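
Multiplying many small probabilities can underflow to 0.0 on long sentences, so Viterbi is often run on log probabilities, where products become sums. The following is a minimal sketch on a toy HMM with made-up probabilities; `viterbi`, the `"dummy"` start state, and the toy tables are assumptions for illustration and use the same key order as this notebook's dictionaries: (previous tag, tag) and (tag, word).

```python
import math


def viterbi(words, tags, t_probs, e_probs, start="dummy"):
    """Return the highest-scoring tag sequence for `words`, scored in log space."""
    if not words:
        return []

    def log(p):
        return math.log(p) if p > 0 else float("-inf")

    # pi[j][s]: best log score of any tag sequence for words[:j + 1] that ends in s
    pi = [{s: log(t_probs.get((start, s), 0.0)) + log(e_probs.get((s, words[0]), 0.0))
           for s in tags}]
    back = [{}]
    for word in words[1:]:
        scores, pointers = {}, {}
        for s in tags:
            best_prev = max(tags, key=lambda p: pi[-1][p] + log(t_probs.get((p, s), 0.0)))
            scores[s] = (pi[-1][best_prev]
                         + log(t_probs.get((best_prev, s), 0.0))
                         + log(e_probs.get((s, word), 0.0)))
            pointers[s] = best_prev
        pi.append(scores)
        back.append(pointers)

    # Backtrack from the best final tag
    best = max(tags, key=lambda s: pi[-1][s])
    path = [best]
    for pointers in reversed(back[1:]):
        best = pointers[best]
        path.append(best)
    return path[::-1]


tags = ["DT", "NN", "VB"]
t_probs = {("dummy", "DT"): 0.8, ("dummy", "NN"): 0.2, ("DT", "NN"): 0.9, ("DT", "VB"): 0.1,
           ("NN", "VB"): 0.6, ("NN", "NN"): 0.4, ("VB", "NN"): 0.5, ("VB", "DT"): 0.5}
e_probs = {("DT", "the"): 1.0, ("NN", "dog"): 0.6, ("NN", "barks"): 0.1,
           ("VB", "barks"): 0.8, ("VB", "dog"): 0.1}
path = viterbi(["the", "dog", "barks"], tags, t_probs, e_probs)
print(path)
```
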

In [None]:
# updated_dev_df.head(40)

In [263]:
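def viterbi_decoding(dev_df: pd.DataFrame, t_probs: dict, e_probs: dict, N_pos_tags: np.ndarray) -> list:
    """Implement Viterbi decoding on the development file (words only) using the transition probability and emission probability.
    The POS Tags of the development file are only printed for inspection; scoring uses POS Tags from the training data.

    Parameters
    ----------
    dev_df: `pd.DataFrame`
        Dev file

    t_probs: `py dict`
        Transition probabilities, keyed by (previous POS Tag, POS Tag)

    e_probs: `py dict`
        Emission probabilities, keyed by (POS Tag, Word)

    N_pos_tags: `np.ndarray`
        All POS Tags found in the training file

    Return
    ------
    predicted_pos_tags: `py list`
        One predicted POS Tag per row of `dev_df` ("dummy" for sentence-boundary rows)
    """

    # all_pos_tags contains NaN (from blank lines in the raw training file); keep only real tag strings
    pos_tags = [tag for tag in N_pos_tags if isinstance(tag, str)]

    predicted_pos_tags = []
    v_pi = []          # v_pi[j][s]: best score of any tag sequence for the sentence so far that ends in s
    backpointers = []  # backpointers[j][s]: the previous tag on that best sequence

    def backtrack() -> list:
        """Follow the backpointers from the best final tag to recover the current sentence's tags."""
        if not v_pi:
            return []
        best_tag = max(v_pi[-1], key=v_pi[-1].get)
        tags = [best_tag]
        for pointers in reversed(backpointers[1:]):
            best_tag = pointers[best_tag]
            tags.append(best_tag)
        return tags[::-1]

    for index, row in tqdm(dev_df.iterrows(), total=dev_df.shape[0]):
        # debugging output; remove before decoding the full file
        print("index", index, "with word", row['Word'], "and POS tag from dev", row['POS Tag'])
        print()

        e_word = row['Word']

        # Assumption: rows whose word is "dummy" (or " " for the first row) are the sentence
        # boundaries inserted by update_df_columns. Finish the current sentence and start a new one.
        if e_word in ("dummy", " "):
            predicted_pos_tags.extend(backtrack())
            predicted_pos_tags.append("dummy")
            v_pi, backpointers = [], []
            continue

        scores, pointers = {}, {}
        for current_pos_tag in pos_tags:
            e = e_probs.get((current_pos_tag, e_word), 0.0)  # missing pairs score 0.0
            if not v_pi:
                # Base case: pi[1, s] = t(s | dummy) * e(word | s)
                scores[current_pos_tag] = t_probs.get(("dummy", current_pos_tag), 0.0) * e
                pointers[current_pos_tag] = "dummy"
            else:
                # Recursive case: pi[j, s] = max over s_prev of pi[j-1, s_prev] * t(s | s_prev) * e(word | s)
                previous_scores = v_pi[-1]
                best_previous = max(
                    pos_tags,
                    key=lambda s_prev: previous_scores[s_prev] * t_probs.get((s_prev, current_pos_tag), 0.0),
                )
                t = t_probs.get((best_previous, current_pos_tag), 0.0)
                scores[current_pos_tag] = previous_scores[best_previous] * t * e
                pointers[current_pos_tag] = best_previous

        # NOTE: products of probabilities can underflow on long sentences; summing logs avoids this
        v_pi.append(scores)
        backpointers.append(pointers)

    # The data may not end with a boundary row, so flush the last sentence
    predicted_pos_tags.extend(backtrack())

    return predicted_pos_tags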

In [264]:
viterbi_predictions = viterbi_decoding(updated_dev_df[:3], t_probs, e_probs, all_pos_tags)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 4800.81it/s]

index 0 with word   and POS tag from dev dummy

index 1 with word The and POS tag from dev DT

index 2 with word Arizona and POS tag from dev NNP




