In [17]:
import numpy as np
import random
import time

In [81]:
# pos: part of speech
class Viterbi_PennTree:
    def __init__(self, filenames=None, train_ratio=0.8):
        self.train_ratio=train_ratio
        self.pos2idx={'BOS': 0}
        self.idx2pos=['BOS']
        self.word2idx={'UNK': 0, 'NUM': 1}
        self.idx2word=['UNK', 'NUM']
        self.wordCount={}
        self.prob_pos2pos=None
        self.prob_pos2word=None
        self.train_data=[]
        self.test_data=[]
        random.shuffle(filenames)
        self.initTable(filenames)
        self.num_word=len(self.idx2word)
        self.num_pos=len(self.idx2pos)
        
        
    def getSentences(self, filename):
        with open(filename, 'r') as file:
            lines=[l for l in file.readlines()]
        sentence=""
        sentences=[]
        chkNewSentence=True
        for i in range(len(lines)):
            if i+1<len(lines) and (('===' in lines[i]) or (lines[i]=='\n' and lines[i+1]=='\n')):
                if len(sentence)>1:
                    sentences.append(sentence)
                    sentence=""
            elif lines[i]=='\n':
                continue
            else:
                tmpstr=lines[i]
                if tmpstr[0]=='[':
                    tmpstr=tmpstr[2:] # rm space too
                if tmpstr[-1]=='\n':
                    tmpstr=tmpstr[:-1]
                if tmpstr[-1]==']':
                    tmpstr=tmpstr[:-2] # rm space too
                sentence=sentence+str(tmpstr)+' '
        if len(sentence)>1:
                    sentences.append(sentence)
                    sentence=""
        return sentences
    
    def initTable(self, filenames):
        num_train_file=int(self.train_ratio*len(filenames))
        for filename in filenames:
            sentences=self.getSentences(filename)
            for sentence in sentences:
                poses=self.extractPos(sentence)
                if num_train_file<=0:
                    self.test_data.append(poses)
                    continue
                else:
                    self.train_data.append(poses)
                for word, pos in poses:
                    if not word in self.wordCount:
                        self.wordCount[word]=1
                    else:
                        self.wordCount[word]+=1
                    if not pos in self.pos2idx:
                        self.pos2idx[pos]=len(self.idx2pos)
                        self.idx2pos.append(pos)
            num_train_file-=1
            
        for word in self.wordCount:
            count = self.wordCount[word]
            if count==1:
                continue
            self.word2idx[word]=len(self.idx2word)
            self.idx2word.append(word)
        self.prob_pos2pos=np.zeros([len(self.idx2pos), len(self.idx2pos)])
        self.prob_pos2word=np.zeros([len(self.idx2word), len(self.idx2word)])
                
    def extractPos(self, sentence):
        poses=[]
        words=sentence.split(' ')
        words=[w for w in words if not w=='']
        for word in words:
            if not '/' in word:
                continue
            i=-1
            while word[i]!='/':
                i-=1
            pos=word[i+1:]
            w=word[:i].lower()
            w=self.convertIfFloat(w)
            poses.append((w,pos))
        return poses
            
    def convertIfFloat(self,w):
        try:
            tmp=float(w)
            w='NUM'
        except:
            assert(1==1)
        return w
    
    def convertIfUNK(self, word):
        if not word in self.idx2word:
            w='UNK'
        else:
            w=word
        return w
    
    def train(self):
        print(f"Training with {str(len(self.train_data))} sentences.")
        start_time=time.time()
        for poses in self.train_data:
            prev_pos='BOS'
            for word, cur_pos in poses:
                w=self.convertIfUNK(word)
                self.prob_pos2pos[self.pos2idx[prev_pos]][self.pos2idx[cur_pos]]+=1
                self.prob_pos2word[self.pos2idx[cur_pos]][self.word2idx[w]]+=1
                prev_pos=cur_pos

        
        a=self.prob_pos2pos
        self.prob_pos2pos=a/(np.sum(a, axis=1).reshape((a.shape[0],1))+1e-15)
        a=self.prob_pos2word
        self.prob_pos2word=a/(np.sum(a, axis=1).reshape((a.shape[0],1))+1e-15)
        elapsed_time=time.time()-start_time
        print(f"Elapsed time {str(elapsed_time)}s")
        
    def test(self):
        print(f"Testing with {str(len(self.test_data))} sentences.")
        num_correct=0
        num_pos_correct=0
        num_pos=0
        avg_correct_perc=0
        for poses in self.test_data:
            sentence=[]
            target=[]
            for word, pos in poses:
                sentence.append(word)
                target.append(pos)
                num_pos+=1
            pred, _ =self.predict(sentence)
            if ''.join(pred)==''.join(target):
                num_correct+=1
            assert(len(pred)==len(target))
            correct_cnt=0
            for i in range(len(pred)):
                if pred[i]==target[i]:
                    correct_cnt+=1
            num_pos_correct+=correct_cnt
            avg_correct_perc+=correct_cnt/len(pred)
        avg_correct_perc/=len(self.test_data)
        res = {'sentence_acc': num_correct/len(self.test_data), 
               'pos_acc': num_pos_correct/num_pos, 
               'avg_cor_perc': avg_correct_perc}
        print(f"Accuracy by sentence ={res['sentence_acc']}\nAccuracy by word ={res['pos_acc']}")
        print(f"Average correct prediction ratio per sentence ={res['avg_cor_perc']}")
        
        return res
    
    def predict(self, sentence):
        prev_pos='BOS'
        pred=[]
        likelihood=1.0
        for word in sentence:
            mx=-1
            mx_pos=''
            w=self.convertIfUNK(word)
            word_idx=self.word2idx[w]
            for pos in self.pos2idx:
                cur_pos_idx=self.pos2idx[pos]
                a=self.prob_pos2pos[self.pos2idx[prev_pos]][cur_pos_idx]
                b=self.prob_pos2word[cur_pos_idx][word_idx]
                prob=a*b
                if mx<prob:
                    mx=prob
                    mx_pos=pos
            pred.append(mx_pos)
            prev_pos=mx_pos
            likelihood*=mx
        return pred, likelihood
        

