## 利用HMM來實現pos演算法


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install conllu

from io import open
from conllu import parse_incr
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import re
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence

data_file = open("/content/drive/My Drive/zh_gsd-ud-train.conllu", "r", encoding="utf-8")

# save dataset
sentences_list = []
pos_list = []
start_token = '--s--'

for tokenlist in parse_incr(data_file):
    temp_str = []
    temp_pos = []
    temp_str.append(start_token)
    temp_pos.append(start_token)
    for s in tokenlist:
        temp_str.append(s['form'])
        temp_pos.append(s['upos'])
    sentences_list.append(temp_str)
    pos_list.append(temp_pos)

Collecting conllu
  Downloading https://files.pythonhosted.org/packages/ae/be/be6959c3ff2dbfdd87de4be0ccdff577835b5d08b1d25bf7fd4aaf0d7add/conllu-4.4-py2.py3-none-any.whl
Installing collected packages: conllu
Successfully installed conllu-4.4


In [None]:
# vocab: 將詞記錄在字典中
vocab = {}
cnt_word = 0

for sentence in sentences_list: 
    for word in sentence:
      if word not in vocab:
        vocab[word] = cnt_word
        cnt_word += 1
    
print("字典:")
cnt = 0
for k,v in vocab.items():
    print(f"{k}:{v}")
    cnt += 1
    if cnt > 20:
        break

Vocabulary dictionary, key is the word, value is a unique integer
--s--:0
看似:1
簡單:2
，:3
只:4
是:5
二:6
選:7
一:8
做:9
決擇:10
但:11
其實:12
他們:13
代表:14
的:15
你:16
周遭:17
親朋:18
好友:19
試:20


#### Transition counts
- The first dictionary is the `transition_counts` dictionary which computes the number of times each tag happened next to another tag. 

This dictionary will be used to compute: 
$$P(t_i |t_{i-1}) \tag{1}$$

This is the probability of a tag at position $i$ given the tag at position $i-1$.

In order for you to compute equation 1, you will create a `transition_counts` dictionary where 
- The keys are `(prev_tag, tag)`
- The values are the number of times those two tags appeared in that order. 

#### Emission counts

The second dictionary you will compute is the `emission_counts` dictionary. This dictionary will be used to compute:

$$P(w_i|t_i)\tag{2}$$

In other words, you will use it to compute the probability of a word given its tag. 

In order for you to compute equation 2, you will create an `emission_counts` dictionary where 
- The keys are `(tag, word)` 
- The values are the number of times that pair showed up in your training set. 

#### Tag counts

The last dictionary you will compute is the `tag_counts` dictionary. 
- The key is the tag 
- The value is the number of times each tag appeared.

In [None]:
import pandas as pd
from collections import defaultdict
import math
import numpy as np

def create_dictionaries(sentences_list, pos_list):
    
    # initialize the dictionaries using defaultdict
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    sentence_len = len(sentences_list)
    
    # use 'i' to track the line number in the corpus
    i = 0 
    
    # Each item in the training corpus contains a word and its POS tag
    # Go through each word and its tag in the training corpus
    for sentence_idx in range(sentence_len):
      prev_tag = pos_list[sentence_idx][0]
      tag_counts[prev_tag] += 1
      for word_tag_idx in range(1, len(sentences_list[sentence_idx])):
        
        # Increment the word_tag count
        i += 1
        
        # Every 5,000 words, print the word count
        if i % 5000 == 0:
            print(f"word count = {i}")

        # get the word and tag using the get_word_tag helper function (imported from utils_pos.py)
        word, tag = sentences_list[sentence_idx][word_tag_idx], pos_list[sentence_idx][word_tag_idx]
        
        # Increment the transition count for the previous word and tag
        transition_counts[(prev_tag, tag)] += 1
        
        # Increment the emission count for the tag and word
        emission_counts[(tag, word)] += 1

        # Increment the tag count
        tag_counts[tag] += 1

        # Set the previous tag to this tag (for the next iteration of the loop)
        prev_tag = tag
        
    return emission_counts, transition_counts, tag_counts

In [None]:
emission_counts, transition_counts, tag_counts = create_dictionaries(sentences_list, pos_list)

