In [None]:
# Mount Google Drive (Colab only) so the /content/drive/... dataset paths used
# by the loading cell further down resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Reference: https://www.mygreatlearning.com/blog/pos-tagging/ (Great Learning tutorial on POS tagging, used as the starting point for this notebook)

In [None]:
# Importing libraries
import nltk
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import pprint, time
import random
from datetime import datetime

In [None]:
class Viterbi_Algorithm:
  """Part-of-speech tagger built from emission and transition count tables.

  Training data is a tab-separated file with one ``word<TAB>tag`` pair per
  line; rows whose word is ``"<S>"`` mark sentence boundaries.

  Decoding is greedy, left to right: each word receives the tag that maximises
  ``emission(word, tag) * transition(previous_tag, tag)`` given the tag already
  chosen for the previous word. Words missing from the emission table are
  scored with the transition term only.

  NOTE(review): despite the class name, no dynamic-programming (Viterbi) search
  over whole tag sequences is performed here - confirm whether that is intended.
  """

  def __init__(self):
      # Vocabulary and tag set seen in training (sorted, sentence marker excluded).
      self.train_unique_words = []
      self.train_unique_tags = []
      # word x tag tables: raw counts and per-tag normalised values.
      self.words_emission_count = pd.DataFrame()
      self.words_emission_prob = pd.DataFrame()
      # previous-tag x tag tables: raw counts and per-column normalised values.
      self.tags_transition_count = pd.DataFrame()
      self.tags_transition_prob = pd.DataFrame()

      # Scratch state for the sentence currently being decoded.
      self.input_pred = pd.DataFrame()
      self.words_bestTags = {}
      self.results = []
      # Accumulated predictions of the last predict_data() call.
      self.result_df = pd.DataFrame()

  # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% LOAD DATA
  def load_data_as_dataFrame(self, filePath):
      """Read a ``word<TAB>tag`` file into a DataFrame with columns ``words`` and ``tags``.

      Blank lines are skipped. A non-blank line without a tab raises ``ValueError``
      naming the offending line. Fields after the second one are ignored.
      """
      words = []
      tags = []
      # Context manager so the handle is closed even if a line is malformed.
      with open(filePath, 'r', encoding='utf-8') as handle:
          for lineNumber, line in enumerate(handle, start=1):
              line = line.rstrip("\n")
              if not line.strip():
                  continue
              pieces = line.split("\t")
              if len(pieces) < 2:
                  raise ValueError(
                      f"{filePath}:{lineNumber}: expected 'word<TAB>tag', got {line!r}")
              words.append(pieces[0])
              tags.append(pieces[1])

      # --------------------------------------------------------------- Preparing DataFrame
      return pd.DataFrame({"words": words, "tags": tags}, columns=["words", "tags"])

  def prepare_words_tags(self, data):
      """Store the unique words and tags of ``data`` (sentence markers excluded).

      Both lists are sorted so that the column order of every derived table, and
      therefore tie-breaking between equally scored tags, is the same on every run.
      Returns ``""``.
      """
      # --------------------------------------------------------------- Preparing Unique Words and Unique Tags
      data_not_S = data[data["words"] != "<S>"]
      self.train_unique_words = sorted(set(data_not_S['words']))
      self.train_unique_tags = sorted(set(data_not_S['tags']))
      return ""

  def load_train_data(self, filePath):
      """Load a tagged file and record its vocabulary and tag set; returns the DataFrame."""
      startTime = datetime.now()
      data = self.load_data_as_dataFrame(filePath)
      _ = self.prepare_words_tags(data)
      endTime = datetime.now()
      print("===== PREPARING TRAINING DATA IS SUCCESSFULLY COMPLETED =====")
      print("Data Preparing Time Taken : ", endTime - startTime)
      return data

  def load_test_data(self, filePath):
      """Load a tagged file without touching the stored vocabulary; returns the DataFrame."""
      startTime = datetime.now()
      data = self.load_data_as_dataFrame(filePath)
      endTime = datetime.now()
      print("===== PREPARING VALIDATE DATA IS SUCCESSFULLY COMPLETED =====")
      print("Data Preparing Time Taken : ", endTime - startTime)
      return data

  def load_validate_data(self, filePath):
      """Alias of :meth:`load_test_data`, kept because notebook cells call it by this name."""
      return self.load_test_data(filePath)

  def load_pred_data(self, filePath):
      """Load an untagged file (one word per line) into a DataFrame with a ``words`` column.

      Only the first tab-separated field of each line is kept, so a tagged file
      can be passed as well. Blank lines are skipped.
      """
      startTime = datetime.now()
      words = []
      with open(filePath, 'r', encoding='utf-8') as handle:
          for line in handle:
              line = line.rstrip("\n")
              if not line.strip():
                  continue
              # Keep the whole word; indexing the raw string would keep only its first character.
              words.append(line.split("\t")[0])

      # --------------------------------------------------------------- Preparing DataFrame
      data = pd.DataFrame({"words": words}, columns=["words"])

      endTime = datetime.now()
      print("===== PREPARING PREDICTION DATA IS SUCCESSFULLY COMPLETED =====")
      print("Data Preparing Time Taken : ", endTime - startTime)
      return data

  # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% TRAINING
  def find_words_emission_CountProbTables(self, data):
      """Build the word x tag count table and its per-tag normalised version.

      Each training-tag column of ``words_emission_prob`` is divided by its own
      sum, i.e. it holds count(word, tag) / count(tag). Returns ``""``.
      """
      # --------------------------------------------------------------- Emission Count Table
      self.words_emission_count = pd.crosstab(data['words'], data['tags'])

      # --------------------------------------------------------------- Emission Probabilities Table
      probabilities = self.words_emission_count.copy()
      for tag in self.train_unique_tags:
          probabilities[tag] = probabilities[tag] / probabilities[tag].sum()
      self.words_emission_prob = probabilities
      return ""

  def find_tags_transition_CountProbTables(self, data):
      """Build the previous-tag x tag count table and its normalised version.

      Side effects on ``data`` (kept from the original implementation, later
      notebook cells slice the same frame): the tag of every ``"<S>"`` row is set
      to ``"<S>"`` and a ``tags_shift_1`` column holding the previous row's tag is
      added. Returns ``""``.

      NOTE(review): each *column* is divided by its column sum, which yields
      count(prev, tag) / count(tag) rather than the usual P(tag | prev) - confirm
      this is the intended normalisation before changing it.
      """
      # --------------------------------------------------------------- Transition Count Table
      data.loc[data['words'] == "<S>", "tags"] = "<S>"
      previousTags = data['tags'].shift(1)
      if len(previousTags) > 0:
          # Positional write: the first row is preceded by a sentence start
          # whatever the frame's index labels are.
          previousTags.iloc[0] = "<S>"
      data['tags_shift_1'] = previousTags

      self.tags_transition_count = pd.crosstab(data['tags_shift_1'], data['tags'])
      if '<S>' in self.tags_transition_count.columns:
          # Row '<E>' receives, per tag, how often that tag was followed by a sentence marker.
          self.tags_transition_count.loc['<E>', :] = self.tags_transition_count['<S>']
      self.tags_transition_count = self.tags_transition_count[self.train_unique_tags]

      # --------------------------------------------------------------- Transition Probabilities Table
      probabilities = self.tags_transition_count.copy()
      for tag in self.train_unique_tags:
          probabilities[tag] = probabilities[tag] / probabilities[tag].sum()
      self.tags_transition_prob = probabilities

      return ""

  def train(self, data):
      """Fit vocabulary, emission and transition tables on ``data``; returns ``""``.

      ``data`` needs ``words`` and ``tags`` columns and is modified as described
      in :meth:`find_tags_transition_CountProbTables`.
      """
      startTime = datetime.now()
      _ = self.prepare_words_tags(data)
      _ = self.find_words_emission_CountProbTables(data)
      _ = self.find_tags_transition_CountProbTables(data)
      endTime = datetime.now()
      print("===== TRAINING IS SUCCESSFULLY COMPLETED =====")
      print("Training Duration Time Taken : ", endTime-startTime)
      return ""

  # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PREDICTIONS

  # --------------------------------------------------------------- Predict possible tag probability for the Word
  def predict_word_tagProbabilities(self, i, word, prevTag):
      """Fill row ``"<i>_<word>"`` of ``input_pred`` with one score per training tag.

      Known words score ``emission(word, tag) * transition(prevTag, tag)``;
      words absent from the emission table score ``transition(prevTag, tag)`` only.
      """
      rowKey = str(i) + '_' + word
      tags = self.train_unique_tags
      scores = self.tags_transition_prob.loc[prevTag, tags].to_numpy(dtype=float)
      # Hash lookup on the table index instead of scanning the vocabulary list;
      # "<S>" is excluded because it is not part of the training vocabulary.
      if word != "<S>" and word in self.words_emission_prob.index:
          scores = scores * self.words_emission_prob.loc[word, tags].to_numpy(dtype=float)
      # One row-wise write instead of one .loc assignment per tag.
      self.input_pred.loc[rowKey, tags] = scores

  # finding best values
  def predict_word_bestTagProbability(self, i, word):
      """Record ``(word, best_tag, best_score)`` for position ``i``; first tag wins ties."""
      scores = self.input_pred.loc[str(i) + '_' + word, :]
      bestTag = scores.idxmax()
      self.words_bestTags[i] = (word, bestTag, scores[bestTag])
      return ""

  def save_predictions(self):
      """Append the current sentence's (word, tag) pairs plus a ``"<S>"`` row to ``result_df``."""
      rows = [(self.words_bestTags[i][0], self.words_bestTags[i][1])
              for i in range(len(self.words_bestTags))]
      rows.append(("<S>", "<S>"))
      batch = pd.DataFrame(rows, columns=["words", "tags"])
      # One concat per sentence instead of growing the frame one row at a time.
      if self.result_df.empty:
          self.result_df = batch
      else:
          self.result_df = pd.concat([self.result_df, batch], ignore_index=True)

  def predict_tags_of_words(self, X):
      """Tag one sentence ``X`` (sequence of words) greedily and return the list of tags."""
      # Creating empty dataframe to store the possible probabilities between words and tags
      rows = [str(i) + '_' + word for i, word in enumerate(X)]
      cols = self.train_unique_tags
      zero_data = np.zeros(shape=(len(rows), len(cols)))
      self.input_pred = pd.DataFrame(zero_data, index=rows, columns=cols)

      # Creating empty dictionary to store best tags (which tag has max probability) of each word
      self.words_bestTags = {}

      # --------------------------------------------------------------- Finding Probabilities between the words and tags
      for i, word in enumerate(X):
          # The first word follows a sentence start; later words follow the tag just chosen.
          prevTag = "<S>" if i == 0 else self.words_bestTags[i - 1][1]
          self.predict_word_tagProbabilities(i, word, prevTag)  # Finding all the possible tags & possible probabilities of the word
          _ = self.predict_word_bestTagProbability(i, word)     # Finding the tag which has the max probability for the word

      return [self.words_bestTags[i][1] for i in range(len(self.words_bestTags))]

  def predict_data(self, data):
      """Tag every sentence in ``data['words']`` and return a ``words``/``tags`` DataFrame.

      Sentences are delimited by ``"<S>"`` rows. The result keeps one ``"<S>"``
      row between sentences and has no leading or trailing marker. ``data`` is
      only read: it is not modified and may carry any index.
      """
      startTime = datetime.now()

      words = list(data["words"])
      # Make sure the last sentence is flushed by a closing marker.
      if not words or words[-1] != "<S>":
          words.append("<S>")

      self.result_df = pd.DataFrame(columns=["words", "tags"])
      sentence = []
      for word in words:
          if word != "<S>":
              sentence.append(word)
          elif len(sentence) > 0:
              _ = self.predict_tags_of_words(sentence)
              sentence = []
              self.save_predictions()

      # Drop the marker written after the final sentence (nothing to drop when no word was tagged).
      if len(self.result_df) > 0:
          self.result_df = self.result_df.iloc[:-1].reset_index(drop=True)

      endTime = datetime.now()
      print("===== PREDICTION IS SUCCESSFULLY COMPLETED =====")
      print("Prediction Duration Time Taken : ", endTime - startTime)
      return self.result_df

  def get_accuracy(self, Y, Y_pred):
      """Print the percentage of positions where ``Y`` and ``Y_pred`` agree.

      A leading and a trailing ``"<S>"`` in ``Y`` are ignored, mirroring the
      layout returned by :meth:`predict_data`. Positions are compared pairwise
      up to the shorter sequence, and the match count is divided by ``len(Y_pred)``.
      Sentence-marker rows inside the sequences are scored like any other row.

      Raises:
          ValueError: if ``Y_pred`` is empty.
      """
      Y = list(Y)
      Y_pred = list(Y_pred)
      if Y and Y[0] == "<S>":
          Y = Y[1:]
      if Y and Y[-1] == "<S>":
          Y = Y[:-1]
      if not Y_pred:
          raise ValueError("get_accuracy needs at least one predicted tag")

      # zip covers every aligned position, including the last one.
      count = sum(1 for expected, predicted in zip(Y, Y_pred) if expected == predicted)
      print(f"Accuracy : {(count / len(Y_pred)) * 100}%")

  def validate_data(self, data):
      """Tag ``data['words']`` and print the accuracy against ``data['tags']``.

      ``data`` is only read; the closing ``"<S>"`` marker and the ``"<S>"`` tags of
      marker rows are applied to local copies of the two columns.
      """
      startTime = datetime.now()

      words = list(data['words'])
      tags = list(data['tags'])
      if not words or words[-1] != "<S>":
          words.append("<S>")
          tags.append("<S>")
      tags = ["<S>" if word == "<S>" else tag for word, tag in zip(words, tags)]

      result_df = self.predict_data(pd.DataFrame({"words": words}))
      self.get_accuracy(tags, result_df['tags'])

      endTime = datetime.now()
      print("===== VALIDATING IS SUCCESSFULLY COMPLETED =====")
      print("Validating Duration Time Taken : ", endTime - startTime)


