<a href="https://colab.research.google.com/github/ethanwcooper/gRPCAuthWithAzureAD/blob/master/EWC180001_POS_Tagger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
# install necessary packages using pip
!pip install keras numpy wget

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [34]:
!pwd

/content


In [35]:
import os
import sys
import numpy as np

def load_corpus(path):
    """ Load corpus from a folder / directory

    Arg:
        path: a text sequence denotes the path of corpus

    Return:
        sentences: a list of sentences that are preprocessed in the corpus
    """
    # Check if the path is a directory.
    if not os.path.isdir(path):
        sys.exit("Input path is not a directory")
    else:
        sentences = []
        # print(os.listdir(path))
        for file in os.listdir(path):
            if not os.path.isdir(file):
                with open(os.path.join(path, file), 'r') as f:      
                    lines = f.readlines()
                for i in range(len(lines)):
                    lines[i] = lines[i].strip()
                    word_tags = lines[i].split()
                    sentence = []
                    for j in range(len(word_tags)):
                        # word_tag_tuple = tuple(word_tags[j].split('/'))
                        word_tag_tuple = word_tags[j].split('/')
                        word_tag_tuple[0] = word_tag_tuple[0].lower()
                        word_tag_tuple = tuple(word_tag_tuple)
                        sentence.append(word_tag_tuple)
                    if sentence:  # VERIFY THAT LEAVING OUT BLANK LINES IS OK
                        sentences.append(sentence)
        return sentences


In [36]:
!pwd

/content


In [37]:
import os
import numpy as np

def load_corpus(path):
    """ Load corpus from a folder / directory

    Arg:
        path: a text sequence denotes the path of corpus

    Return:
        sentences: a list of sentences that are preprocessed in the corpus
    """
    # Check if the path is a directory.
    if not os.path.isdir(path):
        sys.exit("Input path is not a directory")
    else:
        sentences = []
        # print(os.listdir(path))
        for file in os.listdir(path):
            if not os.path.isdir(file):
                with open(os.path.join(path, file), 'r') as f:      
                    lines = f.readlines()
                for i in range(len(lines)):
                    lines[i] = lines[i].strip()
                    word_tags = lines[i].split()
                    sentence = []
                    for j in range(len(word_tags)):
                        word_tag_tuple = word_tags[j].split('/')
                        word_tag_tuple[0] = word_tag_tuple[0].lower()
                        word_tag_tuple = tuple(word_tag_tuple)
                        sentence.append(word_tag_tuple)
                    if sentence:  # VERIFY THAT LEAVING OUT BLANK LINES IS OK
                        sentences.append(sentence)
        return sentences

