In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split

import re
import pandas as pd
import numpy as np

In [34]:
glove_dir = './glove.6B.300d.txt'
input_dim = 300

vocab = {}
with open(glove_dir, encoding='utf8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        vector = np.asarray(values[1:], "float32")
        vocab[word] = vector
f.close()

print('Found %s word vectors.' %len(vocab))

Found 400000 word vectors.


In [35]:
max_len = 50
zero_padding = [0]*input_dim

def get_embeddings(text, emb="LSTM"):
    if emb == "DNN":
        embedding = [0]*input_dim
        i = 0
        for word in text.split(' '):
            if word in vocab:
                i += 1
                embedding += vocab[word]

        if i != 0:
            embedding /= i
    elif emb == "LSTM":
        embedding = []
        i = 0
        for word in text.split(' '):
            if i == max_len:
                break
            if word in vocab:
                i += 1
                embedding.append(vocab[word])
        
        while i < max_len:
            i += 1
            embedding.append(zero_padding)
            
    return embedding

In [36]:
def clean(text):
    """ Function to clean the text """
    text = text.lower()
    text = re.sub(r'^https?:\/\/.*[\r\n]*', '', text, flags=re.MULTILINE)
    texter = re.sub(r"<br />", " ", text)
    texter = re.sub(r"&quot;", "\"",texter)
    texter = re.sub('&#39;', "\"", texter)
    texter = re.sub('\n', " ", texter)
    texter = re.sub(' u '," you ", texter)
    texter = re.sub('`',"", texter)
    texter = re.sub(' +', ' ', texter)
    texter = re.sub(r"(!)\1+", r"!", texter)
    texter = re.sub(r"(\?)\1+", r"?", texter)
    texter = re.sub('&amp;', 'and', texter)
    texter = re.sub('\r', ' ',texter)
    
    # Remove numbers from string
    texter = re.sub(pattern=r"[+-]?\d+(?:\.\d+)?", repl="", string=texter, count=0, flags=0)
    texter = texter.replace("  ", " ")
    clean = re.compile('<.*?>')
    texter = texter.encode('ascii', 'ignore').decode('ascii')
    texter = re.sub(clean, '', texter)
    texter = re.sub(r'[^\w\s]', '', texter)
    if texter == "":
        texter = ""
    return texter

In [37]:
def transform(X, emb="LSTM"):
    embeddings = []
    for item in X:
        item = clean(item)
        embedding = get_embeddings(item, emb)
        embeddings.append(embedding)
    
    return embeddings

In [38]:
# Hyperparameters
num_epochs = 10
batch_size = 128
learning_rate = 0.0001
dropout = 0.25

In [39]:
inputs_file = "./tweet.text"
labels_file = "./tweet_labels.txt"

X = []
with open(inputs_file, encoding='utf8') as f:
    for line in f:
        X.append(line)
f.close()
# print(len(X))

y = []
with open(labels_file, encoding='utf8') as f:
    for line in f:
        y.append(int(line))
f.close()

In [40]:
X = transform(X, "DNN")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=42)

X_tr = torch.tensor(X_train, dtype=torch.float)
y_tr = torch.tensor(y_train)
train = TensorDataset(X_tr, y_tr)
trainloader = DataLoader(train, batch_size=batch_size)

X_te = torch.tensor(X_test, dtype=torch.float)
y_te = torch.tensor(y_test)
test = TensorDataset(X_te, y_te)
testloader = DataLoader(test)

In [50]:
class Net(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(300, 500)
        self.hidden1 = nn.Linear(500, 500)
        self.hidden2 = nn.Linear(500, 500)
        self.fc2 = nn.Linear(500, 20)
        self.dropout = nn.Dropout(0.25)
        
        self.batchnorm1 = nn.BatchNorm1d(500)
        self.batchnorm2 = nn.BatchNorm1d(500)
        self.batchnorm3 = nn.BatchNorm1d(500)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.batchnorm1(x)
        x = self.dropout(x)
        x = F.relu(self.hidden1(x))
        x = self.batchnorm2(x)
        x = self.dropout(x)
        x = F.relu(self.hidden2(x))
        x = self.batchnorm3(x)
        x = self.dropout(x)
#         x = self.fc2(x)
        x = F.softmax(self.fc2(x), dim=1)
        return x

net = Net()

In [51]:
# Loss function
criterion = nn.CrossEntropyLoss()

# create your optimizer
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

In [52]:
# Training
net.train()
for epoch in range(num_epochs):
    for i, data in enumerate(trainloader):
        inputs, labels = data

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 500 == 0:
            print ('Epoch [%d/%d], Step [%d/%d], Loss: %.4f'
                   %(epoch+1, num_epochs, i+1, len(trainloader), loss.data))

Epoch [1/10], Step [500/2344], Loss: 2.7786
Epoch [1/10], Step [1000/2344], Loss: 2.7783
Epoch [1/10], Step [1500/2344], Loss: 2.7620
Epoch [1/10], Step [2000/2344], Loss: 2.7433
Epoch [2/10], Step [500/2344], Loss: 2.7659
Epoch [2/10], Step [1000/2344], Loss: 2.7650
Epoch [2/10], Step [1500/2344], Loss: 2.7352
Epoch [2/10], Step [2000/2344], Loss: 2.7364
Epoch [3/10], Step [500/2344], Loss: 2.7440
Epoch [3/10], Step [1000/2344], Loss: 2.7417
Epoch [3/10], Step [1500/2344], Loss: 2.7530
Epoch [3/10], Step [2000/2344], Loss: 2.7242
Epoch [4/10], Step [500/2344], Loss: 2.7437
Epoch [4/10], Step [1000/2344], Loss: 2.7408
Epoch [4/10], Step [1500/2344], Loss: 2.7407
Epoch [4/10], Step [2000/2344], Loss: 2.7133
Epoch [5/10], Step [500/2344], Loss: 2.7521
Epoch [5/10], Step [1000/2344], Loss: 2.7425
Epoch [5/10], Step [1500/2344], Loss: 2.7418
Epoch [5/10], Step [2000/2344], Loss: 2.7152
Epoch [6/10], Step [500/2344], Loss: 2.7626
Epoch [6/10], Step [1000/2344], Loss: 2.7569
Epoch [6/10], St

In [53]:
# Testing
net.eval()
outputs = net(X_te)

_, predicted = torch.max(outputs, 1)

total += y_te.size(0)
correct += (predicted == y_te).sum()

print(f'Accuracy of the model is: {100*correct/total:.2f}%')

Accuracy of the model is: 28.97%


In [11]:
## LSTM
X = transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=42)