In [None]:
# Code all together ===================================================================
# Disabled end-to-end run: the whole script is wrapped in a string literal, so
# nothing below executes.
# NOTE(review): the second line loads the test file through load_train_data, which
# also rebuilds the stored vocabulary from that file; load_test_data looks like the
# intended call - confirm before re-enabling.
'''VA = Viterbi_Algorithm()
train_data = VA.load_train_data("/content/drive/MyDrive/NLP/data/train-v2.tsv")
test_data = VA.load_train_data("/content/drive/MyDrive/NLP/data/test-v2.tsv")
_ = VA.train(train_data)

validate_train_data = train_data.loc[1100:1500, ['words', 'tags']].reset_index(drop=True)
VA.validate_data(validate_train_data)

validate_test_data = test_data.loc[1100:1500, ['words', 'tags']].reset_index(drop=True)
VA.validate_data(validate_test_data)'''

**Initiating Class Object**

In [None]:
# Single tagger instance shared by every cell below.
VA = Viterbi_Algorithm()

**Loading Train & Test Data**

In [None]:
# Training file: loading it also records the vocabulary and tag set on VA.
train_data = VA.load_train_data("/content/drive/MyDrive/NLP/data/train-v2.tsv")
# Held-out file: the class defines load_test_data for this (it prints the
# "PREPARING VALIDATE DATA" banner); load_validate_data is not defined on it.
test_data = VA.load_test_data("/content/drive/MyDrive/NLP/data/test-v2.tsv")