class HMMTagger:
    def __init__(self):
        self.tags = ['DETERMINER', 'NOUN', 'ADJECTIVE', 'VERB', 'PREPOSITION', 'PUNCT', 'ADVERB', 'NUMBER', 'PRONOUN', 'CONJUNCTION', 'X']
        # self.tags = ['NOUN', 'PUNCT', 'VERB', 'X']

        self.init_tag_count = {}
        self.num_sentences = 0

        self.trans_count = {}
        self.num_trans = {}
        self.emission_count = {}
        self.vocab = set()
        self.word_count = {}
        self.tag_count = {}

    def get_vocab(self, sentences):
        vocab = set()
        for sentence in sentences:
            for pair in sentence:
                word = pair[0]
                vocab.add(word)
        return vocab

    def initialize_probabilities(self, sentences):
        """ Initialize / learn probabilities from the corpus

        In this function, you should learn inital probability, transition
        probability, and emission probability. Also, you should apply the
        add-one smoothing properly here.

        Arg:
            sentences: a list of sentences that are preprocessed in the corpus
        """

        # Initialize tag probabilities
        self.init_tag_count = {}
        self.num_sentences = 0
        for sentence in sentences:
            start_tag = sentence[0][1]
            self.init_tag_count[start_tag] = self.init_tag_count.get(start_tag, 0) + 1
            self.num_sentences += 1
        # Initialize transistion probabilities
        self.trans_count = {}
        self.num_trans = {}
        for sentence in sentences:
            for i in range(1, len(sentence)):
                t_curr = sentence[i][1]
                t_prev = sentence[i-1][1]
                i_to_j = (t_prev, t_curr)
                self.trans_count[i_to_j] = self.trans_count.get(i_to_j, 0) + 1
                self.num_trans[t_prev] = self.num_trans.get(t_prev, 0) + 1
        # Initialize emission probabilities
        self.emission_count = {}
        self.vocab = self.get_vocab(sentences)
        self.word_count = {}
        self.tag_count = {}
        for sentence in sentences:
            for pair in sentence:
                word = pair[0]
                tag = pair[1]
                t_to_w = (tag, word)
                self.emission_count[t_to_w] = self.emission_count.get(t_to_w, 0) + 1
                self.tag_count[tag] = self.tag_count.get(tag, 0) + 1
                self.word_count[word] = self.word_count.get(word, 0) + 1
    
    def get_init_tag_prob(self, tag):
        return (self.init_tag_count.get(tag, 0) + 1) / (self.num_sentences + len(self.tags))
    
    def get_trans_prob(self, i, j):
        i_to_j = (i, j)
        return (self.trans_count.get(i_to_j, 0) + 1) / (self.num_trans.get(i, 0) + len(self.tags))

    def get_emission_prob(self, tag, word):
        t_to_w = (tag, word)
        return (self.emission_count.get(t_to_w, 0) + 1) / (self.tag_count.get(tag, 0) + len(self.vocab))

    def viterbi_decode(self, sentence):
        """ Viterbi decoding algorithm implementation

        Arg:
            sentence: a text sequence needed to be decoded
        """
        viterbi = np.empty([len(self.tags), len(sentence)], dtype=float) # key = (tag, observation aka word in sentence)
        backpointer = np.empty([len(self.tags), len(sentence)], dtype=int)
        N = len(self.tags)
        T = len(sentence)
        for i in range(len(self.tags)):
            tag = self.tags[i]
            # print(tag)
            # print(sentence[0])
            # print(self.get_emission_prob(tag, sentence[0]))
            viterbi[i, 0] = self.get_init_tag_prob(tag) * self.get_emission_prob(tag, sentence[0])
            backpointer[i, 0] = -1 # Should this be a 0
        for t in range(1, T):
            word = sentence[t]
            for s in range(N):
                tag = self.tags[s]
                max = float("-inf")
                max_idx = -999
                for s_prime in range(N):
                    tag_prime = self.tags[s_prime]
                    curr = viterbi[s_prime, t-1] * self.get_trans_prob(tag_prime, tag) * self.get_emission_prob(tag, word)
                    if curr > max:
                        max = curr
                        max_idx = s_prime
                viterbi[s, t] = max
                backpointer[s, t] = max_idx
        max = float("-inf")
        max_idx = -999
        for s in range(N):
            tag = self.tags[s]
            curr = viterbi[s, T-1] * self.get_trans_prob(tag, self.tags[N-1])
            if curr > max:
                max = curr
                max_idx = s
        bestpathprob = max
        bestpathpointer = max_idx
        bestpath = self.trace(bestpathpointer, backpointer)
        return bestpath, bestpathprob
    
    def trace(self, bestpathpointer, backpointer):
        states = []
        s = bestpathpointer
        # print('len(backpointer)', len(backpointer))
        for t in range(len(backpointer[0]) - 1, -1, -1):
            # print('s:', self.tags[s], 't:', t)
            states.insert(0, self.tags[s])
            s = backpointer[s, t]
        return states


sentences = load_corpus('./brown/')
tagger = HMMTagger()
tagger.initialize_probabilities(sentences)
print(tagger.viterbi_decode(['the', 'planet', 'jupiter', 'and', 'its', 'moons', 'are', 'in', 'effect', 'a', 'mini', 'solar', 'system', '.']))
print(tagger.viterbi_decode(['computers', 'process', 'programs', 'accurately', '.']))


(['DETERMINER', 'NOUN', 'NOUN', 'CONJUNCTION', 'PRONOUN', 'VERB', 'VERB', 'PREPOSITION', 'NOUN', 'DETERMINER', 'ADJECTIVE', 'ADJECTIVE', 'NOUN', 'PUNCT'], 2.8465742362225926e-49)
(['DETERMINER', 'NOUN', 'NOUN', 'ADVERB', 'PUNCT'], 8.417180570032029e-23)


