In [391]:
import re
import math
import conllu
import random
import numpy as np
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import nltk

In [392]:
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")
device

device(type='cpu')

In [298]:
word_to_index = {}
tag_to_index = {}
max_sentence_length = 0
word_count = {}
word_to_index['<PAD>'] = 0
tag_to_index['<UNK>'] = 0

Got all senteces with padding and each sentence with pos tags

In [393]:
# This will provide each sentence with paddings <unk>
def process_dataset(dataset_file, p=2, s=3):
    sentences_list = []
    pos_list = []

    with open(dataset_file, 'r', encoding='utf-8') as f:
        sentence_tokens = []
        pos_tags = []

        for line in f:
            line = line.strip()

            if line.startswith('#'):
                sentence_tokens = []
                pos_tags = []
                continue
            elif line == '':
                # Append padding to the end of the sentence
                padded_sentence = ' '.join(['<PAD>'] * p) + ' ' + ' '.join(sentence_tokens) + ' ' + ' '.join(['<PAD>'] * s)
                padded_pos = ' '.join(['<UNK>'] * p + pos_tags + ['<UNK>'] * s)
                sentences_list.append(padded_sentence)
                pos_list.append(padded_pos)
                continue
            else:
                # New sentence begins
                token_attrs = line.split('\t')
                word_form = token_attrs[1]  # Word form of the token
                pos_tag = token_attrs[3]    # POS tag of the token
                sentence_tokens.append(word_form)
                pos_tags.append(pos_tag)

    return sentences_list, pos_list

In [395]:
def get_indices(sentences_list, pos_list, word_to_index, tag_to_index, max_sentence_length, word_count):
    # Process each sentence to tokenize the data
    for sentence_str, tag_str in zip(sentences_list, pos_list):
        # Tokenize the sentence into individual tokens
        tokens = sentence_str.split(' ')
        tags = tag_str.split(' ')
            # Word to index
        for word, tag in zip(tokens, tags):
            if word not in word_to_index:
                word_to_index[word] = len(word_to_index)
            word_count[word] = word_count.get(word, 0) + 1
        # Tag to index
            if tag not in tag_to_index:
                tag_to_index[tag] = len(tag_to_index)
        max_sentence_length = max(max_sentence_length, len(tokens))
    return word_to_index, tag_to_index, max_sentence_length, word_count

In [396]:
train_dataset = "./conllu/train.conllu"
test_dataset = "./conllu/test.conllu"
val_dataset = "./conllu/val.conllu"
train_sentence_list, train_pos_list = process_dataset(train_dataset, p= 2, s= 3)
test_sentence_list, test_pos_list = process_dataset(test_dataset, p= 2, s= 3)
val_sentence_list, val_pos_list = process_dataset(val_dataset, p= 2, s= 3)


sentences_list = train_sentence_list + test_sentence_list + val_sentence_list
pos_list = train_pos_list + test_pos_list + val_pos_list

# Split the data into sentences
word_to_index = {'<UNK>': 0}
tag_to_index = {'<UNK>': 0}
word_count = {'<UNK>': 1}
max_sentence_length = 0
# get all indices
word_to_index, tag_to_index, max_sentence_length, word_count = get_indices(sentences_list, pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)

## Function for calculating embedding for all sentences 