===== PREPARING TRAINING DATA IS SUCCESSFULLY COMPLETED =====
Data Preparing Time Taken :  0:00:11.790623
===== PREPARING VALIDATE DATA IS SUCCESSFULLY COMPLETED =====
Data Preparing Time Taken :  0:00:01.117546


**Training the Model**

In [None]:
# Fit the emission and transition tables; train() returns "" so the result is discarded.
_ = VA.train(train_data)

===== TRAINING IS SUCCESSFULLY COMPLETED =====
Training Duration Time Taken :  0:00:09.869574


**Validating Train Data**

In [None]:
# Rows labelled 1100..1500 (inclusive, .loc is label-based) of the training frame, re-indexed from 0.
validate_train_data = train_data.loc[1100:1500, ['words', 'tags']].reset_index(drop=True)

# Tag the slice, then score the predictions against the slice's own tags.
result_df = VA.predict_data(validate_train_data)
VA.get_accuracy(validate_train_data['tags'], result_df['tags'])

Accuracy : 79.80049875311721%


In [None]:
# Same evaluation as the previous cell, done in one call (predict + accuracy).
VA.validate_data(validate_train_data)

Accuracy : 79.80049875311721%


**Validating Test Data**

In [None]:
# Rows labelled 1100..1500 (inclusive) of the test frame, re-indexed from 0, then scored.
validate_test_data = test_data.loc[1100:1500, ['words', 'tags']].reset_index(drop=True)
VA.validate_data(validate_test_data)

Accuracy : 78.80299251870323%


**TODO: Run prediction and accuracy on the full test set (the next cell has no recorded output yet)**

In [None]:
# Tag the complete test frame and score it against its gold tags.
result_df = VA.predict_data(test_data)
VA.get_accuracy(test_data['tags'], result_df['tags'])

In [None]:
# Rows labelled 0..100000 (inclusive) of the test frame, re-indexed from 0, then scored.
validate_test_data = test_data.loc[0:100000, ['words', 'tags']].reset_index(drop=True)
VA.validate_data(validate_test_data)

Accuracy : 79.676%
