In [1]:
import torch
import torch.utils.data
import torch.nn as nn
import torch.nn.functional as F
import re
import numpy as np
import os as os
import pandas as pd
import pickle
import random

from operator import itemgetter

In [2]:
#load data and labels 
with open('tweet_embeddings25k.pkl', 'rb') as f:
    dataset = pickle.load(f)
    
with open('labels25k.pkl', 'rb') as f:
    labels = pickle.load(f)
#converting to tensor/adding labels
dataset = torch.stack(dataset)
labels = torch.tensor(labels.tolist())
dataset = torch.utils.data.TensorDataset(labels, dataset)

In [9]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

train_labels = train_dataset[:][0]
train_data = train_dataset[:][1]

test_labels = test_dataset[:][0]
test_data = test_dataset[:][1]

# check balance of dataset
# TODO: check in 'Training Model' if I actually need to 
# convert to labels to list or leave as tensor
train_labels = train_labels.tolist()
test_labels = test_labels.tolist()
print(train_labels.count(0))
print(train_labels.count(1))
print(test_labels.count(0))
print(test_labels.count(1))


10043
9957
2456
2544


# Model Definition

In [10]:
class CLSTM(nn.Module):
    def __init__(self):
        super(CLSTM,self).__init__()
        # expects input tensor of [1,1,768]
        # 1 input channel, 20 output channels, 1x5 convolution with padding
        self.conv1 = nn.Conv1d(1,20,5,stride = 2,padding = 2)
        # 20 input channel, 10 output channels, 1x5 convolution
        self.conv2 = nn.Conv1d(20,10,5,stride = 2,padding = 2)
        # end up with tensors [1,10,192]
        # these turn into tensors [1,10,48] after pooling which 
        # are turned into a list of 10 [1,48] tensors to pass into the lstm
        # each of these 10 will be taken in as token representations
        
        self.lstm = nn.LSTM(
            input_size = 48,hidden_size = 48,num_layers = 2,bidirectional = True)
        # the lstm will output tuple of ([4,1,48],[4,1,48]) where the first entry is
        # the hidden state for classification and the second entry is the cell state

        self.fc1 = nn.Linear(4*48,120)
        self.fc2 = nn.Linear(120,40)
        self.fc3 = nn.Linear(40,1)
    
    def forward(self,x):
        #set initial hidden state and cell state for lstm
        # 2 is for the number of layers and the other 2 is because of bidirection
        hidden = torch.randn(2*2,1,48)
        cell = torch.randn(2*2,1,48)
        
        x = F.max_pool1d(F.relu(self.conv1(x)),kernel_size = 2,stride = 2)
        x = F.max_pool1d(F.relu(self.conv2(x)),kernel_size = 2,stride = 2)
        # x is now a tensor of shape [1,10,48] because we did the pooling
        
        # now put the "tokens" into a list
        tweet_representation = []
        for i in range(len(x[0])):
            tweet_representation.append(x[0][i].unsqueeze(0))
        # tweet_representation is a list of the ten [1,48] tensors to
        # be passed into the lstm
        for j in tweet_representation:
            _, hidden_tuple = self.lstm(j.view(1,1,-1),(hidden,cell)) 

        # hidden state tensor [4,1,48]
        hidden_state = hidden_tuple[0]
        # flatten the hidden_state to tensor [192]
        x = hidden_state.reshape(-1)
        x = F.relu(self.fc1(x))
        # x is now 1 x 120
        x = F.relu(self.fc2(x))
        # x is now 1 x 30
        x = self.fc3(x)
        # x is now a single predicted class
        return torch.sigmoid(x)
model = CLSTM()
print(model)

CLSTM(
  (conv1): Conv1d(1, 20, kernel_size=(5,), stride=(2,), padding=(2,))
  (conv2): Conv1d(20, 10, kernel_size=(5,), stride=(2,), padding=(2,))
  (lstm): LSTM(48, 48, num_layers=2, bidirectional=True)
  (fc1): Linear(in_features=192, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=40, bias=True)
  (fc3): Linear(in_features=40, out_features=1, bias=True)
)


In [11]:
# small test of the model---------------
inp = torch.randn(1,1,768)
out = model(inp)
print(out)

tensor([0.5358], grad_fn=<SigmoidBackward>)


# Training Model

In [15]:
# criterion = nn.BCELoss()
# optimizer = torch.optim.Adam(model.parameters(),lr = .0005)

# #10 is the number of epochs
# total_step = len(train_data)
# for epoch in range(5):
#     running_loss = 0.0
#     for i, data in enumerate(train_data,0):
#         #inputs.shape is [1,1,768]
#         inputs = data
#         #label is int (1 or 0)
#         label = train_labels[i]
#         label = torch.tensor([float(label)])
#         optimizer.zero_grad()
#         inputs = inputs.unsqueeze(0).unsqueeze(0)
#         output = model(inputs)
#         loss = criterion(output,label)
#         loss.backward()
#         optimizer.step()
        
#         #print stats
#         running_loss +=loss.item()
#         if (i+1) % 100 == 0:
#             print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Output [{}], True label [{}]' 
#                    .format(epoch+1, 100, i+1, total_step, loss.item(),output.item(),label.item()))
#             running_loss = 0.0
# print('finished training')

# Testing Model

In [14]:
correct = 0
total = 0
with torch.no_grad():
    for i, data in enumerate(test_data,0):
        #inputs.shape is [1,1,768]
        inputs = data
        #label is int (1 or 0)
        label = test_labels[i]
        label = torch.tensor([float(label)])
        inputs = inputs.unsqueeze(0).unsqueeze(0)
        output = model(inputs)
        if(output >=.5):
            output = 1
        else:
            output = 0
        total+=1
        correct+= int(output == label)
        
print('Accuracy of the model on 5000 test tweets: %d %%' % (100*correct/total))


Accuracy of the model on 5000 test tweets: 86 %
