In [2]:
#importing required libraries 
import torch 
import torch.nn as nn 
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
from conllu import parse
import numpy as np
import random as rd 
from random import shuffle                                                      

cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
print("Using device:", device)

Using device: cpu


In [3]:
train_file = 'en_atis-ud-train.conllu'
test_file = 'en_atis-ud-test.conllu'
dev_file = 'en_atis-ud-dev.conllu'


In [4]:
raw_datasets = {}
raw_datasets['train'] = parse(open(train_file, 'r').read())
raw_datasets['dev'] = parse(open(dev_file, 'r').read())
raw_datasets['test'] = parse(open(test_file, 'r').read())
datasets = {}
datasets['test'] = []
datasets['train'] = []
datasets['dev'] = []

In [5]:
      

def get_device():
    '''
    This function returns the device on which the model will be trained
    '''
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    return device


#reading data from the file 
def read_file(file_name):
    '''
    This function reads the file and returns the data in the form of list of sentences.
    files shoulf be in conllu format 
    conllu file is parsed using conllu library
    args : file_name
    returns : list of sentences
    '''
    
    raw_data = parse(open(file_name).read())
    return raw_data

def create_datasets(file_path , word2idx , tag2idx , train=False):
    '''
    This function creates the dataset from the raw data
    args : raw_data
    returns : dataset where each element is a tuple of (sentence , tags)
    '''
    dataset = [[] , []]
    if train: 
        raw_data = read_file(file_path)
        words = []
        tags = []
        for sentence in raw_data:
            sentence_words = []
            sentence_tags = []

            for word in sentence:
                if word['form'] not in word2idx:
                    word2idx.update({word['form']:len(word2idx)})
                if word['upostag'] not in tag2idx:
                    tag2idx.update({word['upostag']:len(tag2idx)})
                sentence_tags.append(tag2idx[word['upostag']])
                sentence_words.append(word['form'])
            dataset[0].append(sentence_words)
            dataset[1].append(sentence_tags)
        return dataset 
    else:
        raw_data= read_file (file_path)
        for sentence in raw_data:
            sentence_words = []
            sentence_tags = []
            for word in sentence:
                new_word = word['form']
                new_tag = word['upostag']
                if new_word not in word2idx:
                    new_word = '<UNK>'
                sentence_words.append(new_word)
                if new_tag not in tag2idx:
                    tag2idx.update({new_tag:len(tag2idx)})
                sentence_tags.append(tag2idx[new_tag])
            dataset[1].append(sentence_tags)
            dataset[0].append(sentence_words)
        return dataset
    


        



In [13]:

word2idx = {}
word2idx = {'<UNK>': 0}
tag2idx = {}
train_loader = []
dev_loader = []
test_loader = []



class POS_Dataset(Dataset):
    '''
    This class creates the dataset for the model
    '''

    def __init__(self ,words , tags , word2idx ):
        super(POS_Dataset , self).__init__()
        '''
        This function initializes the dataset
        args : data_dir , word2idx , tag2idx
        '''
        self.words = torch.tensor(words)
        self.tags = torch.tensor(tags)
        self.word2idx = word2idx

    def __len__(self):
        '''
        This function returns the length of the dataset
        '''
        return len(self.words)
    
    def __getitem__(self , idx):
        '''
        This function returns the item at the given index
        '''
        return self.words[idx] , self.tags[idx]
    

datasets['train'] = create_datasets(train_file , word2idx , tag2idx , train=True)
loader = DataLoader(datasets['train'] , batch_size=32 , shuffle=True)
print(type(loader))

<class 'torch.utils.data.dataloader.DataLoader'>


In [12]:
# class textDataset(torch.utils.data.Dataset):
#     def __init__


class POS_tagger(nn.Module):
    def __init__(self, embedding_dim , hidden_dim ,hidden_dim2, vocab_size , tagset_size ):
        super(POS_tagger, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.hidden2tag = nn.Linear(hidden_dim,hidden_dim2)
        
        self.hidden2tag2 = nn.Linear(hidden_dim2,tagset_size)
       
    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores
        


In [13]:
# mode1 = nn.Sequential(nn.Embedding(len(word2idx), 100), nn.LSTM(100, 100), nn.Linear(100, len(tag2idx)), nn.LogSoftmax(dim=1))
model = POS_tagger(100, 100, len(word2idx), len(tag2idx))
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.05, weight_decay=0)