In [38]:
import numpy as np # you may need this to convert lists to np arrays before returning them



# Creates the dataset with train_X (words) and train_y (tag).
def create_dataset(sentences):
    # Defines the relevant lists.
    train_words, train_tags = list(), list()
    words = set([])
    tags = set([])
    for sentence in sentences:
        sentence_X = []
        sentence_y = []
        for word_tag in sentence:
            sentence_X.append(word_tag[0])
            words.add(word_tag[0])
            sentence_y.append(word_tag[1])
            tags.add(word_tag[1])
        train_words.append(np.array(sentence_X))
        train_tags.append(np.array(sentence_y))
        
 
    # map each word to a unique integer
    word_map = {word: idx + 2 for idx, word in enumerate(list(words))}    
    word_map['-PAD-'] = 0
    word_map['-OOV-'] = 1   

    # Map each tag to a unique integer
    tag_map = {tag: idx + 1 for idx, tag in enumerate(list(tags))}
    tag_map['-PAD-'] = 0

    train_X, train_y = list(), list()
    # convert train_words to integers
    for sentence in train_words:
        sentence_ints = []
        for word in sentence:
            sentence_ints.append(word_map[word])
        train_X.append(np.asarray(sentence_ints))
    for sentence in train_tags:
        sentence_ints = []
        for word in sentence:
            sentence_ints.append(tag_map[word])
        train_y.append(np.asarray(sentence_ints))
    
    return word_map, tag_map, np.asarray(train_X), np.asarray(train_y)

    # print(train_X)
    # print(train_y)

    # TODO: Your code goes here


# Test the function here
# Call create_dataset()
# print(create_dataset([[('YOU', 'NOUN'), ('ARE','VERB'), ('.', 'PUNCT')], [('I', 'NOUN'), ('ARE','VERB'), ('!', 'PUNCT')], [('BYE', 'X'), ('.', 'PUNCT')]]))


In [39]:
from tensorflow.python import train
# from keras.preprocessing.sequence import pad_sequences as pad
# from keras.utils import pad_sequences
from keras.utils.data_utils import pad_sequences as pad

# Pad the sequences with 0s to the max length.
def pad_sequences(train_X, train_y):
    # Use MAX_LENGTH to record length of longest sequence 
    # TODO: Your code goes here
    MAX_LEN = len(max(train_X, key=len))
    train_X = pad(train_X, maxlen=MAX_LEN, padding='post')
    train_y = pad(train_y, maxlen=MAX_LEN, padding='post')
    
    return train_X, train_y, MAX_LEN

# Test the function
# train_X = np.asarray([np.asarray(['3', '4']), np.asarray(['5', '6'])])
# train_y = np.asarray([np.asarray(['2', '3']), np.asarray(['4', '3'])])
# train_X, train_y, MAX_LENGTH = pad_sequences(train_X, train_y)
# print(train_X)
# print(train_y)
# print(MAX_LENGTH)

In [40]:
from keras.models import Sequential
from keras.layers import InputLayer, Activation
from keras.layers import Dense, LSTM, InputLayer, Bidirectional, TimeDistributed, Embedding
from keras.optimizers import Adam

# Define the Keras model.
def define_model(word_map, tag_map, MAX_LENGTH):  
    model = Sequential()
    # Add layer to receive input
    model.add(InputLayer(input_shape=(MAX_LENGTH, )))
    # Compute word vector model for the words
    model.add(Embedding(len(word_map), 128))
    # Add LSTM layer. Bidirectional inputs the next values in the sequence
    model.add(Bidirectional(LSTM(256, return_sequences=True)))
    # Add a fully connected layer to pick the POS tag
    model.add(TimeDistributed(Dense(len(tag_map))))
    # Using softmax because this is a multi-class setting
    model.add(Activation('softmax'))
    
    model.compile(loss='categorical_crossentropy',
                optimizer=Adam(0.001),
                metrics=['accuracy'])
 
    print(model.summary())
    return model

# Call the function here
# model = define_model(MAX_LENGTH)