X_tr = torch.tensor(X_train, dtype=torch.float)
y_tr = torch.tensor(y_train)
train = TensorDataset(X_tr, y_tr)
trainloader = DataLoader(train, batch_size=batch_size)

X_te = torch.tensor(X_test, dtype=torch.float)
y_te = torch.tensor(y_test)
test = TensorDataset(X_te, y_te)
testloader = DataLoader(test)

print(len(y_tr))
print(X_tr.size())

  X_tr = torch.tensor(X_train, dtype=torch.float)


299936
torch.Size([299936, 40, 50])


In [20]:
# Hyperparameters
num_epochs = 3
batch_size = 256
learning_rate = 0.0005
dropout = 0.5

In [21]:
class LSTM(nn.Module): 
    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length, dropout):
        super(LSTM, self).__init__()
        self.num_classes = num_classes #number of classes
        self.num_layers = num_layers #number of layers
        self.input_size = input_size #input size
        self.hidden_size = hidden_size #hidden state
        self.seq_length = seq_length #sequence length

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                          num_layers=num_layers, dropout=dropout, batch_first=True) #lstm
        self.fc1 =  nn.Linear(hidden_size, 128) #fully connected 1
        self.fc2 = nn.Linear(128, num_classes) #fully connected last layer

    def forward(self, x):
        h_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size) #hidden state
        c_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        
        # Propagate input through LSTM
        x, _ = self.lstm(x, (h_0, c_0)) #lstm with input, hidden, and internal state
        
        # Flatten lstm output
        x = x[:, -1, :]
        
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=1)
        return x

lstm = LSTM(20, 50, 128, 2, 50, dropout)

In [22]:
# Loss function
criterion = nn.CrossEntropyLoss()

# create your optimizer
optimizer = optim.Adam(lstm.parameters(), lr=learning_rate)

In [23]:
# Training
for epoch in range(num_epochs):
    running_loss = 0
    for i, data in enumerate(trainloader):
        inputs, labels = data

        outputs = lstm(inputs)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        optimizer.zero_grad()

        running_loss += loss.item()
        
        if (i+1) % 250 == 0:
            print ('Epoch [%d/%d], Step [%d/%d], Loss: %.4f'
                   %(epoch+1, num_epochs, i+1, len(X_tr)//batch_size, loss.data))

Epoch [1/3], Step [100/2343], Loss: 2.8438
Epoch [1/3], Step [200/2343], Loss: 2.8750
Epoch [1/3], Step [300/2343], Loss: 2.8594
Epoch [1/3], Step [400/2343], Loss: 2.9219
Epoch [1/3], Step [500/2343], Loss: 2.8516
Epoch [1/3], Step [600/2343], Loss: 2.9219
Epoch [1/3], Step [700/2343], Loss: 2.8360
Epoch [1/3], Step [800/2343], Loss: 2.8516
Epoch [1/3], Step [900/2343], Loss: 2.8360
Epoch [1/3], Step [1000/2343], Loss: 2.8828
Epoch [1/3], Step [1100/2343], Loss: 2.8750
Epoch [1/3], Step [1200/2343], Loss: 2.8360
Epoch [1/3], Step [1300/2343], Loss: 2.8438
Epoch [1/3], Step [1400/2343], Loss: 2.9063
Epoch [1/3], Step [1500/2343], Loss: 2.8594
Epoch [1/3], Step [1600/2343], Loss: 2.8672
Epoch [1/3], Step [1700/2343], Loss: 2.9375
Epoch [1/3], Step [1800/2343], Loss: 2.9063
Epoch [1/3], Step [1900/2343], Loss: 2.8203
Epoch [1/3], Step [2000/2343], Loss: 2.7500
Epoch [1/3], Step [2100/2343], Loss: 2.8360
Epoch [1/3], Step [2200/2343], Loss: 2.8672
Epoch [1/3], Step [2300/2343], Loss: 2.88

In [25]:
# Testing
total, correct = 0, 0
with torch.no_grad():
    for data in testloader:
        inputs, labels = data

        outputs = lstm(inputs)

        _, predicted = torch.max(outputs, 1)

        total += labels.size(0)
        correct += (predicted == labels).sum()

print(f'Accuracy of the model is: {100*correct/total:.2f}%')

Accuracy of the model is: 21.85%