word count = 5000
word count = 10000
word count = 15000
word count = 20000
word count = 25000
word count = 30000
word count = 35000
word count = 40000
word count = 45000
word count = 50000
word count = 55000
word count = 60000
word count = 65000
word count = 70000
word count = 75000
word count = 80000
word count = 85000
word count = 90000
word count = 95000


In [None]:
# get all the POS states
states = sorted(tag_counts.keys())
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

Number of POS tags (number of 'states'): 16
View these POS tags (states)
['--s--', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SYM', 'VERB', 'X']


In [None]:
print("transition examples: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

print("emission examples: ")
for ex in list(emission_counts.items())[200:203]:
    print (ex)

transition examples: 
(('--s--', 'AUX'), 8)
(('AUX', 'ADJ'), 217)
(('ADJ', 'PUNCT'), 464)

emission examples: 
(('PUNCT', '：'), 86)
(('PUNCT', '「'), 332)
(('VERB', '吃'), 9)


In [None]:
def create_transition_matrix(alpha, tag_counts, transition_counts):

    # Get a sorted list of unique POS tags
    all_tags = sorted(tag_counts.keys())
    
    # Count the number of unique POS tags
    num_tags = len(all_tags)
    
    # Initialize the transition matrix 'A'
    A = np.zeros((num_tags, num_tags))
    
    # Get the unique transition tuples (previous POS, current POS)
    trans_keys = set(transition_counts.keys())
    
    # Go through each row of the transition matrix A
    for i in range(num_tags):
        
        # Go through each column of the transition matrix A
        for j in range(num_tags):

            # Initialize the count of the (prev POS, current POS) to zero
            count = 0
        
            # Define the tuple (prev POS, current POS)
            # Get the tag at position i and tag at position j (from the all_tags list)
            key = (all_tags[i], all_tags[j])

            # Check if the (prev POS, current POS) tuple 
            # exists in the transition counts dictionary
            if key in transition_counts: #complete this line
                
                # Get count from the transition_counts dictionary 
                # for the (prev POS, current POS) tuple
                count = transition_counts.get(key)
                
            # Get the count of the previous tag (index position i) from tag_counts
            count_prev_tag = tag_counts[all_tags[i]]
            
            # Apply smoothing using count of the tuple, alpha, 
            # count of previous tag, alpha, and total number of tags
            A[i,j] = (count + alpha)/(count_prev_tag + alpha*num_tags)
    
    return A

In [None]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
# Testing your function
print(f"A at row 0, col 0: {A[0,0]:.9f}")
print(f"A at row 3, col 1: {A[3,1]:.4f}")

print("View a subset of transition matrix A")
A_sub = pd.DataFrame(A[1:10,1:10], index=states[1:10], columns = states[1:10] )
print(A_sub)

A at row 0, col 0: 0.000000250
A at row 3, col 1: 0.0709
View a subset of transition matrix A
            ADJ       ADP       ADV  ...      NOUN       NUM      PART
ADJ    0.024929  0.007356  0.016756  ...  0.325295  0.021251  0.346545
ADP    0.024702  0.017424  0.034407  ...  0.234891  0.132995  0.043450
ADV    0.070914  0.088152  0.089243  ...  0.031639  0.026402  0.001528
AUX    0.075269  0.049601  0.037114  ...  0.194588  0.147415  0.014222
CCONJ  0.048376  0.009387  0.015163  ...  0.393498  0.105415  0.016607
DET    0.019213  0.010568  0.011528  ...  0.683948  0.156579  0.024977
NOUN   0.020666  0.051185  0.048786  ...  0.228910  0.046313  0.128829
NUM    0.006539  0.002429  0.001681  ...  0.899119  0.000934  0.014198
PART   0.028583  0.020981  0.025948  ...  0.459760  0.043989  0.062842

[9 rows x 9 columns]


In [None]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    # get the number of POS tag
    num_tags = len(tag_counts)
    
    # Get a list of all POS tags
    all_tags = sorted(tag_counts.keys())
    
    # Get the total number of unique words in the vocabulary
    num_words = len(vocab)
    
    # Initialize the emission matrix B with places for
    # tags in the rows and words in the columns
    B = np.zeros((num_tags, num_words))
    
    # Get a set of all (POS, word) tuples 
    # from the keys of the emission_counts dictionary
    emis_keys = set(list(emission_counts.keys()))

    
    # Go through each row (POS tags)
    for i in range(num_tags): # complete this line
        
        # Go through each column (words)
        for j in range(num_words): # complete this line

            # Initialize the emission count for the (POS tag, word) to zero
            count = 0
                    
            # Define the (POS tag, word) tuple for this row and column
            key = (all_tags[i], vocab[j])

            # check if the (POS tag, word) tuple exists as a key in emission counts
            if key in emission_counts: # complete this line
        
                # Get the count of (POS tag, word) from the emission_counts d
                count = emission_counts.get(key)
                
            # Get the count of the POS tag
            count_tag = tag_counts[all_tags[i]]
                
            # Apply smoothing and store the smoothed value 
            # into the emission matrix B for this row and column
            B[i,j] = (count+alpha)/(num_words*alpha + count_tag)

    return B

In [None]:
# creating your emission probability matrix. this takes a few minutes to run. 
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"View Matrix position at row 0, column 0: {B[0,0]:.9f}")
print(f"View Matrix position at row 3, column 1: {B[3,1]:.9f}")

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['其實','決擇','出身', '10']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['ADV','NOUN','VERB', 'NUM','PUNCT']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
B_sub = pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)

