<a href="https://colab.research.google.com/github/JunQuann/CNN-BiLSTM-POSTagger/blob/master/CNN_BiLSTM_POStagger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
import math
import sys
import random
import time
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [0]:
# Colab-only: mount the user's Google Drive at /gdrive. The notebook's next
# cell changes into a folder under this mount point.
from google.colab import drive
drive.mount('/gdrive')

In [0]:
# Work from the Drive folder so that later cells can open their files
# (embeddings, sents.train, sents.test, ...) by relative path.
%cd '/gdrive/My Drive/released'

In [0]:
# Character vocabulary. Index 0 is the padding symbol and index 1 the fallback
# for unknown characters; every supported character then receives the next
# index, in the order it appears in the string below.
char2Idx = {
    symbol: position
    for position, symbol in enumerate(
        ["PAD", "UNK", *" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.,-_()[]{}!?:;#'\"/\\%$`&=*+@^~|"]
    )
}

In [0]:
CHAR_EMBEDDING_DIM = 30
# Every character vector is drawn uniformly from [-sqrt(3/dim), +sqrt(3/dim)].
_char_init_bound = math.sqrt(3 / CHAR_EMBEDDING_DIM)
char_embedding_weights = torch.FloatTensor(len(char2Idx), CHAR_EMBEDDING_DIM)
char_embedding_weights.uniform_(-_char_init_bound, _char_init_bound)
# Row 0 belongs to the "PAD" entry of char2Idx; keep it as an all-zero vector.
char_embedding_weights[0].zero_()

In [0]:
WORD_EMBEDDING_DIM = 100  # dimensionality of the glove.6B.100d vectors
word2Idx_pkl_path = 'w2i.6B.100d.pkl'
weembed_pkl_path = 'wembed.6B.100d.pkl'

if os.path.exists(word2Idx_pkl_path) and os.path.exists(weembed_pkl_path):
    # Fast path: reuse the cached vocabulary and embedding matrix.
    # NOTE: pickle.load can execute arbitrary code stored in the file; only
    # load cache files that this notebook produced itself.
    with open(word2Idx_pkl_path, 'rb') as cache_file:
        word2Idx = pickle.load(cache_file)
    with open(weembed_pkl_path, 'rb') as cache_file:
        word_embedding_weights = pickle.load(cache_file)

else:
    # Slow path: parse the GloVe text file ("<word> <v1> ... <vN>" per line).
    # Row 0 is a randomly initialised vector for out-of-vocabulary words.
    unk_bound = math.sqrt(3 / WORD_EMBEDDING_DIM)
    word2Idx = {"UNK": 0}
    word_embedding_weights = [
        torch.FloatTensor(1, WORD_EMBEDDING_DIM).uniform_(-unk_bound, unk_bound)
    ]
    # Explicit UTF-8 so parsing does not depend on the platform's default encoding.
    with open("glove.6B.100d.txt", 'r', encoding='utf-8') as f:
        for line in f:
            values = line.strip().split()
            if not values:
                # Tolerate blank lines instead of failing on values[0].
                continue
            word = values[0]

            vector = [float(num) for num in values[1:]]
            # Use the row position as the index so that vocabulary and matrix
            # stay aligned even if a token occurs more than once in the file.
            word2Idx[word] = len(word_embedding_weights)
            word_embedding_weights.append(torch.tensor(vector, dtype=torch.float32).reshape(1, -1))

    word_embedding_weights = torch.cat(word_embedding_weights)

In [0]:
# Cache the vocabulary and the embedding matrix so that the next run can skip
# parsing the GloVe text file. Context managers make sure both files are
# flushed and closed; the paths are the same variables the loading cell checks.
with open(word2Idx_pkl_path, 'wb') as cache_file:
    pickle.dump(word2Idx, cache_file)
with open(weembed_pkl_path, 'wb') as cache_file:
    pickle.dump(word_embedding_weights, cache_file)

