In [1]:
import torch.nn.functional as F
from torch.autograd import Variable
import copy
from collections import Counter
import torch 
import numpy as np
import torch.nn as nn
import torchvision

In [2]:
def read_data(path):
    f = open(path, "r")
    lines = f.readlines()
    f.close()
    data = []
    for l in lines:
        labelSplit = l.replace('\n','').split(' ', 1)
        data.append([labelSplit[0], [word.lower() for word in labelSplit[1].split()]])
    return data

data = read_data('./questions.txt')
data_train = read_data('./training_data.txt')
data_test = read_data('./testing_data.txt')

In [3]:
def remove_stop_words(data, path):
    stop_words = []
    with open(path) as f:
        stop_words = [word for line in f for word in line.split(",")]
    data_without_stop_words = []
    for k, v in data:
        words = [t for t in v if t not in stop_words]
        data_without_stop_words.append((k, words))
    return data_without_stop_words


data = remove_stop_words(data, './stop_words.txt')
data_train = remove_stop_words(data_train, './stop_words.txt')
data_test = remove_stop_words(data_test, './stop_words.txt')

In [4]:
def get_labels(data):
    _labels = []
    for k,v in data:
        _labels.append(k)   
    _unique_label = list(set(_labels))
    _unique_label_dict = {}
    for k,v in enumerate(_unique_label):
        _unique_label_dict[v] = k
    return _unique_label_dict

labels = get_labels(data)
labels_train = get_labels(data_train)
labels_test = get_labels(data_test)


In [5]:
len(labels)

50

In [None]:
labels

In [6]:
def append_labels(data, labels):        
    cleaned_data = []
    for k,v in data:
        cleaned_data.append((labels[k],v))
        
    return np.array(cleaned_data)

data = append_labels(data, labels)
data_train = append_labels(data_train, labels_train)
data_test = append_labels(data_test,labels_test)

In [7]:
data[:5]

array([[35, list(['how', 'serfdom', 'develop', 'leave', 'russia', '?'])],
       [0,
        list(['what', 'films', 'featured', 'character', 'popeye', 'doyle', '?'])],
       [35,
        list(['how', 'find', 'list', 'celebrities', "'", 'real', 'names', '?'])],
       [46,
        list(['what', 'fowl', 'grabs', 'spotlight', 'chinese', 'year', 'monkey', '?'])],
       [12, list(['what', 'full', 'form', '.com', '?'])]], dtype=object)

In [8]:
data_test[:5]

array([[15, list(['what', 'chiricahua', 'name', '?'])],
       [27,
        list(['what', "'s", 'last', 'line', 'dickens', "'s", 'christmas', 'carol', '?'])],
       [31, list(['what', 'names', 'andrew', 'christina', 'mean', '?'])],
       [4,
        list(['where', 'song', 'anything', 'goes', 'take', 'place', '?'])],
       [29, list(['how', 'clean', 'cache', '?'])]], dtype=object)

In [9]:
data_train[:5]

array([[44, list(['how', 'many', 'varieties', 'twins', '?'])],
       [0,
        list(['what', 'tv', 'quiz', 'show', 'left', 'air', '1975', 'tune', 'vera', 'lynn', "'s", "'ll", 'meet', '?'])],
       [26, list(['what', 'oldest', 'profession', '?'])],
       [38, list(['what', 'feudal', 'system', '?'])],
       [0, list(['jude', 'law', 'what', 'movie', '?'])]], dtype=object)

In [10]:
def create_indexed_vocab(data):
    vocab = []
    for _, sent in data:
        for word in sent:
            vocab.append(word)
    count = Counter(vocab)
    count = {w : count[w] for w in count if count[w] >= 2}
    vocab = []
    for k, v in count.items():
        vocab.append(k)
    indexed_vocab = {word: idx for idx, word in enumerate(vocab)}
    return indexed_vocab
def create_vocab(data):
    total_words_orig = []
    for k,sent in data:
        for word in sent:
            total_words_orig.append(word)
    total_words = list(set(total_words_orig))
    total_words_str = ' '.join(total_words)
    vocab = set(total_words_str.split()) 
    word2idx = {word: idx for idx, word in enumerate(vocab)} # create word index
    return word2idx
word2idx=create_vocab(data)
word2idx['#UNK#'] = len(word2idx)
indexed_vocab = create_indexed_vocab(data)
indexed_vocab['#UNK#'] = len(indexed_vocab)


In [11]:
def load_glove_embeddings(path, indexed_vocab, embedding_dim=300):
    with open(path) as f:
        embeddings = np.zeros((len(indexed_vocab), embedding_dim))
        for line in f.readlines():
            values = line.split()
            word = values[0]
            index = indexed_vocab.get(word)
            if index:
                vector = np.array(values[1:], dtype='float32')
                embeddings[index] = vector
        return torch.from_numpy(embeddings).float()

glove_random = load_glove_embeddings('./glove.small.txt', indexed_vocab)
glove_pre = load_glove_embeddings('./glove.small.txt', word2idx)
embeddings_random = nn.Embedding(glove_random.size(0), glove_random.size(1))
embeddings_pretrained = nn.Embedding.from_pretrained(glove_pre, freeze=True)


In [None]:
# def split_train_test(data, test_ratio):
#     data_copy = copy.deepcopy(data)
#     np.random.shuffle(data_copy)
#     test_set_size = int(len(data) * test_ratio)
#     test = data_copy[:test_set_size]
#     train = data_copy[test_set_size:]
#     return train, test