In [82]:
random.seed(1234)
filenames=['./treebank/tagged/wsj_'+str(i).zfill(4)+'.pos' for i in range(1,200)]
V = Viterbi_PennTree(filenames=filenames,train_ratio=0.8)

In [83]:
V.train()

Training with 1460 sentences.
Elapsed time 1.721384048461914s


In [84]:
res=V.test()

Testing with 273 sentences.
Accuracy by sentence =0.06593406593406594
Accuracy by word =0.8646827624929815
Average correct prediction ratio per sentence =0.870243114320104


In [85]:
test_data=V.test_data
random.shuffle(test_data)
sample_sentence=test_data[0]
sentence=[w for w,_ in sample_sentence]
print("Example sentence:")
print(''.join([w+' ' for w,_ in sample_sentence]))
print("\nLabeled:")
print(''.join([pos+' ' for _,pos in sample_sentence]))
pred, likelihood = V.predict(sentence)
print("\nPrediction:")
print(''.join([p+' ' for p in pred]))
print(f"\nLikelihood={likelihood}")

Example sentence:
-- dorothy l. sayers , `` the nine tailors '' aslacton , england -- of all scenes that evoke rural england , this is one of the loveliest : an ancient stone church stands amid the fields , the sound of bells cascading from its tower , calling the faithful to evensong . 

Labeled:
: NNP NNP NNP , `` DT CD NNP '' NNP , NNP : IN DT NNS WDT VBP JJ NNP , DT VBZ CD IN DT JJS : DT JJ NN NN VBZ IN DT NNS , DT NN IN NNS VBG IN PRP$ NN , VBG DT NN TO NN . 

Prediction:
: NNP NNP NNP , `` DT CD NN '' NNP , NNP : IN DT NN IN NNP NNP NNP , DT VBZ CD IN DT NN : DT NN NN NN VBZ VBN DT NNS , DT JJ IN NNP NNP IN PRP$ NNP , NNP DT NN TO VB . 

Likelihood=6.500328067697114e-114