In [41]:
# Returns the one-hot encoding of the sequence.
def to_categorical(train_y, num_tags):
    #TODO: Write code here
    one_hot_y = []
    for sent_idx in range(len(train_y)):
        sentence = train_y[sent_idx]
        one_hot_sentence = []
        for tag_idx in range(len(sentence)):
            tag = sentence[tag_idx]
            # Get a list of zeros of len(num_tags)
            zeros = np.zeros(num_tags)
            one_hot_sentence.append(zeros)
            # Change one of the zeros to a 1, so that it is a one-hot
            one_hot_sentence[-1][tag] = 1.0
        one_hot_y.append(one_hot_sentence)
    return np.array(one_hot_y)

# Call the function as to_categorical(train_y, categories = len(tag2idx))

In [42]:
import tensorflow as tf

# Trains the model.
def train(model, train_X, train_y):

    # Fit the data into the Keras model, through 40 passes (epochs) using model.fit()
    model.fit(train_X, to_categorical(train_y, len(tag_map)), batch_size=128, epochs=40, validation_split=.2)
    # Return the model.
    return model

# call function here
# model = train(model, train_X, train_y)

In [43]:
def from_categorical(seqs, idx):
    tag_seqs = []
    # Get each sequence of categorical tags
    for cat_seq_idx in range(len(seqs)):
        cat_seq = seqs[cat_seq_idx]
        tag_seq = []
        # Get each categorical tag
        for cat_idx in range(len(cat_seq)):
            cat = cat_seq[cat_idx]
            # Convert the categorical tag back to its name
            tag_seq.append(idx[np.argmax(cat)])
        tag_seqs.append(tag_seq)
    return tag_seqs

In [44]:
from keras.utils.data_utils import pad_sequences as pad

# Test a sentence using the given model.
def test(model, word_map, tag_map, MAX_LEN, sentences):
    # TODO: Write your code here
    formatted_sents = []
    for sentence in sentences:
        formatted_sent = []
        for word in sentence:
            if word.lower() in word_map:
                formatted_sent.append(word_map[word.lower()])
            else:
                formatted_sent.append(word_map['-OOV-'])
        formatted_sents.append(formatted_sent)
    formatted_sents = pad(formatted_sents, maxlen=MAX_LEN, padding='post')

    preds = model.predict(formatted_sents)
    pred_tags = from_categorical(preds, {i: t for t, i in tag_map.items()})
    no_pads = []
    for sentence in pred_tags:
        sent_no_pads = []
        for tag in sentence:
            if not tag == '-PAD-':
                sent_no_pads.append(tag)
        no_pads.append(sent_no_pads)
    for sent_idx in range(len(no_pads)):
        print()
        print(sentences[sent_idx])
        print(no_pads[sent_idx])
    

sentences = load_corpus('./brown/')
word_map, tag_map, train_X, train_y = create_dataset(sentences)
train_X, train_y, MAX_LEN = pad_sequences(train_X, train_y)
model = define_model(word_map, tag_map, MAX_LEN)
model = train(model, train_X, train_y)

# For the first evaluation sentence.
# testString1 = ["the", "secretariat", "is", "expected", "to", "race", "tomorrow", "."]
# call test() to print tags

# For the second evaluation sentence.
# testString2 = "people continue to enquire the reason for the race for outer space ."
# call test() to print tags
test_sents = [['the', 'planet', 'jupiter', 'and', 'its', 'moons', 'are', 'in', 'effect', 'a', 'mini', 'solar', 'system', '.'], ['computers', 'process', 'programs', 'accurately', '.']]
test(model, word_map, tag_map, MAX_LEN, test_sents)


  return word_map, tag_map, np.asarray(train_X), np.asarray(train_y)


Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_3 (Embedding)     (None, 180, 128)          6367104   
                                                                 
 bidirectional_3 (Bidirectio  (None, 180, 512)         788480    
 nal)                                                            
                                                                 
 time_distributed_3 (TimeDis  (None, 180, 12)          6156      
 tributed)                                                       
                                                                 
 activation_3 (Activation)   (None, 180, 12)           0         
                                                                 
Total params: 7,161,740
Trainable params: 7,161,740
Non-trainable params: 0
_________________________________________________________________
None
Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 