In [None]:
from torchtext import data
from torchtext import datasets
import torch
import spacy
import random
import numpy as np

import time

import torch.nn as nn
import torch.nn.functional as F

import spacy
# Seed every RNG this notebook can draw from so that Restart & Run All is
# repeatable. NOTE(review): torchtext's split()/iterator shuffling is expected
# to draw from Python's global `random` state - confirm for the installed version.
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# spaCy pipeline; its tokenizer is reused by predict_sentiment at inference time.
nlp = spacy.load('en')

In [None]:
# Question text field: spaCy tokenisation, tensors laid out batch-first.
questions = data.Field(tokenize = 'spacy', batch_first = True)

# Targets are class indices for a multi-class problem, so store them as
# integers (torch.long) rather than floats: nn.CrossEntropyLoss expects long
# class indices, and integer labels compare against argmax predictions
# without any dtype mixing. Existing `.long()` casts on the labels stay valid.
labels = data.LabelField(dtype = torch.long)

In [None]:
# Load TREC using the two fields defined earlier. The second split returned by
# `splits` is discarded, so it is not used anywhere in the cells shown here.
train_data, _ = datasets.TREC.splits(questions, labels)

# Carve a validation set out of the training examples, rebinding `train_data`
# to the remaining portion. `split()` is called with its defaults.
# NOTE(review): presumably a random 70/30 split - confirm against the
# installed torchtext version.
train_data, valid_data = train_data.split()

In [None]:
# Display the training split object (cell output only, no side effects).
train_data

In [None]:
# Show the `text` attribute of the first training example.
train_data.examples[0].text

In [None]:
# Show the `label` attribute of the same (first) training example.
train_data.examples[0].label

In [None]:
# Report the number of examples in the training and validation splits,
# one count per line.
print(len(train_data), len(valid_data), sep = '\n')

In [None]:
# Build the question vocabulary from the training split only and attach the
# "glove.6B.200d" pretrained vectors to it.
# NOTE(review): `unk_init` is presumably applied to vocabulary tokens that
# have no pretrained vector (drawn from a normal distribution) - confirm
# against the torchtext docs.
questions.build_vocab(train_data,
                 vectors = "glove.6B.200d", 
                 unk_init = torch.Tensor.normal_)

# Build the label vocabulary from the same training split.
labels.build_vocab(train_data)

In [None]:
# Display the vector matrix attached to the question vocabulary by build_vocab.
questions.vocab.vectors

In [None]:
# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# One bucketed iterator per split; both yield batches of 64 examples that are
# placed on `device`.
train_iterator, valid_iterator = data.BucketIterator.splits((train_data, valid_data),
                                                            batch_size = 64,
                                                            device = device)

In [None]:
class CNN(nn.Module):
    """Convolutional text classifier.

    Pipeline: token embedding -> one 2-D convolution per filter height ->
    ReLU -> max-over-time pooling -> concatenation -> dropout -> linear layer.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: width of each embedding vector (also the kernel width).
        n_filters: output channels produced by every convolution.
        filter_sizes: iterable of kernel heights (number of consecutive tokens).
        output_dim: number of values produced per example by the final layer.
        dropout: dropout probability applied to the pooled features.
        pad_idx: index of the padding token in the embedding table.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings = vocab_size,
                                      embedding_dim = embedding_dim,
                                      padding_idx = pad_idx)

        # Each kernel spans the full embedding width, so it only slides along
        # the token axis.
        branches = []
        for height in filter_sizes:
            branches.append(nn.Conv2d(1, n_filters, (height, embedding_dim)))
        self.convs = nn.ModuleList(branches)

        # Every branch contributes n_filters pooled features.
        self.fc = nn.Linear(n_filters * len(filter_sizes), output_dim)

        self.dropout = nn.Dropout(p = dropout)

    def forward(self, text):
        """Map a batch of token-index rows to per-class scores.

        `text` is indexed as (batch, tokens); the token axis must be at least
        as long as the largest kernel height or the convolution will fail.
        """
        # (batch, tokens) -> (batch, 1, tokens, embedding_dim): add a channel axis.
        embedded = self.embedding(text).unsqueeze(1)

        features = []
        for conv in self.convs:
            # The kernel covers the whole embedding width, leaving a size-1
            # last axis that is squeezed away.
            fmap = F.relu(conv(embedded)).squeeze(3)
            # Max over all token positions -> (batch, n_filters).
            features.append(F.max_pool1d(fmap, fmap.shape[2]).squeeze(2))

        merged = torch.cat(features, dim = 1)
        return self.fc(self.dropout(merged))

In [None]:
# Model sizes are derived from the vocabularies instead of being hard-coded,
# so they stay correct if the label set or the pretrained vectors change.
input_dimensions = len(questions.vocab)
# One output score per label in the label vocabulary.
output_dimensions = len(labels.vocab)
# Must equal the width of the pretrained vectors that are copied into the
# embedding layer later on.
embedding_dimensions = questions.vocab.vectors.shape[1]
pad_index = questions.vocab.stoi[questions.pad_token]

number_of_filters = 100
# Kernel heights, i.e. how many consecutive tokens each convolution sees.
filter_sizes = [2,3,4]
dropout_pc = 0.5


model = CNN(input_dimensions, embedding_dimensions, number_of_filters, 
            filter_sizes, output_dimensions, dropout_pc, pad_index)

In [None]:
# Vectors attached to the question vocabulary when it was built.
glove_embeddings = questions.vocab.vectors

# Overwrite the embedding layer's weights in place with those vectors. As the
# last expression of the cell, the value returned by copy_ is also displayed.
model.embedding.weight.data.copy_(glove_embeddings)

In [None]:
# Row of the embedding table that belongs to the unknown-word token.
unknown_index = questions.vocab.stoi[questions.unk_token]

# Reset the unknown and padding rows to all zeros in a single indexed
# assignment (the zero vector is broadcast across both rows).
model.embedding.weight.data[[unknown_index, pad_index]] = torch.zeros(embedding_dimensions)

In [None]:
# Move the model to its target device BEFORE the optimizer is created: the
# torch.optim documentation advises constructing optimizers only after the
# parameters live on the device they will be trained on.
model = model.to(device)

# Multi-class loss that takes raw scores and integer class indices.
criterion = nn.CrossEntropyLoss().to(device)

# Adam with its default hyperparameters over all model parameters.
optimizer = torch.optim.Adam(model.parameters())

In [None]:
def multi_accuracy(preds, y):
    """Return the fraction of rows whose highest-scoring class equals the target.

    Args:
        preds: 2-D tensor of per-class scores; the arg-max is taken over dim 1.
        y: 1-D tensor of target class indices, one per row of `preds`.

    Returns:
        A 0-dim float tensor: number of correct rows divided by the row count.
    """
    _, top_class = torch.max(preds, dim = 1)
    hits = top_class.eq(y).float()
    return hits.sum() / len(hits)

In [None]:
def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over `iterator` and report mean batch metrics.

    Args:
        model: module called on `batch.text` to produce per-class scores.
        iterator: sized iterable of batches exposing `.text` and `.label`.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as `criterion(scores, batch.label.long())`.

    Returns:
        (loss, accuracy): per-batch loss and accuracy, each averaged over the
        number of batches (`len(iterator)`).
    """
    model.train()

    loss_sum, acc_sum = 0, 0

    for batch in iterator:
        optimizer.zero_grad()

        scores = model(batch.text).squeeze(1)
        batch_loss = criterion(scores, batch.label.long())
        batch_acc = multi_accuracy(scores, batch.label)

        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        acc_sum += batch_acc.item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_sum / n_batches