In [397]:
def PrepareEmbedding(sentence_dataset, pos_dataset, word_to_index, tag_to_index, max_sentence_length, word_count):
    token_embeddings = []
    labels_embedding = []
    for sentence_str, tag_str in zip(sentence_dataset, pos_dataset):
        # Tokenize the sentence into individual tokens
        tokens = sentence_str.split(' ')
        tags = tag_str.split(' ')
        one_sentence_token_embedding = []
        one_sentence_pos_embedding = []
        # Word to index
        for word, tag in zip(tokens, tags):
            if word in word_to_index:
                if word_count[word] < 2:
                    word_cur_idx = word_to_index['<UNK>']
                else:
                    word_cur_idx = word_to_index[word]
            else:
                word_cur_idx = word_to_index['<UNK>']
            # Tag to index
            if tag in tag_to_index:
                tag_cur_idx = tag_to_index[tag]
            else:
                tag_cur_idx = tag_to_index['<UNK>']
            one_sentence_token_embedding.append(word_cur_idx)
            one_sentence_pos_embedding.append(tag_cur_idx)
        # Pad sequences using PyTorch's pad_sequence function
        # one_sentence_token_embedding.extend([0] * (max_sentence_length - len(one_sentence_token_embedding)))
        # one_sentence_pos_embedding.extend([0] * (max_sentence_length - len(one_sentence_pos_embedding)))
        token_embeddings.append(one_sentence_token_embedding)
        labels_embedding.append(one_sentence_pos_embedding)
    return token_embeddings, labels_embedding