In [14]:
model.train()
EPOCHS = 50 
STEPS = 0 
best_accuracy = 0
RUNNING_LOSS = 0
PRINT_EVERY = 100
for epoch in range(EPOCHS):
    if epoch % 10 == 0:
        optimizer.param_groups[0]['lr'] = optimizer.param_groups[0]['lr'] * 0.5
    for sentence, tags in train_loader:
        STEPS += 1
        optimizer.zero_grad()
        tag_scores = model(sentence)
        # print(sentence)
        loss = criterion(tag_scores, tags)
        loss.backward()
        optimizer.step()
        RUNNING_LOSS += loss.item()
        if STEPS % PRINT_EVERY == 0:
            model.eval()
            test_loss = 0
            accuracy = 0
            with torch.no_grad():
                for sentence, tags in dev_loader:
                    tag_scores = model(sentence)
                    test_loss += criterion(tag_scores, tags)
                    ps = torch.exp(tag_scores)
                    top_p, top_class = ps.topk(1, dim=1)
                    equals = top_class == tags.view(*top_class.shape)
                    accuracy += torch.mean(equals.type(torch.FloatTensor))
            
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                torch.save(model.state_dict(), 'pos_tagger3.pth')
                print(f"Epoch {epoch+1}/{EPOCHS} "
                  f"Train loss: {RUNNING_LOSS/PRINT_EVERY:.3f}.. "
                  f"Test loss: {test_loss/len(dev_loader):.3f}.. "
                  f"Test accuracy: {accuracy/len(dev_loader):.3f}")
            
            model.train()
            RUNNING_LOSS = 0

Epoch 1/50 Train loss: 1.345.. Test loss: 0.904.. Test accuracy: 0.708
Epoch 1/50 Train loss: 0.985.. Test loss: 0.807.. Test accuracy: 0.740
Epoch 1/50 Train loss: 0.770.. Test loss: 0.770.. Test accuracy: 0.758
Epoch 1/50 Train loss: 0.818.. Test loss: 0.713.. Test accuracy: 0.767
Epoch 1/50 Train loss: 0.833.. Test loss: 0.749.. Test accuracy: 0.768
Epoch 1/50 Train loss: 0.787.. Test loss: 0.714.. Test accuracy: 0.779
Epoch 1/50 Train loss: 0.784.. Test loss: 0.716.. Test accuracy: 0.783
Epoch 1/50 Train loss: 0.739.. Test loss: 0.654.. Test accuracy: 0.787
Epoch 1/50 Train loss: 0.760.. Test loss: 0.685.. Test accuracy: 0.789
Epoch 1/50 Train loss: 0.818.. Test loss: 0.701.. Test accuracy: 0.792
Epoch 1/50 Train loss: 0.812.. Test loss: 0.641.. Test accuracy: 0.796
Epoch 1/50 Train loss: 0.692.. Test loss: 0.632.. Test accuracy: 0.803
Epoch 1/50 Train loss: 0.732.. Test loss: 0.606.. Test accuracy: 0.803
Epoch 1/50 Train loss: 0.701.. Test loss: 0.635.. Test accuracy: 0.807
Epoch 

KeyboardInterrupt: 

In [15]:
model = POS_tagger(100, 100, len(word2idx), len(tag2idx))
model.load_state_dict(torch.load('pos_tagger2.pth'))
model.eval()
accuracy = 0

for sentence, tags in test_loader:
    tag_scores = model(sentence)
    ps = torch.exp(tag_scores)
    top_p, top_class = ps.topk(1, dim=1)
    equals = top_class == tags.view(*top_class.shape)
    accuracy += torch.mean(equals.type(torch.FloatTensor))
print(f"Test accuracy: {accuracy/len(test_loader):.3f}")

Test accuracy: 0.961


In [20]:
input = 'mary had a little lamb'
input = input.split()
input_new = []
for word in input:
    if word not in word2idx:
        input_new.append(word2idx['<UNK>'])
    else :
        input_new.append(word2idx[word])

input_new = torch.tensor(input_new, dtype=torch.long)
# print(input_new)
tag_scores = model(input_new)
# print(tag_scores)
ps = torch.exp(tag_scores)
top_p, top_class = ps.topk(1, dim=1)
# print(top_p)
idxtag = {v: k for k, v in tag2idx.items()}
for i in range(len(top_class)):
    print(idxtag[top_class[i][0].item()])


PRON
NOUN
DET
NOUN
NOUN