In [None]:
def evaluate(model, iterator, criterion):
    """Score `model` on `iterator` without updating it.

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    Args:
        model: module called on `batch.text` to produce per-class scores.
        iterator: sized iterable of batches exposing `.text` and `.label`.
        criterion: loss called as `criterion(scores, batch.label.long())`.

    Returns:
        (loss, accuracy): per-batch loss and accuracy, each averaged over the
        number of batches (`len(iterator)`).
    """
    model.eval()

    loss_sum, acc_sum = 0, 0

    with torch.no_grad():
        for batch in iterator:
            scores = model(batch.text).squeeze(1)

            loss_sum += criterion(scores, batch.label.long()).item()
            acc_sum += multi_accuracy(scores, batch.label).item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_sum / n_batches

In [None]:
# Number of full passes over the training iterator.
epochs = 10

# Best validation loss seen so far; starts at infinity so the first epoch
# always produces a checkpoint.
lowest_validation_loss = float('inf')

for epoch in range(epochs):

    start_time = time.time()

    # One training pass followed by one validation pass; the timer covers both.
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

    end_time = time.time()

    # Keep only the weights of the epoch with the lowest validation loss,
    # overwriting the same file each time an improvement is found.
    if valid_loss < lowest_validation_loss:
        lowest_validation_loss = valid_loss
        torch.save(model.state_dict(), 'cnn_model.pt')

    # Epoch numbers are printed 1-based; elapsed time is truncated to whole seconds.
    print(f'Epoch: {epoch+1:02} | Epoch Time: {int(end_time - start_time)}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
# Restore the best checkpoint written by the training loop. `map_location`
# remaps the stored tensors onto the current `device`, so a checkpoint saved
# on a GPU can still be loaded in a CPU-only session.
model.load_state_dict(torch.load('cnn_model.pt', map_location = device))

In [None]:
def predict_sentiment(model, sentence, min_len = 5):
    """Predict the class label for a single raw question string.

    Despite its name (kept so existing callers keep working), the function
    returns an entry of the `labels` vocabulary, not a sentiment score.

    Args:
        model: trained classifier, called on a (1, n_tokens) tensor of indices.
        sentence: raw text; tokenised with the module-level spaCy `nlp` tokenizer.
        min_len: minimum number of tokens fed to the model. Shorter inputs are
            right-padded with the question field's pad token. It has to be at
            least the model's largest convolution kernel height; the default
            of 5 covers the kernel heights [2, 3, 4] configured in this notebook.

    Returns:
        The label string stored in `labels.vocab.itos` at the arg-max class index.
    """
    tokens = [tok.text for tok in nlp.tokenizer(sentence)]

    # Pad with the field's own pad token rather than a hard-coded literal so
    # the lookup below always lands on the padding index.
    shortfall = min_len - len(tokens)
    if shortfall > 0:
        tokens += [questions.pad_token] * shortfall

    token_ids = [questions.vocab.stoi[t] for t in tokens]

    # Leading batch axis of size 1: the model consumes batch-first input.
    batch = torch.tensor(token_ids, dtype = torch.long, device = device).unsqueeze(0)

    model.eval()
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        class_id = torch.max(model(batch), 1).indices.item()

    return labels.vocab.itos[class_id]

In [None]:
# Classify one hand-written question with the restored model and report the label.
pred_class = predict_sentiment(model, "How many roads must a man walk down?")
print(f'Predicted class is: {pred_class}')