In [417]:
train_sentence_embeddings, train_pos_embeddings = PrepareEmbedding(train_sentence_list, train_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
test_sentence_embeddings, test_pos_embeddings = PrepareEmbedding(test_sentence_list, test_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
val_sentence_embeddings, val_pos_embeddings = PrepareEmbedding(val_sentence_list, val_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
# It will save a lot of time

In [400]:
train_sentence_embeddings = train_sentence_embeddings[:10]
train_pos_embeddings = train_pos_embeddings[:10]
test_sentence_embeddings = test_sentence_embeddings[:10]
test_pos_embeddings = test_pos_embeddings[:10]
val_sentence_embeddings = val_sentence_embeddings[:10]
val_pos_embeddings = val_pos_embeddings[:10]


In [401]:
with open("out.txt", "w") as f:
    for word in train_sentence_embeddings:
        f.write(f"{word}\n")   

In [385]:
# saving json of dictinary
import json
with open('word_to_index.json', 'w') as f:
    json.dump(word_to_index, f)
with open('tag_to_index.json', 'w') as f:
    json.dump(tag_to_index, f)
with open('word_count.json', 'w') as f:
    json.dump(word_count, f)

### Feed Forward netword

In [418]:
import numpy as np
# Step 1: Define the Model
class FFNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size, p, s):
        super(FFNN, self).__init__()
        # Calculate the actual input size considering embedding dimensions
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = nn.Linear((p + s + 1) *embedding_dim , hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Flatten the input tensor
        first = self.embedding(x)
        first = first.view(-1)
        out = self.fc1(first)
        out = self.relu(out)
        out = self.fc2(out)
        return out

In [None]:
FFNNmodel_path = 'FFNNmodel.pth'
# Save the model
torch.save(model.state_dict(), FFNNmodel_path)

In [420]:
# Define the number of steps after which to print the loss and accuracy
print_interval = 100

# Step 2: Define Loss Function
criterion = nn.CrossEntropyLoss()

# Step 3: Instantiate Model
vocab_size = len(word_to_index)
embedding_dim = 100  # Example dimension, adjust as needed
hidden_size = 64  # Example size, adjust as needed
output_size = len(tag_to_index)
p = 2
s = 3
model = FFNN(vocab_size, embedding_dim, hidden_size, output_size, p, s).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001) # Example optimizer, adjust as needed

def train_model(model, criterion, optimizer, train_embeddings, train_pos_embeddings, print_interval, epoch):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    # Iterate over the training dataset
    for token_indices, pos_indices in zip(train_embeddings, train_pos_embeddings):
        token_indices = torch.LongTensor(token_indices).to(device)
        pos_indices = torch.LongTensor(pos_indices).to(device)
        # Create sliding window of size 6 and convert to tensors
        for i in range(p, len(token_indices) - s):
            window_tokens = token_indices[i-p:i+s+1]
            # window_tokens_tensor = torch.LongTensor(window_tokens).to(device)
            pos_tag = pos_indices[i]
            # creating one hot encoding for the pos tag
            # length should be the number of tags
            pos_tag_tensor = torch.zeros(len(tag_to_index)).to(device)
            pos_tag_tensor[pos_tag] = 1
            optimizer.zero_grad()
            outputs = model(window_tokens)  # Forward pass
            # Calculate loss
            loss = criterion(outputs, pos_tag_tensor)  # Compare outputs with true labels
            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

    return running_loss / len(train_embeddings)

def evaluate_model(model, criterion, val_embeddings, val_pos_embeddings):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0

    # Iterate over the validation dataset
    with torch.no_grad():
        for token_indices, pos_indices in zip(val_embeddings, val_pos_embeddings):
            # Create sliding window of size 6 and convert to tensors
            token_indices = torch.LongTensor(token_indices).to(device)
            pos_indices = torch.LongTensor(pos_indices).to(device)
            for i in range(p, len(token_indices) - s):
                window_tokens = token_indices[i-p:i+s+1]
                # window_tokens_tensor = torch.LongTensor(window_tokens).to(device)
                pos_tag = pos_indices[i]
                # creating one hot encoding for the pos tag
                # length should be the number of tags
                pos_tag_tensor = torch.zeros(len(tag_to_index)).to(device)
                pos_tag_tensor[pos_tag] = 1
                outputs = model(window_tokens)  # Forward pass
                # Calculate loss
                loss = criterion(outputs, pos_tag_tensor)  # Compare outputs with true labels
                running_loss += loss.item()

    return running_loss / len(val_embeddings)

def test_model(model, criterion, test_embeddings, test_pos_embeddings):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0

    # Iterate over the test dataset
    with torch.no_grad():
        for token_indices, pos_indices in zip(test_embeddings, test_pos_embeddings):
            # Create sliding window of size 6 and convert to tensors
            token_indices = torch.LongTensor(token_indices).to(device)
            pos_indices = torch.LongTensor(pos_indices).to(device)
            for i in range(p, len(token_indices) - s):
                window_tokens = token_indices[i-p:i+s+1]
                # window_tokens_tensor = torch.LongTensor(window_tokens)
                pos_tag = pos_indices[i]
                # creating one hot encoding for the pos tag
                # length should be the number of tags
                pos_tag_tensor = torch.zeros(len(tag_to_index)).to(device)
                pos_tag_tensor[pos_tag] = 1
                outputs = model(window_tokens)  # Forward pass
                # Calculate loss
                loss = criterion(outputs, pos_tag_tensor)  # Compare outputs with true labels
                running_loss += loss.item()

    return running_loss / len(test_embeddings)

# Number of epochs
num_epochs = 10
train_losses = []
val_losses = []
for epoch in range(num_epochs):
    # Training phase
    train_loss = train_model(model, criterion, optimizer, train_sentence_embeddings, train_pos_embeddings, print_interval, epoch+1)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}")
    train_losses.append(train_loss)
    # Validation phase
    val_loss = evaluate_model(model, criterion, val_sentence_embeddings, val_pos_embeddings)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {val_loss:.4f}")
    val_losses.append(val_loss)

# Testing phase
test_loss = test_model(model, criterion, test_sentence_embeddings, test_pos_embeddings)
print(f"Test Loss: {test_loss:.4f}")



Epoch [1/10], Train Loss: 1.9634
Epoch [1/10], Validation Loss: 3.0968
Epoch [2/10], Train Loss: 0.9286
Epoch [2/10], Validation Loss: 2.8092
Epoch [3/10], Train Loss: 0.7599
Epoch [3/10], Validation Loss: 2.8984
Epoch [4/10], Train Loss: 0.5813
Epoch [4/10], Validation Loss: 3.1660
Epoch [5/10], Train Loss: 0.5628
Epoch [5/10], Validation Loss: 3.5876
Epoch [6/10], Train Loss: 0.5331
Epoch [6/10], Validation Loss: 3.9433
Epoch [7/10], Train Loss: 0.4856
Epoch [7/10], Validation Loss: 3.9942
Epoch [8/10], Train Loss: 0.5076
Epoch [8/10], Validation Loss: 4.4048
Epoch [9/10], Train Loss: 0.5241
Epoch [9/10], Validation Loss: 4.8696



# Step 4: Training Loop

In [None]:
def evaluateFFNN(model, sentences, word_to_index, tag_to_index, device):
    sentence_token = sentences.split(' ')
    embedded_sentence = [word_to_index[word] if word in word_to_index else word_to_index['<UNK>'] for word in sentence_token]
    embedded_sentence = torch.tensor(embedded_sentence, dtype=torch.long)
    token = []
    tag = []
    for i in range(2, len(embedded_sentence) - 3):
        input = embedded_sentence[i-2:i+3 + 1]
        output = model(input)
        token.append(sentence_token[i])
        tag.append(list(tag_to_index.keys())[list(tag_to_index.values()).index(torch.argmax(output).item())])

    for i in range(len(token)):
        print(f"{token[i]}: {tag[i]}")


sentence = "where is the beset place to go for a vacation"
padded_sentence = ' '.join(['<PAD>'] * p) + ' ' + ''.join(sentence) + ' ' + ' '.join(['<PAD>'] * s)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FFNN(vocab_size, embedding_dim, hidden_size, output_size, p, s)
model_param = torch.load('FFNNmodel.pth') 
model.load_state_dict(model_param)

evaluateFFNN(model, padded_sentence, word_to_index, tag_to_index, device)

where: PRON
is: PRON
the: DET
beset: VERB
place: PROPN
to: ADP
go: PROPN
for: ADP
a: DET
vacation: NOUN


AttributeError: 'collections.OrderedDict' object has no attribute 'to'

# Question2 : 2 Recurrent Neural Network POS Tagging
Design and implement a model which uses Recurrent Neural Networks (Vanilla
RNN, LSTM, or GRU) for POS Tagging. The model should take the embeddings for all tokens in a sentence and output the corresponding POS tags in
sequence.
- For Example: In the sentence "An apple a day keeps the doctor away",
 the model takes the embeddings for 
- ["An", "apple", "a", "day", "keeps", "the","doctor", "away"] and
 outputs the POS tags for all the words in the sentence
- ["DET", "NOUN", "DET", "NOUN", "VERB", "DET", "NOUN", "ADV"] 

Step1 : Count all words and postags and provide them a index value

- Doing it for all 3 datasets train, validation and test-dataset

### for all train, test and validation dataset
- I got all sentences tokens and respective pos-tag in form of sentence sepateted by space 
- Max sentece length, word count , word to index adn tag t index 

In [164]:
with open("out.txt", "w") as f:
    for word in tag_to_index:
        f.write(f"{word}\n")    

# index to tag dictionary will be used for predicting

In [305]:
index_to_tag = {v: k for k, v in tag_to_index.items()}
index_to_tag

{0: '<UNK>',
 1: 'PRON',
 2: 'AUX',
 3: 'DET',
 4: 'NOUN',
 5: 'ADP',
 6: 'PROPN',
 7: 'VERB',
 8: 'NUM',
 9: 'ADJ',
 10: 'CCONJ',
 11: 'ADV',
 12: 'PART',
 13: 'INTJ',
 14: 'SYM'}

# Lstm for training as well as validting and testing 

In [335]:
import torch.nn as nn
import torch.nn.functional as F

class LSTMTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)
        self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, input_sentence):
        embeds = self.word_embeddings(input_sentence)
        lstm_out, _ = self.lstm(embeds.view(len(input_sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(input_sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores


In [379]:
train_sentence_embeddings, train_pos_embeddings = PrepareEmbedding(train_sentence_list, train_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
test_sentence_embeddings, test_pos_embeddings = PrepareEmbedding(test_sentence_list, test_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
val_sentence_embeddings, val_pos_embeddings = PrepareEmbedding(val_sentence_list, val_pos_list, word_to_index, tag_to_index, max_sentence_length, word_count)
# It will save a lot of time

In [332]:
with open("out.txt", "w") as f:
    for word in val_sentence_embeddings:
        f.write(f"{word}\n")

In [380]:
vocab_size = len(word_to_index)
tagset_size = len(tag_to_index)
embedding_dim = 100
hidden_dim = 128

In [381]:
train_sentence_embeddings = train_sentence_embeddings[:80]
train_pos_embeddings = train_pos_embeddings[:80]
test_sentence_embeddings = test_sentence_embeddings[:4]
test_pos_embeddings = test_pos_embeddings[:4]

In [382]:
def trainModel(model):
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)   
    loss_function = nn.CrossEntropyLoss()
    for epoch in range(10):
        model.zero_grad()
        # ?PrepareEmbedding
        for sentence, tags in zip(train_sentence_embeddings, train_pos_embeddings):
            sentence_in = torch.LongTensor(sentence)
            targets = torch.LongTensor(tags)
            tag_scores = model(sentence_in)
            loss = loss_function(tag_scores, targets)
            loss.backward()
            optimizer.step()

In [383]:
model = LSTMTagger(embedding_dim, hidden_dim, len(word_to_index), len(tag_to_index))
trainModel(model)


In [375]:
def testModel(model, test_sentence_embeddings, test_pos_embeddings):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():  # No need to track gradients during testing
        for sentence, tags in zip(test_sentence_embeddings, test_pos_embeddings):
            sentence_in = torch.LongTensor(sentence)
            targets = torch.LongTensor(tags)
            tag_scores = model(sentence_in)
            _, predicted = torch.max(tag_scores, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    accuracy = correct / total
    print(f"Test Accuracy: {accuracy * 100:.2f}%")
    return accuracy


In [376]:
testModel(model, test_sentence_embeddings, test_pos_embeddings)

Test Accuracy: 5.48%


0.0547945205479452

In [360]:
word_to_index

{'<UNK>': 0,
 '<PAD>': 1,
 'what': 2,
 'is': 3,
 'the': 4,
 'cost': 5,
 'of': 6,
 'a': 7,
 'round': 8,
 'trip': 9,
 'flight': 10,
 'from': 11,
 'pittsburgh': 12,
 'to': 13,
 'atlanta': 14,
 'beginning': 15,
 'on': 16,
 'april': 17,
 'twenty': 18,
 'fifth': 19,
 'and': 20,
 'returning': 21,
 'may': 22,
 'sixth': 23,
 'now': 24,
 'i': 25,
 'need': 26,
 'leaving': 27,
 'fort': 28,
 'worth': 29,
 'arriving': 30,
 'in': 31,
 'denver': 32,
 'no': 33,
 'later': 34,
 'than': 35,
 '2': 36,
 'pm': 37,
 'next': 38,
 'monday': 39,
 'fly': 40,
 'kansas': 41,
 'city': 42,
 'chicago': 43,
 'wednesday': 44,
 'following': 45,
 'day': 46,
 'meaning': 47,
 'meal': 48,
 'code': 49,
 's': 50,
 'show': 51,
 'me': 52,
 'all': 53,
 'flights': 54,
 'which': 55,
 'serve': 56,
 'for': 57,
 'after': 58,
 'tomorrow': 59,
 'us': 60,
 'air': 61,
 'list': 62,
 'nonstop': 63,
 'early': 64,
 'tuesday': 65,
 'morning': 66,
 'dallas': 67,
 'st.': 68,
 'petersburg': 69,
 'toronto': 70,
 'that': 71,
 'arrive': 72,
 'listin

In [378]:
def evaluate(sentence):
    sentence = re.sub('[^ A-Za-z0-9]+', '', sentence).split()
    tokenized_sent = []
    for word in sentence:
        if word in word_to_index:
            tokenized_sent.append(word_to_index[word])
        else:
            tokenized_sent.append(word_to_index['<UNK>'])

    inputs = torch.tensor(tokenized_sent, dtype=torch.long, device=device)
    output = model(inputs)
    for i in range(len(sentence)):
        print(sentence[i]+"    "+index_to_tag[torch.argmax(output[i]).item()])
evaluate("what is the cost of a round")

what
is
the
cost
of
a
round
what    SYM
is    <UNK>
the    VERB
cost    SYM
of    SYM
a    CCONJ
round    ADJ