In [0]:
class CNN_BiLSTM(nn.Module):
    """Sequence tagger combining a character-level CNN with a word-level BiLSTM.

    For every word, its characters are embedded, passed through dropout and a
    1-D convolution, and max-pooled over the character axis. The pooled vector
    is concatenated with the word embedding, dropout is applied, and the
    sequence is fed through a bidirectional LSTM followed by a linear layer
    that scores every tag.

    Args:
        char_embedding_weights: 2-D float tensor used to initialise the
            character embedding table (trainable).
        word_embedding_weights: 2-D float tensor used to initialise the word
            embedding table (trainable).
        tagset_size: number of output tags.
        num_filters: number of convolution filters (size of the per-word
            character representation).
        kernel_size: width of the character convolution.
        hidden_dim: hidden size of each LSTM direction.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability used before the convolution and before
            the LSTM.
    """

    def __init__(self,
                 char_embedding_weights,
                 word_embedding_weights,
                 tagset_size,
                 num_filters=30,
                 kernel_size=3,
                 hidden_dim=200,
                 num_layers=1,
                 dropout=0.5
                ):
        super(CNN_BiLSTM, self).__init__()
        self.kernel_size = kernel_size
        # is_available must be *called*; the bare function object is always
        # truthy and would select "cuda:0" even on CPU-only machines.
        # The attribute is kept for existing callers; forward() takes the
        # device from its inputs instead.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.word_embedding = nn.Embedding.from_pretrained(word_embedding_weights, freeze=False)
        self.char_embedding = nn.Embedding.from_pretrained(char_embedding_weights, freeze=False)

        self.conv_layer = nn.Sequential(
            nn.Dropout(p=dropout),
            nn.Conv1d(self.char_embedding.embedding_dim, num_filters, kernel_size),
            nn.AdaptiveMaxPool1d(1)
        )

        self.dropout_char_rep = nn.Dropout(p=dropout)

        self.bi_lstm = nn.LSTM(self.word_embedding.embedding_dim + num_filters, hidden_dim,
                               num_layers=num_layers, bidirectional=True)
        # The LSTM output only contains the last layer's two directions, so its
        # feature size is 2 * hidden_dim regardless of num_layers.
        self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)

    def forward(self, sentence, words):
        """Score every tag for every word of one sentence.

        Args:
            sentence: 1-D LongTensor of word indices.
            words: sequence with one 1-D LongTensor of character indices per
                word, in the same order as ``sentence``.

        Returns:
            Tensor of shape (len(sentence), tagset_size) holding log-probabilities.
        """
        embeds = self.word_embedding(sentence)
        char_features = []

        for word in words:
            # Pad with index 0 on both sides, on the same device as the word.
            pad = word.new_zeros(self.kernel_size // 2)
            padded_word = torch.cat((pad, word, pad))
            char_embeds = self.char_embedding(padded_word)  # (chars, emb_dim)
            # Conv1d expects (batch, channels, length). Transpose rather than
            # view: a plain view would reinterpret the memory and mix
            # characters with embedding dimensions.
            conv_input = char_embeds.t().unsqueeze(0)  # (1, emb_dim, chars)
            char_representation = self.conv_layer(conv_input)
            char_features.append(char_representation.view(-1))

        char_hidden_final = torch.stack(tuple(char_features))

        combined = torch.cat((embeds, char_hidden_final), 1)

        dropout_combined = self.dropout_char_rep(combined)
        # The LSTM consumes (seq_len, batch=1, features).
        lstm_out, _ = self.bi_lstm(dropout_combined.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))

        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

In [0]:
def str2tuple(s, sep='/'):
    """Split a ``word<sep>tag`` token at the last occurrence of ``sep``.

    Returns ``(word, TAG)`` with the tag upper-cased, or ``(s, None)`` when
    ``sep`` does not occur in ``s``.
    """
    cut = s.rfind(sep)
    if cut < 0:
        return (s, None)
    word, tag = s[:cut], s[cut + len(sep):]
    return (word, tag.upper())
    
def prepare_sequence(seq, to_ix):
    """Convert an iterable of symbols into a LongTensor of indices.

    Each item of ``seq`` is looked up in ``to_ix``; items that are missing are
    mapped to ``to_ix['UNK']``. That fallback key is only read when a lookup
    actually fails, so mappings without an ``'UNK'`` entry work as long as
    every item is present.
    """
    def lookup(symbol):
        try:
            return to_ix[symbol]
        except KeyError:
            return to_ix['UNK']

    return torch.tensor([lookup(symbol) for symbol in seq], dtype=torch.long)

In [0]:
tag2idx = {}
tagged_sentences = []

# Every line of the training file is one sentence made of whitespace-separated
# word/TAG tokens; str2tuple splits each token into a (word, TAG) pair.
with open('sents.train', 'r') as train_file:
    for raw_line in train_file:
        pairs = [str2tuple(token) for token in raw_line.split()]
        tagged_sentences.append(pairs)
        # A tag receives the next free index the first time it is seen.
        for pair in pairs:
            tag2idx.setdefault(pair[1], len(tag2idx))

# NOTE(review): this overwrites the word embedding matrix loaded earlier with
# uniformly random values of the same shape, so the pretrained vectors are not
# what the model is initialised with - confirm this is intended.
_word_init_bound = math.sqrt(3 / WORD_EMBEDDING_DIM)
word_embedding_weights = torch.FloatTensor(len(word2Idx), WORD_EMBEDDING_DIM).uniform_(-_word_init_bound, _word_init_bound)

In [0]:
# Hyperparameters for the model and the training loop.
NUM_FILTER = 30  # passed to CNN_BiLSTM as num_filters (character CNN filters)
KERNEL_SIZE = 3  # passed to CNN_BiLSTM as kernel_size (character CNN width)
HIDDEN_DIM = 200  # passed to CNN_BiLSTM as hidden_dim
NUM_LAYERS = 1  # passed to CNN_BiLSTM as num_layers
DROPOUT = 0.5  # passed to CNN_BiLSTM as dropout
EPOCH = 5  # number of passes of the training loop
SUBSAMPLE_SIZE = 1  # fraction of the training sentences sampled in each epoch
LEARNING_RATE = 0.01  # initial learning rate of the SGD optimizer
DECAY_RATE = 0.05  # per-epoch LR factor is 1 / (1 + DECAY_RATE * epoch)

