In [None]:
# IPython shell escape: run `nvidia-smi` to report the GPU attached to this runtime.
# NOTE(review): presumably fails on a CPU-only runtime where the tool is missing — confirm.
!nvidia-smi # check which GPU we have

### Imports

In [None]:
import pandas as pd
import numpy as np
import nltk
import re
import string
from collections import Counter, defaultdict
from itertools import islice

from keras.preprocessing.text import one_hot
from keras.layers import Embedding, Dense, Dropout, Flatten
from keras import Sequential 
from keras.preprocessing.sequence import pad_sequences

import tensorflow.keras.backend as K
import tensorflow as tf

from tensorflow.keras.callbacks import EarlyStopping

from keras.models import model_from_json
from pathlib import Path

In [None]:
# Colab-only step: mount Google Drive at /content/drive so that files below that
# mount point (such as the corpus folder named by DIRECTORY) can be opened.
# NOTE(review): presumably prompts for interactive authorisation on first run — confirm.
from google.colab import drive
drive.mount('/content/drive')

### Data Processing

In [None]:
# Base folder (on the mounted Google Drive) that is prefixed to every corpus file
# name when the Sentences reader opens a file; keep the trailing slash.
DIRECTORY = "/content/drive/My Drive/NLP A2/" 

class Sentences(object):
    """
    Re-iterable reader that turns every line of a text file into an array of word ids.

    Each line is decoded as UTF-8, lower-cased, stripped of ASCII punctuation and split
    on whitespace. Words that are not in the vocabulary are replaced by "<UNK>", and
    every encoded sentence starts with the marker "<s>".

    Ids are assigned deterministically: 0 is "<s>", 1 is "<UNK>" and the vocabulary
    words follow in sorted order. All ids therefore lie in ``range(len(vocab) + 2)``,
    no two words share an id, and two instances built on the same vocabulary (e.g. the
    train / validation / test readers) produce the same encoding in every session.

    Attributes:
        filename: name of the data file, relative to the data directory.
        directory: folder containing the file, or None to use the module-level DIRECTORY.
        vocab: set of in-vocabulary words.
        hash_to_word: id -> word lookup (filled when iteration starts); unknown ids
            map to "<UNK>".
    """

    START_TOKEN = "<s>"
    UNK_TOKEN = "<UNK>"
    # Translation table that deletes every ASCII punctuation character.
    _STRIP_PUNCTUATION = str.maketrans('', '', string.punctuation)

    def __init__(self, filename, vocab = None, directory = None) -> None:
        """
        Args:
            filename: data file to read.
            vocab: set of known words; when None it is built from the file itself,
                keeping the words that occur more than once.
            directory: folder holding the file; None falls back to DIRECTORY.
        """
        self.filename = filename
        self.directory = directory
        if vocab is None:
            self.vocab = self.unk_handling(1)
        else:
            self.vocab = vocab
        self.hash_to_word = defaultdict(lambda: "<UNK>")

    def _path(self):
        """Return the full path of the data file."""
        # DIRECTORY is looked up lazily so an explicit directory never needs the global.
        base = DIRECTORY if self.directory is None else self.directory
        return Path(base) / self.filename

    def _token_lines(self):
        """Yield the list of normalised tokens for every line of the file."""
        # Text mode with an explicit codec: calling str() on raw bytes would leak the
        # b'...' repr (quotes, backslash escapes) into the tokens. Undecodable bytes
        # are replaced rather than aborting the whole pass over the corpus.
        with open(self._path(), encoding="utf-8", errors="replace") as file:
            for sentence in file:
                yield sentence.lower().translate(self._STRIP_PUNCTUATION).split()

    def _word_index(self):
        """Build the word -> id mapping: special tokens first, then the sorted vocabulary."""
        index = {self.START_TOKEN: 0, self.UNK_TOKEN: 1}
        # Sorting makes the mapping independent of set iteration order.
        for word in sorted(self.vocab):
            index.setdefault(word, len(index))
        return index

    def unk_handling(self, threshold):
        """
        Returns a set of all vocabulary where the frequency of a word is greater than threshold.
        """
        counter = Counter()
        for tokens in self._token_lines():
            counter.update(tokens)
        return {word for word, count in counter.items() if count > threshold}

    def __iter__(self):
        """
        Yield one 1-D numpy array of word ids per line of the file, ready for Keras.

        Each array starts with the "<s>" id; a blank line yields just that id.
        """
        word_to_id = self._word_index()
        self.hash_to_word.update((idx, word) for word, idx in word_to_id.items())
        start_id = word_to_id[self.START_TOKEN]
        unk_id = word_to_id[self.UNK_TOKEN]
        for tokens in self._token_lines():
            encoded_arr = [start_id]
            encoded_arr.extend(word_to_id.get(word, unk_id) for word in tokens)
            yield np.array(encoded_arr)