View Matrix position at row 0, column 0: 0.000000249
View Matrix position at row 3, column 1: 0.000000217
                 其實            決擇            出身            10
ADV    1.956478e-03  2.173623e-07  2.173623e-07  2.173623e-07
NOUN   3.687912e-08  3.691600e-05  3.687912e-08  3.687912e-08
VERB   6.726482e-08  6.726482e-08  7.399802e-04  6.726482e-08
NUM    1.861985e-07  1.861985e-07  1.861985e-07  1.303408e-02
PUNCT  7.336427e-08  7.336427e-08  7.336427e-08  7.336427e-08


In [None]:
def initialize(states, tag_counts, A, B, corpus, vocab):

    # Get the total number of unique POS tags
    num_tags = len(tag_counts)
    
    # Initialize best_probs matrix 
    # POS tags in the rows, number of words in the corpus as the columns
    best_probs = np.zeros((num_tags, len(corpus)))
    
    # Initialize best_paths matrix
    # POS tags in the rows, number of words in the corpus as columns
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    # Define the start token
    s_idx = states.index("--s--")
    
    # Go through each of the POS tags
    for i in range(num_tags): # complete this line
        
        # Handle the special case when the transition from start token to POS tag i is zero
        if A[s_idx, i] == 0: # complete this line
            
            # Initialize best_probs at POS tag 'i', column 0, to negative infinity
            best_probs[i,0] = float('-inf')
        
        # For all other cases when transition from start token to POS tag i is non-zero:
        else:
            
            # Initialize best_probs at POS tag 'i', column 0
            # Check the formula in the instructions above
            best_probs[i,0] = math.log(A[s_idx][i]) + math.log(B[i][vocab[corpus[0]]])

    return best_probs, best_paths

In [None]:
best_probs, best_paths = initialize(states, tag_counts, A, B, sentences_list[0], vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0,0]:.4f}")
print(f"best_paths[2,3]: {best_paths[2,3]:.4f}")

best_probs[0,0]: -30.4065
best_paths[2,3]: 0.0000


In [None]:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    # Get the number of unique POS tags (which is the num of rows in best_probs)
    num_tags = best_probs.shape[0]
    
    # Go through every word in the corpus starting from word 1
    # Recall that word 0 was initialized in `initialize()`
    for i in range(1, len(test_corpus)): 
        
        # Print number of words processed, every 5000 words
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))
            
        ### START CODE HERE (Replace instances of 'None' with your code EXCEPT the first 'best_path_i = None') ###
        # For each unique POS tag that the current word can be
        for j in range(num_tags): # complete this line
            
            # Initialize best_prob for word i to negative infinity
            best_prob_i = float('-inf')
            
            # Initialize best_path for current word i to None
            best_path_i = None

            # For each POS tag that the previous word can be:
            for k in range(num_tags): # complete this line
            
                # Calculate the probability = 
                # best probs of POS tag k, previous word i-1 + 
                # log(prob of transition from POS k to POS j) + 
                # log(prob that emission of POS j is word i)
                prob = best_probs[k][i-1] + math.log(A[k][j]) + math.log(B[j][vocab[test_corpus[i]]]) 

                # check if this path's probability is greater than
                # the best probability up to and before this point
                if best_prob_i < prob: # complete this line
                    
                    # Keep track of the best probability
                    best_prob_i = prob
                    
                    # keep track of the POS tag of the previous word
                    # that is part of the best path.  
                    # Save the index (integer) associated with 
                    # that previous word's POS tag
                    best_path_i = k

            # Save the best probability for the 
            # given current word's POS tag
            # and the position of the current word inside the corpus
            best_probs[j,i] = best_prob_i
            
            # Save the unique integer ID of the previous POS tag
            # into best_paths matrix, for the POS tag of the current word
            # and the position of the current word inside the corpus.
            best_paths[j,i] = best_path_i

        ### END CODE HERE ###
    return best_probs, best_paths