# train, test = split_train_test(data, 0.2)
# train,dev =split_train_test(train,0.1)

In [12]:
data_train[:4]

array([[44, list(['how', 'many', 'varieties', 'twins', '?'])],
       [0,
        list(['what', 'tv', 'quiz', 'show', 'left', 'air', '1975', 'tune', 'vera', 'lynn', "'s", "'ll", 'meet', '?'])],
       [26, list(['what', 'oldest', 'profession', '?'])],
       [38, list(['what', 'feudal', 'system', '?'])]], dtype=object)

In [13]:
class BOWClassifier(nn.Module):
    def __init__(self,input_size,hidden_size, num_labels):
        super(BOWClassifier, self).__init__()
        self.input_size = input_size
        self.hidden_size  = hidden_size
        self.fc1 = torch.nn.Linear(self.input_size, self.hidden_size)
        self.relu = torch.nn.ReLU()
        self.fc2 = torch.nn.Linear(self.input_size, num_labels)
        self.sigmoid = torch.nn.Sigmoid()
        
    def forward(self, x):
        hidden = self.fc1(x)
        relu = self.relu(hidden)
        output = self.fc2(x)
        #output = self.sigmoid(output)
        return output

In [None]:
def make_bow_vector(sentence, indexed_vocab,embeddings):
    pt_tensor= torch.zeros(300, dtype=torch.long)
    count = 0
    for word in sentence:
        count += 1
        if word in indexed_vocab:
            pt_tensor = torch.add(pt_tensor, embeddings(torch.LongTensor([indexed_vocab[word]]))[0])
        else:
            pt_tensor = torch.add(pt_tensor, embeddings(torch.LongTensor([indexed_vocab['#UNK#']]))[0])
    pt_tensor=torch.div(pt_tensor, count)
    return pt_tensor

def get_bow_rep(data,word2idx,embeddings):
    bow_data = []
    for label, sent in data:
        bow_data.append(make_bow_vector(sent, word2idx,embeddings))
    return torch.stack(bow_data)
        
training_set = get_bow_rep(data_train,word2idx,embeddings_pretrained)
training_set_rand = get_bow_rep(data_train,indexed_vocab,embeddings_random)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_size = 300
hidden_size = 100
num_classes = 50
num_epochs = 350
learning_rate = 0.0001
batch_size = 100

model = BOWClassifier(input_size, hidden_size, num_classes).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 
#training in batches
for epoch in range(num_epochs):
    permutation = torch.randperm(training_set.size()[0])
    for i in range(0, training_set.size()[0], batch_size):
        optimizer.zero_grad()
        indices = permutation[i:i + batch_size]
        batch_features = training_set[indices]
        batch_features = batch_features.reshape(-1, 300).to(device)
        batch_labels = torch.LongTensor([label for label, sent in data_train[indices]]).to(device)
        batch_outputs = model(batch_features)
        loss = criterion(batch_outputs, batch_labels)
        loss.backward()
        optimizer.step()
    print(loss.item())
    print(epoch)

In [None]:
#testing
test_len =len(data_test)
correct=0
for label,data in data_test:
    bow_vec = make_bow_vector(data, word2idx,embeddings_pretrained)
    logprobs = model(bow_vec)
    logprobs = F.softmax(logprobs)
    pred = np.argmax(logprobs.data.numpy())
    if pred==label:
        correct+=1
    print('prediction: {}'.format(pred))
    print('actual: {}'.format(label))
accuracy = correct/test_len
print('accuracy: {}'.format(accuracy))

In [None]:
class LSTMTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim*2, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        # Decode the hidden state of the last time step
        tag_space = self.hidden2tag(lstm_out[-1, :])
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

In [None]:
#LSTM training
vocab_size=glove_random.size(0)
embedding_dim=glove_random.size(1)
tagset_size=51
hidden_dim=300
learning_rate=0.0001
ModelLSTM = LSTMTagger(embedding_dim,hidden_dim,vocab_size,tagset_size)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ModelLSTM.parameters(), lr=learning_rate)
num_epochs=2
for epoch in range(num_epochs):
    for label,sent in data_train:
        ids=[]
        unk = 3375
        for word in sent:
            index =indexed_vocab.get(word)
            if index:
                ids.append([indexed_vocab[word]])
            else:
                ids.append([unk])
        batch_labels=torch.tensor([label])
        sentence=torch.tensor(ids).squeeze()
        optimizer.zero_grad()
        batch_outputs = ModelLSTM(sentence)
        loss = criterion(batch_outputs, batch_labels)
        loss.backward()
        optimizer.step()
        print("loss",loss.item())
    print("epoch",epoch)

In [None]:
#LSTM testing
test_len =len(data_test)
with torch.no_grad():
    correct = 0
    for label,sent in data_test:
        ids=[]
        unk = 3375
        labels=[]
        for word in sent:
            index =indexed_vocab.get(word)
            if index:
                ids.append([indexed_vocab[word]])
            else:
                ids.append([unk])
        sentence=torch.tensor(ids).squeeze()
        batch_outputs = ModelLSTM(sentence)
        predicted = np.argmax(F.softmax(batch_outputs).data.numpy())
        if predicted==label:
             correct+=1
        print('prediction: {}'.format(predicted))
        print('actual: {}'.format(label))
    accuracy = correct/test_len
    print('accuracy: {}'.format(accuracy))

In [14]:
len(data_train)

4416