## N-gram model

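This notebook trains two small feed-forward neural language models, a bigram model and a trigram model, on `train.txt` and then scores the sentences in `test.txt` with each of them.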

In [1]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader


import sys, re
import numpy as np
import math


In [3]:


###############################################################################

def preprocess(s):
    """Tokenise a line into a list of tokens that starts with '<BOS>'.

    Every run of characters other than ASCII letters, digits and apostrophes
    is set apart with spaces, so punctuation becomes its own token while
    words such as "i'm" stay whole.  The result is then split on whitespace.

    A blank (or whitespace-only) line yields just ['<BOS>'], and tokens never
    contain whitespace themselves.
    """
    # Raw strings: '\g<1>' in a normal string literal is an invalid escape
    # sequence and triggers a SyntaxWarning on recent Python versions.
    spaced = re.sub(r"([^a-zA-Z0-9']+)", r" \g<1> ", s.strip())
    # split() with no argument collapses any whitespace run and never
    # produces empty strings, unlike split(' ') on an empty input.
    return ['<BOS>'] + spaced.split()

###############################################################################

In [5]:
# Tokenised training sentences, filled in by the loop further down.
training_samples = list()
# Known word types; '<UNK>' is always part of the vocabulary.
vocabulary = {'<UNK>'}

def file_read(filename, encoding="utf-8"):
    """Return the lines of a text file with trailing whitespace removed.

    Args:
        filename: path of the file to read.
        encoding: text encoding used to decode the file.  It is given
            explicitly so the corpus is read the same way on every platform
            instead of depending on the locale's default encoding.

    Returns:
        A list with one string per line, in file order.  Blank lines are
        kept as empty strings.
    """
    with open(filename, encoding=encoding) as f:
        return [line.rstrip() for line in f]

# Read the corpus and tokenise it, growing the vocabulary along the way.
lines = file_read("train.txt")

for line in lines:
    tokens = preprocess(line)
    training_samples.append(tokens)
    for token in tokens:
        vocabulary.add(token)


# Give every word an integer id and keep the reverse lookup as well.
word2idx = {}
for index, word in enumerate(vocabulary):
    word2idx[word] = index
idx2word = dict((index, word) for word, index in word2idx.items())

# Slide a three-token window over each sentence: the first two ids are the
# context, the third id is the word to predict.
x_train = []
y_train = []
for tokens in training_samples:
    ids = [word2idx[token] for token in tokens]
    for first, second, third in zip(ids, ids[1:], ids[2:]):
        x_train.append([first, second])
        y_train.append([third])

x_train = np.array(x_train)
y_train = np.array(y_train)

###############################################################################

BATCH_SIZE = 1
NUM_EPOCHS = 10

# One row per example: the context columns followed by the target column.
train_set = np.concatenate((x_train, y_train), axis=1)

In [28]:
# Display the combined matrix: each row is
# [context word id, context word id, target word id].
train_set

array([[13, 15,  2],
       [15,  2,  3],
       [ 2,  3,  1],
       [ 3,  1,  9],
       [13,  8, 15],
       [ 8, 15,  2],
       [15,  2,  9],
       [13, 15,  2],
       [15,  2,  5],
       [ 2,  5,  9],
       [13,  6, 12],
       [ 6, 12,  5],
       [12,  5, 10],
       [13, 15,  2],
       [15,  2, 14],
       [ 2, 14,  4],
       [14,  4,  9],
       [13,  0,  2],
       [ 0,  2, 14],
       [ 2, 14, 11],
       [14, 11,  9]])

In [29]:
# Bigram Neural Network Model
class BigramNNmodel(nn.Module):
    """Feed-forward language model: embeddings -> tanh hidden layer -> log-softmax.

    Args:
        vocab_size: number of distinct word ids; also the width of the output.
        embedding_dim: length of each word-embedding vector.
        context_size: number of context word ids carried by each example.
        hidden_dim: number of units in the hidden layer.
    """

    def __init__(self, vocab_size, embedding_dim, context_size, hidden_dim):
        super().__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        # Width of one flattened example: all context embeddings side by side.
        flat_dim = context_size * embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(flat_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, vocab_size, bias=False)

    def forward(self, inputs):
        """Return log-probabilities over the vocabulary, one row per example."""
        flat_dim = self.context_size * self.embedding_dim
        # x': look up the context embeddings and lay them out in a single row
        flattened = self.embeddings(inputs).view(-1, flat_dim)
        # h = tanh(W_1 . x' + b)
        hidden = torch.tanh(self.linear1(flattened))
        # scores = W_2 . h  (this layer has no bias)
        scores = self.linear2(hidden)
        # y = log_softmax(scores), normalised along the vocabulary axis
        return F.log_softmax(scores, dim=1)

### Training and testing the bigram model

In [30]:
# Hyperparameters read (as globals) by the bigram train() and test() cells.
# EMBEDDING_DIM: length of each word-embedding vector.
EMBEDDING_DIM = 4
# CONTEXT_SIZE: number of context words per example (one for a bigram model).
CONTEXT_SIZE = 1 #!!!#
# HIDDEN_DIM: number of units in the hidden layer.
HIDDEN_DIM = 6

In [33]:
def train(train_file, num_epochs=10, batch_size=1, lr=0.01,
          model_path='model_bigram.lm'):
    """Train the bigram neural language model and save it to disk.

    Each line of ``train_file`` is tokenised with ``preprocess``; every pair
    of adjacent tokens (previous word, next word) becomes one training
    example.  The network is built from the module-level ``EMBEDDING_DIM``,
    ``CONTEXT_SIZE`` and ``HIDDEN_DIM``.  ``CONTEXT_SIZE`` must be 1 when this
    function runs, because every example carries exactly one context word.

    Args:
        train_file: path of the training corpus, one sentence per line.
        num_epochs: number of passes over the training examples.
        batch_size: number of examples per optimisation step.
        lr: learning rate passed to Adam.
        model_path: file the checkpoint is written to.  The checkpoint holds
            the model's ``state_dict`` under 'model' and the index-to-word
            mapping under 'vocab'.

    Raises:
        ValueError: if the corpus does not contain a single bigram.
    """
    training_samples = [preprocess(line) for line in file_read(train_file)]

    vocabulary = {'<UNK>'}
    for tokens in training_samples:
        vocabulary.update(tokens)

    # Sort before numbering: plain set iteration order changes from one
    # interpreter run to the next, which would make the ids irreproducible.
    word2idx = {word: idx for idx, word in enumerate(sorted(vocabulary))}
    idx2word = {idx: word for word, idx in word2idx.items()}

    # One row per example: [context word id, target word id].
    examples = [
        [word2idx[prev_word], word2idx[next_word]]
        for tokens in training_samples
        for prev_word, next_word in zip(tokens, tokens[1:])
    ]
    if not examples:
        raise ValueError('no bigrams found in %r; nothing to train on' % (train_file,))

    ###############################################################################

    train_set = np.array(examples)
    train_loader = DataLoader(train_set, batch_size=batch_size)

    loss_function = nn.NLLLoss()
    model = BigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    optimiser = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        for data_tensor in train_loader:
            context_tensor = data_tensor[:, 0:1] #!!!#
            target_tensor = data_tensor[:, 1] #!!!#

            optimiser.zero_grad()

            log_probs = model(context_tensor)
            loss = loss_function(log_probs, target_tensor)

            loss.backward()
            optimiser.step()

        # The value reported is the loss of the last batch of the epoch.
        print('Epoch:', epoch, 'loss:', float(loss))

    torch.save({'model': model.state_dict(), 'vocab': idx2word}, model_path)

    print('Model saved.')

In [34]:
# Train the bigram model on train.txt; the checkpoint goes to model_bigram.lm.
train("train.txt")

Epoch: 0 loss: 2.890814781188965
Epoch: 1 loss: 2.7067513465881348
Epoch: 2 loss: 2.524578809738159
Epoch: 3 loss: 2.319181442260742
Epoch: 4 loss: 2.0889270305633545
Epoch: 5 loss: 1.8314764499664307
Epoch: 6 loss: 1.5535516738891602
Epoch: 7 loss: 1.2740225791931152
Epoch: 8 loss: 1.0172144174575806
Epoch: 9 loss: 0.8042901158332825
Model saved.


In [35]:
def test(test_file, model_path='model_bigram.lm'):
    """Score every sentence of ``test_file`` with the saved bigram model.

    For each line this prints the sentence probability, its natural
    logarithm and the token list, separated by tabs.  The sentence
    probability is the product of P(next word | previous word) over all
    adjacent token pairs.

    The model is rebuilt from the module-level ``EMBEDDING_DIM``,
    ``CONTEXT_SIZE`` and ``HIDDEN_DIM``, which must match the values used
    when the checkpoint was trained (``CONTEXT_SIZE`` must be 1).

    Args:
        test_file: path of the file to score, one sentence per line.
        model_path: checkpoint written by the bigram ``train``.
    """
    blob = torch.load(model_path)
    idx2word = blob['vocab']
    word2idx = {word: idx for idx, word in idx2word.items()}
    # Words never seen in training are scored as '<UNK>' instead of raising
    # KeyError; train() always puts '<UNK>' into the vocabulary.
    unk_idx = word2idx['<UNK>']

    model = BigramNNmodel(len(idx2word), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    model.load_state_dict(blob['model'])
    model.eval()

    ###############################################################################

    for line in file_read(test_file):
        tokens = preprocess(line)
        ids = [word2idx.get(token, unk_idx) for token in tokens]

        # Sum log-probabilities rather than multiplying probabilities: the
        # product can underflow to 0.0, and math.log(0.0) raises ValueError.
        total_log_prob = 0.0
        with torch.no_grad():  # scoring only, no gradients needed
            for prev_id, next_id in zip(ids, ids[1:]): #!!!#
                log_probs = model(torch.tensor([[prev_id]])) #!!!#
                total_log_prob += float(log_probs[0][next_id])

        print('%.6f\t%.6f\t' % (math.exp(total_log_prob), total_log_prob), tokens)

In [36]:
# Score each sentence of test.txt with the bigram model saved above.
test("test.txt")

0.005596	-5.185768	 ['<BOS>', 'where', 'are', 'you', '?']
0.000946	-6.962973	 ['<BOS>', 'were', 'you', 'in', 'england', '?']
0.011601	-4.456650	 ['<BOS>', 'are', 'you', 'in', 'mexico', '?']
0.000014	-11.157101	 ['<BOS>', 'i', 'am', 'in', 'mexico', '.']
0.000124	-8.995025	 ['<BOS>', 'are', 'you', 'still', 'in', 'mexico', '?']


### Training and testing a trigram model

In [37]:
# Hyperparameters read (as globals) by the trigram train() and test() cells.
# NOTE(review): these rebind the same module-level names the bigram cells
# read, so re-running a bigram cell after this one would see CONTEXT_SIZE = 2.
# EMBEDDING_DIM: length of each word-embedding vector.
EMBEDDING_DIM = 4
# CONTEXT_SIZE: number of context words per example (two for a trigram model).
CONTEXT_SIZE = 2 #!!!#
# HIDDEN_DIM: number of units in the hidden layer.
HIDDEN_DIM = 6

In [38]:
# Trigram Neural Network Model
class TrigramNNmodel(nn.Module):
    """Neural n-gram model: context embeddings, one tanh layer, log-softmax output.

    Args:
        vocab_size: size of the vocabulary, and so of the output distribution.
        embedding_dim: dimensionality of a single word embedding.
        context_size: number of context word ids supplied per example.
        hidden_dim: width of the hidden layer.
    """

    def __init__(self, vocab_size, embedding_dim, context_size, hidden_dim):
        super().__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        # The hidden layer sees all context embeddings concatenated.
        self.linear1 = nn.Linear(
            in_features=context_size * embedding_dim,
            out_features=hidden_dim,
        )
        # The output projection has no bias term.
        self.linear2 = nn.Linear(
            in_features=hidden_dim,
            out_features=vocab_size,
            bias=False,
        )

    def forward(self, inputs):
        """Map context word ids to log-probabilities, one row per example."""
        # x': concatenation of the context-word embeddings, one row per example
        looked_up = self.embeddings(inputs)
        x_prime = looked_up.view(-1, self.context_size * self.embedding_dim)
        # h: tanh(W_1.x' + b)
        h = torch.tanh(self.linear1(x_prime))
        # y: log_softmax(W_2.h), taken along the vocabulary axis
        y = F.log_softmax(self.linear2(h), dim=1)
        return y

In [41]:
def train(train_file, num_epochs=10, batch_size=1, lr=0.01,
          model_path='model_trigram.lm'):
    """Train the trigram neural language model and save it to disk.

    Each line of ``train_file`` is tokenised with ``preprocess``; every run
    of three consecutive tokens becomes one training example whose first two
    words are the context and whose third word is the target.  The network
    is built from the module-level ``EMBEDDING_DIM``, ``CONTEXT_SIZE`` and
    ``HIDDEN_DIM``.  ``CONTEXT_SIZE`` must be 2 when this function runs,
    because every example carries exactly two context words.

    Args:
        train_file: path of the training corpus, one sentence per line.
        num_epochs: number of passes over the training examples.
        batch_size: number of examples per optimisation step.
        lr: learning rate passed to Adam.
        model_path: file the checkpoint is written to.  The checkpoint holds
            the model's ``state_dict`` under 'model' and the index-to-word
            mapping under 'vocab'.

    Raises:
        ValueError: if the corpus does not contain a single trigram.
    """
    training_samples = [preprocess(line) for line in file_read(train_file)]

    vocabulary = {'<UNK>'}
    for tokens in training_samples:
        vocabulary.update(tokens)

    # Sort before numbering: plain set iteration order changes from one
    # interpreter run to the next, which would make the ids irreproducible.
    word2idx = {word: idx for idx, word in enumerate(sorted(vocabulary))}
    idx2word = {idx: word for word, idx in word2idx.items()}

    # One row per example: [context word id, context word id, target word id].
    examples = [
        [word2idx[first], word2idx[second], word2idx[third]]
        for tokens in training_samples
        for first, second, third in zip(tokens, tokens[1:], tokens[2:])
    ]
    if not examples:
        raise ValueError('no trigrams found in %r; nothing to train on' % (train_file,))

    ###############################################################################

    train_set = np.array(examples)
    train_loader = DataLoader(train_set, batch_size=batch_size)

    loss_function = nn.NLLLoss()
    model = TrigramNNmodel(len(vocabulary), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    optimiser = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        for data_tensor in train_loader:
            context_tensor = data_tensor[:, 0:2] #!!!#
            target_tensor = data_tensor[:, 2] #!!!#

            optimiser.zero_grad()

            log_probs = model(context_tensor)
            loss = loss_function(log_probs, target_tensor)

            loss.backward()
            optimiser.step()

        # The value reported is the loss of the last batch of the epoch.
        print('Epoch:', epoch, 'loss:', float(loss))

    torch.save({'model': model.state_dict(), 'vocab': idx2word}, model_path)

    print('Model saved.')

In [42]:
# Train the trigram model on train.txt; the checkpoint goes to model_trigram.lm.
train("train.txt")

Epoch: 0 loss: 2.7683162689208984
Epoch: 1 loss: 2.051835536956787
Epoch: 2 loss: 1.4063498973846436
Epoch: 3 loss: 0.9947658181190491
Epoch: 4 loss: 0.7658755779266357
Epoch: 5 loss: 0.6151495575904846
Epoch: 6 loss: 0.5007540583610535
Epoch: 7 loss: 0.4108475148677826
Epoch: 8 loss: 0.3393906354904175
Epoch: 9 loss: 0.2827325463294983
Model saved.


In [43]:
def test(test_file, model_path='model_trigram.lm'):
    """Score every sentence of ``test_file`` with the saved trigram model.

    For each line this prints the sentence probability, its natural
    logarithm and the token list, separated by tabs.  The sentence
    probability is the product of P(word | two preceding words) over every
    run of three consecutive tokens.  A sentence too short to contain a
    trigram is reported with probability 1 (the empty product).

    The model is rebuilt from the module-level ``EMBEDDING_DIM``,
    ``CONTEXT_SIZE`` and ``HIDDEN_DIM``, which must match the values used
    when the checkpoint was trained (``CONTEXT_SIZE`` must be 2).

    Args:
        test_file: path of the file to score, one sentence per line.
        model_path: checkpoint written by the trigram ``train``.
    """
    blob = torch.load(model_path)
    idx2word = blob['vocab']
    word2idx = {word: idx for idx, word in idx2word.items()}
    # Words never seen in training are scored as '<UNK>' instead of raising
    # KeyError; train() always puts '<UNK>' into the vocabulary.
    unk_idx = word2idx['<UNK>']

    # Use the trigram class for the trigram checkpoint, matching train().
    model = TrigramNNmodel(len(idx2word), EMBEDDING_DIM, CONTEXT_SIZE, HIDDEN_DIM)
    model.load_state_dict(blob['model'])
    model.eval()

    ###############################################################################

    for line in file_read(test_file):
        tokens = preprocess(line)
        ids = [word2idx.get(token, unk_idx) for token in tokens]

        # Sum log-probabilities rather than multiplying probabilities: the
        # product can underflow to 0.0, and math.log(0.0) raises ValueError.
        total_log_prob = 0.0
        with torch.no_grad():  # scoring only, no gradients needed
            for first_id, second_id, target_id in zip(ids, ids[1:], ids[2:]): #!!!#
                log_probs = model(torch.tensor([[first_id, second_id]])) #!!!#
                total_log_prob += float(log_probs[0][target_id])

        print('%.6f\t%.6f\t' % (math.exp(total_log_prob), total_log_prob), tokens)
        

In [44]:
# Score each sentence of test.txt with the trigram model saved above.
test("test.txt")

0.043800	-3.128118	 ['<BOS>', 'where', 'are', 'you', '?']
0.035767	-3.330737	 ['<BOS>', 'were', 'you', 'in', 'england', '?']
0.038391	-3.259942	 ['<BOS>', 'are', 'you', 'in', 'mexico', '?']
0.000484	-7.633777	 ['<BOS>', 'i', 'am', 'in', 'mexico', '.']
0.000565	-7.478812	 ['<BOS>', 'are', 'you', 'still', 'in', 'mexico', '?']