In [None]:
# this will take a few minutes to run => processes ~ 30,000 words
best_probs, best_paths = viterbi_forward(A, B, sentences_list[0], best_probs, best_paths, vocab)

In [None]:
# Test this function 
print(f"best_probs[0,1]: {best_probs[0,1]:.4f}") 
print(f"best_probs[0,4]: {best_probs[0,4]:.4f}") 

best_probs[0,1]: -45.7716
best_probs[0,4]: -69.5671


In [None]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    '''
    This function returns the best path.
    
    '''
    # Get the number of words in the corpus
    # which is also the number of columns in best_probs, best_paths
    m = best_paths.shape[1] 
    
    # Initialize array z, same length as the corpus
    z = [None] * m
    
    # Get the number of unique POS tags
    num_tags = best_probs.shape[0]
    
    # Initialize the best probability for the last word
    best_prob_for_last_word = float('-inf')
    
    # Initialize pred array, same length as corpus
    pred = [None] * m
    
    ### START CODE HERE (Replace instances of 'None' with your code) ###
    ## Step 1 ##
    
    # Go through each POS tag for the last word (last column of best_probs)
    # in order to find the row (POS tag integer ID) 
    # with highest probability for the last word
    for k in range(num_tags): # complete this line

        # If the probability of POS tag at row k 
        # is better than the previously best probability for the last word:
        if best_probs[k][m-1] > best_prob_for_last_word: # complete this line
            
            # Store the new best probability for the last word
            best_prob_for_last_word = best_probs[k][m-1]
    
            # Store the unique integer ID of the POS tag
            # which is also the row number in best_probs
            z[m - 1] = k
            
    # Convert the last word's predicted POS tag
    # from its unique integer ID into the string representation
    # using the 'states' list
    # store this in the 'pred' array for the last word
    pred[m - 1] = states[z[m - 1]]
    
    ## Step 2 ##
    # Find the best POS tags by walking backward through the best_paths
    # From the last word in the corpus to the 0th word in the corpus
    for i in range(m - 1, -1, -1): # complete this line
        
        # Retrieve the unique integer ID of
        # the POS tag for the word at position 'i' in the corpus
        pos_tag_for_word_i = z[i]
        
        # In best_paths, go to the row representing the POS tag of word i
        # and the column representing the word's position in the corpus
        # to retrieve the predicted POS for the word at position i-1 in the corpus
        z[i - 1] = best_paths[pos_tag_for_word_i][i]
        
        # Get the previous word's POS tag in string form
        # Use the 'states' list, 
        # where the key is the unique integer ID of the POS tag,
        # and the value is the string representation of that POS tag
        pred[i - 1] = states[z[i-1]]
        
     ### END CODE HERE ###
    return pred

In [None]:
# Run and test your function
pred = viterbi_backward(best_probs, best_paths, sentences_list[0], states)
m=len(pred)
print('The prediction for pred[-7:m-1] is: \n', sentences_list[0][-7:m-1], "\n", pred[-7:m-1], "\n")
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", sentences_list[0][0:7])
print('pos: ', pos_list[0][0:7])

The prediction for pred[-7:m-1] is: 
 ['，', '最後', '決定', '的', '還是', '自己'] 
 ['PUNCT', 'NOUN', 'VERB', 'PART', 'CCONJ', 'PRON'] 

The prediction for pred[0:8] is: 
 ['PRON', 'AUX', 'ADJ', 'PUNCT', 'ADV', 'AUX', 'NUM'] 
 ['--s--', '看似', '簡單', '，', '只', '是', '二']
pos:  ['--s--', 'AUX', 'ADJ', 'PUNCT', 'ADV', 'VERB', 'NUM']