In [0]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")

model = CNN_BiLSTM(char_embedding_weights, word_embedding_weights, len(tag2idx), num_filters=NUM_FILTER,
                   kernel_size=KERNEL_SIZE, hidden_dim=HIDDEN_DIM, num_layers=NUM_LAYERS, dropout=DROPOUT)

# Resume from an earlier checkpoint when one exists. map_location lets a
# checkpoint written on a GPU machine load on a CPU-only one.
saved_model_path = 'CNN_BiLSTM.pt'
if os.path.exists(saved_model_path):
    model.load_state_dict(torch.load(saved_model_path, map_location=device))

model.to(device)

# The model already returns log-probabilities (log_softmax), so the matching
# criterion is NLLLoss; CrossEntropyLoss would apply log_softmax a second time.
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.9)

# Reverse tag lookup, built once instead of scanning tag2idx for every token.
idx2tag = {index: tag for tag, index in tag2idx.items()}


def tag_tokens(tokens):
    """Tag a list of tokens with the current model.

    Runs in eval mode (dropout disabled) without gradient tracking, then
    restores the model's previous train/eval mode. Returns a list of
    (token, tag) pairs.
    """
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            words = [prepare_sequence(s, char2Idx).to(device) for s in tokens]
            sentence = prepare_sequence(map(lambda word: word.lower(), tokens), word2Idx).to(device)
            tag_scores = model(sentence, words)
            _, indices = torch.max(tag_scores, 1)
    finally:
        model.train(was_training)
    return [(token, idx2tag[index]) for token, index in zip(tokens, indices.tolist())]


# The test sentence
check = 'Just a random sentence .'.split()
print("Running a check on the model before training.\nSentences:\n{}".format(" ".join(check)))
print(tag_tokens(check))

lambda1 = lambda epoch: 1 / (1 + DECAY_RATE * epoch)
scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda1)
model = model.train()
start = time.time()
for epoch in range(EPOCH):
    # Draw a fresh random subsample (a shuffle when SUBSAMPLE_SIZE == 1) per epoch.
    subsample_tagged_sentences = random.sample(tagged_sentences, int(SUBSAMPLE_SIZE * len(tagged_sentences)))

    for i, tagged_sentence in enumerate(subsample_tagged_sentences):
        if not tagged_sentence:
            # Blank lines in the training file yield empty sentences, which
            # the model cannot process; skip them.
            continue

        seq = [tagged_word[0] for tagged_word in tagged_sentence]
        tags = [tagged_word[1] for tagged_word in tagged_sentence]
        words = [prepare_sequence(s, char2Idx).to(device) for s in seq]
        sentence = prepare_sequence(map(lambda word: word.lower(), seq), word2Idx).to(device)
        targets = prepare_sequence(tags, tag2idx).to(device)

        model.zero_grad()

        tag_scores = model(sentence, words)

        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()

    scheduler.step()
    epoch_end_time = time.time()
    print('Epoch %d completed in %d minutes %d seconds' % (epoch + 1, (epoch_end_time - start) // 60, (epoch_end_time - start) % 60))


# See what the scores are after training
print(tag_tokens(check))

In [0]:
# Write the checkpoint to the same path the training cell resumes from, so the
# two locations cannot drift apart.
torch.save(model.state_dict(), saved_model_path)

In [0]:
# Read the test file: one sentence per line, split on whitespace into tokens.
with open('sents.test', 'r') as test_file:
    test_sentences = [raw_line.split() for raw_line in test_file]

In [0]:
test_tagged = []

# Reverse tag lookup, built once instead of scanning tag2idx for every token.
idx2tag = {index: tag for tag, index in tag2idx.items()}

model = model.eval()
# Inference only: skip gradient bookkeeping to save memory and time.
with torch.no_grad():
    for test_sentence in test_sentences:
        if not test_sentence:
            # Keep one output entry per input line, but do not feed an empty
            # sentence to the model (it cannot process zero words).
            test_tagged.append([])
            continue

        words = [prepare_sequence(s, char2Idx).to(device) for s in test_sentence]
        sentence = prepare_sequence(map(lambda word: word.lower(), test_sentence), word2Idx).to(device)

        tag_scores = model(sentence, words)
        _, indices = torch.max(tag_scores, 1)
        test_tagged.append(
            [(token, idx2tag[index]) for token, index in zip(test_sentence, indices.tolist())]
        )

In [0]:
# Write the predictions, one sentence per line, as space-separated word/TAG tokens.
with open('sents.out', 'w') as out_file:
    for tagged_tokens in test_tagged:
        rendered = ('%s%s%s' % (token, '/', tag) for token, tag in tagged_tokens)
        out_file.write(' '.join(rendered) + '\n')

In [0]:
# Run the eval.py script on the predictions file (sents.out) and the
# reference file (sents.answer).
%run eval.py sents.out sents.answer