def subseqs(seq, window_length):
    """
    Return every contiguous window of ``window_length`` items of ``seq`` as a 2-D array.

    Row ``i`` of the result is ``seq[i : i + window_length]``, so the shape is
    ``(len(seq) - window_length + 1, window_length)``. A sequence shorter than the
    window yields an empty ``(0, window_length)`` array instead of raising, so short
    or blank sentences simply contribute no n-grams.

    Args:
        seq: 1-D sequence (numpy array or list).
        window_length: number of items per window.
    """
    seq = np.asarray(seq)
    # Clamp at zero: a negative row count would make numpy raise.
    n_windows = max(len(seq) - window_length + 1, 0)
    # A column of window starts broadcast against a row of in-window offsets gives the
    # position of every element of every window at once.
    positions = np.arange(n_windows)[:, None] + np.arange(window_length)
    return seq[positions]

# The vocabulary is learned from the training split only; the validation and test
# readers reuse it so that all three splits are encoded against the same word set.
train_sentences = Sentences(filename="nchlt_text.nr.train")
val_sentences = Sentences(filename="nchlt_text.nr.valid", vocab=train_sentences.vocab)
test_sentences = Sentences(filename="nchlt_text.nr.test", vocab=train_sentences.vocab)

In [None]:
window_length = 3 # n size for an n-gram model


def _windows_frame(sentences, window_length):
    """
    Collect the n-gram windows of every encoded sentence into one DataFrame.

    Each row is one window of ``window_length`` word ids, in corpus order. A split
    that produces no windows gives an empty frame that still has ``window_length``
    columns, so the column slicing done afterwards keeps working.
    """
    chunks = [subseqs(vec, window_length) for vec in sentences]
    if chunks:
        # One stacked 2-D array instead of a Python list holding a tiny array per row.
        stacked = np.concatenate(chunks)
    else:
        stacked = np.empty((0, window_length), dtype=int)
    return pd.DataFrame(stacked)


train = _windows_frame(train_sentences, window_length)
val = _windows_frame(val_sentences, window_length)
test = _windows_frame(test_sentences, window_length)

In [None]:
# Formatting for Keras

# For every split, the first window_length-1 ids of a window are the context (X)
# and the last id is the word to predict (y).
X_train, y_train = (np.array(train.iloc[:, :window_length - 1]),
                    np.array(train.iloc[:, window_length - 1]))

X_val, y_val = (np.array(val.iloc[:, :window_length - 1]),
                np.array(val.iloc[:, window_length - 1]))

X_test, y_test = (np.array(test.iloc[:, :window_length - 1]),
                  np.array(test.iloc[:, window_length - 1]))

In [None]:
# Quick visual check of the training context matrix: one row per window,
# window_length-1 word ids per row.
X_train

### Neural Network Model

In [None]:
# Number of distinct ids the network has to handle: the training vocabulary plus two
# extra slots, matching the id range (vocabulary size + 2) that Sentences encodes into.
# It sizes both the Embedding input and the softmax output layer.
vocab_size = len(train_sentences.vocab)+2
vocab_size

In [None]:
# Perplexity metric
def perplexity(y_true, y_pred):
    """
    Keras metric: exp of the sparse categorical cross-entropy of the given batch.

    y_true holds integer class ids and y_pred holds predicted probabilities
    (``from_logits=False``, i.e. the softmax output, not raw logits).

    NOTE(review): Keras presumably averages this per-batch value over an epoch, which
    is not the same as exp(mean epoch loss) — confirm before reporting it.
    """
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    return K.exp(cross_entropy(y_true, y_pred))

# Early-stopping callback (handed to model.fit through `callbacks`): watches the
# validation loss and ends training once it stops improving.
# NOTE(review): restore_best_weights is not set, so presumably the weights of the last
# epoch (not the best one) are kept — confirm this is intended.
custom_early_stopping = EarlyStopping(
    monitor='val_loss', 
    patience=2, 
    min_delta=0.0001 # smallest decrease in val_loss that counts as an improvement
)

In [None]:
# Feed-forward n-gram language model: the window_length-1 context ids are embedded,
# the embeddings are concatenated (Flatten) and passed through two ReLU layers with
# dropout, and a softmax over all vocab_size ids predicts the next word.
model = Sequential([
    Embedding(vocab_size, 1000, input_length=window_length-1),
    Flatten(),
    Dense(1024, activation='relu'),
    Dropout(0.2),
    Dense(2048, activation='relu'),
    Dropout(0.5),
    Dense(vocab_size, activation='softmax'),
])

# Targets are integer ids, hence the sparse variant of categorical cross-entropy.
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.02, momentum=0.5),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy', perplexity])

# summary() prints the table itself and returns None, so it must not be wrapped in
# print() (that would add a stray "None" line to the output).
model.summary()

In [None]:
epochs = 50  # upper bound on training epochs; the early-stopping callback may end training sooner
batch_size = 4096  # number of windows per gradient update

In [None]:
# Train on the training windows, scoring the validation windows after every epoch so
# that the early-stopping callback can watch val_loss.
model_history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[custom_early_stopping],
    verbose=1,
)

In [None]:
# Evaluate the model on the held-out test windows.
# evaluate() returns the loss followed by the compiled metrics (accuracy, perplexity);
# the metric's perplexity value is discarded and perplexity is recomputed below as
# exp(test loss).
loss, accuracy, _ = model.evaluate(X_test, y_test, verbose=0)
print(f'Loss: {round(loss,4)}')
print(f'Perplexity: {round(np.exp(loss),4)}')
print(f'Accuracy: {round(accuracy,4)}')