In [19]:
import torch
import torch.nn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as td
import nltk
from nltk import word_tokenize
import json
import numpy as np
import pandas as pd
import re


In [20]:
def read_file(data):
    with open(data, 'r') as f:
        file = json.load(f)
    train_data=pd.read_json(data)
    length=len(file)
    X_train_dataset=[]
    Y_train_dataset=[0]*(int(0.2*length))
    
    #print(train_data)
    for i in range(int(0.2*length)):
        text = re.sub(r'^https?:\/\/.*[\r\n]*', '', train_data.iloc[0][i], flags=re.MULTILINE)
        X_train_dataset.append(text)
        emotion=train_data.iloc[6][i]
        #print(emotion)
        p=0
        for em in emotion.keys():
            if emotion[em]==True:
                #print(em)
                Y_train_dataset[i]=em
                p+=1
        #print(Y_train_dataset[i],'---------------')
    X_test_dataset=[]
    Y_test_dataset=[0]*(length-int(0.9*length))
    
    #print(train_data)
    q=0
    for i in range(int(0.9*length),length):
        text = re.sub(r'^https?:\/\/.*[\r\n]*', '', train_data.iloc[0][i], flags=re.MULTILINE)
        X_test_dataset.append(text)
        emotion=train_data.iloc[6][i]
        #print(emotion)
        p=0
        for em in emotion.keys():
            if emotion[em]==True:
                #print(em)
                Y_test_dataset[q]=em
                p+=1
        q+=1
    return X_train_dataset,Y_train_dataset,X_test_dataset,Y_test_dataset
    #print(X_train_dataset)
    #print(Y_train_dataset)
    
    
X_dataset,Y_dataset,X_test,Y_test=read_file('train.json')   

In [21]:
def read_glove(glove):
    f= open(glove, 'r')
    words = set()
    word_2_vec = {}
    for l in f:
        l = l.strip().split()
        curr_word = l[0]
        words.add(curr_word)
        word_2_vec[curr_word] = np.array(l[1:], dtype=np.float64)

    i = 1
    words_to_index = {}
    index_to_words = {}
    for w in sorted(words):
        words_to_index[w] = i
        index_to_words[i] = w
        i = i + 1
    return words_to_index, index_to_words, word_2_vec
word_to_index,index_to_words,word_2_vec=read_glove("glove.6B.50d.txt")

In [22]:
from nltk.tokenize.treebank import TreebankWordTokenizer
tokenizer = TreebankWordTokenizer()

symbols_to_delete='.,?!-;*"'+"…:—()'%#$&_/@＼・ω+=”“[]^–>\\0123456789"
#symbols_to_delete='http'
#isolate_dict = {ord(c):f' {c} ' for c in symbols_to_isolate}
remove_dict = {ord(c):f'' for c in symbols_to_delete}


def handle_punctuation(x):
    x = x.translate(remove_dict)
    #x = x.translate(isolate_dict)
    return x

def handle_contractions(x):
    x = tokenizer.tokenize(x)
    return x
def preprocess(x):
    x = handle_punctuation(x)
    x = handle_contractions(x)
    s = ''
    s = s.join(x)
    return s

In [23]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
emotion_dict = {
        "joy":1,
        "anger":2,
        "disgust":3,
        "anticipate":4,
        "fear":5,
        "optimism":6,
        "pessimism":7,
        "sadness":8,
        "surprise":9,
        "trust":10,
        "neutral":11,
        "love":0,   
}

#function is used to convert the senetences in the dataset to indices that was stored in word_to_index using glove
def dataset_to_index(X_dataset, word_to_index, max_len):
    
    num_examples = len(X_dataset)  # number of training examples
    #print(num_examples,maxLen)
    # Initialize X_indices as a numpy matrix of zeros and the correct shape
    X_word_to_index = np.zeros((num_examples,max_len))
    #print(X_word_to_index.shape)
    for i in range(num_examples):  # loop over training examples
        
        # Convert the ith sentence in lower case and split into a list of words
        sentence_words = X_dataset[i].lower().split()
        #print(sentence_words)
        # Initialize j to 0
        j = 0
        
        # Loop over the words of sentence_words
        for w in sentence_words:
            # Set the (i,j)th entry of X_indices to the index of the correct word.
            w=preprocess(w)
            #print(w)
            if w in word_to_index:
                X_word_to_index[i][j] = word_to_index[w]
                #print(i,j,X_word_to_index[i][j],word_to_index[w])
            else:
                X_word_to_index[i][j] =0
                #print(i,j,X_word_to_index[i][j])
            # Increment j to j + 1
            
            j = j + 1
    
    return X_word_to_index
def ans_to_index(Y_dataset):
    rows= len(Y_dataset) 
    Y = [0]*rows
    p,q=0,0
    for i in Y_dataset:
        q=0
        #print(p,q)
        #print (w)
        if i in emotion_dict:
            Y[p]=emotion_dict[i]
            q+=1
        p+=1    
    return Y

#Y_train_indices=ans_to_index(Y_dataset)
#print(Y_train_indices)

In [24]:
def pretrained_embedding_layer(word_2_vec, word_to_index, non_trainable=True):
    num_embeddings = len(word_to_index) + 1                   
    input_dim = word_2_vec["first"].shape[0]  #  dimensionality of GloVe word vectors (= 50)

    # Initialize the embedding matrix as a numpy array of zeros of shape (num_embeddings, embedding_dim)
    weights_matrix = np.zeros((num_embeddings, input_dim))

    # Set each row "index" of the embedding matrix to be the word vector representation of the "index"th word of the vocabulary
    for word, index in word_to_index.items():
        weights_matrix[index, :] = word_2_vec[word]

    embed = nn.Embedding.from_pretrained(torch.from_numpy(weights_matrix).type(torch.FloatTensor), freeze=non_trainable)

    return embed, num_embeddings, input_dim
embedding, vocab_size, input_dim = pretrained_embedding_layer(word_2_vec, word_to_index, non_trainable=True)


In [25]:
#import torch.nn 

class NN(nn.Module):
  def __init__(self, embedding, input_dim, hidden_dim, vocab_size, output_dim, batch_size):
      super(NN, self).__init__()

      self.batch_size = batch_size

      self.hidden_dim = hidden_dim

      self.word_embeddings = embedding
      #print (self.word_embeddings)
      self.lstm = torch.nn.LSTM(input_dim,hidden_dim,num_layers=2,dropout = 0.5,batch_first = True)

      # The linear layer that maps from hidden state space to output space
      self.fc = torch.nn.Linear(hidden_dim, output_dim)

  def forward(self, sentence):
      
      
      sentence = sentence.to(device)

      embeds = self.word_embeddings(sentence)
      #print ('Embedding layer output shape', embeds.shape)

      # initializing the hidden state to 0
      #hidden=None
      
      init_hidden=torch.zeros(1,1,self.hidden_dim,device=device)
      h0 = torch.zeros(2, sentence.size(0), hidden_dim).requires_grad_().to(device)
      c0 = torch.zeros(2, sentence.size(0), hidden_dim).requires_grad_().to(device)
      #packed = torch.nn.utils.rnn.pack_padded_sequence(embeds,len(sentences))
      #outputs,hidden=self.gru(packed, hidden)
      #outputs,_=torch.nn.util.rnn.pack_padded_sequence(outputs)
      #outputs=outputs[:,:,:self.hidden_dim]+outputs[:,:,self.hidden_dim:]
      #print('h0',h0)
      lstm_out, h = self.lstm(embeds,(h0, c0))
      #print('h',h)
      # get info from last timestep only
      lstm_out = lstm_out[:, -1, :]
      #print ('LSTM layer output shape', lstm_out.shape)
      #print ('LSTM layer output ', lstm_out)

      # Dropout
      lstm_out = F.dropout(lstm_out, 0.5)

      fc_out = self.fc(lstm_out)
      #print ('FC layer output shape', fc_out.shape)
      #print ('FC layer output ', fc_out)
      
      out = fc_out
      out = F.softmax(out, dim=1)
      #print ('Output layer output shape', out.shape)
      #print ('Output layer output ', out)
      return out
  

In [39]:

def train(model, trainloader, criterion, optimizer, epochs=10):
    
    model.to(device)
    running_loss = 0
    
    train_losses, test_losses, accuracies = [], [], []
    for e in range(epochs):

        running_loss = 0
        
        model.train()
        
        for sentences, labels in trainloader:

            sentences, labels = sentences.to(device), labels.to(device)
            #print('sentences',sentences,'Labels',labels)
            optimizer.zero_grad()

            pred = model.forward(sentences)
            #print(pred.shape,labels.shape)
            loss = criterion(pred, labels)

            loss.backward()

            optimizer.step()

            running_loss += loss.item()
        
        
        else:

          model.eval()

          test_loss = 0
          accuracy = 0
          
          # Turn off gradients for validation, saves memory and computations
          with torch.no_grad():
              for sentences, labels in test_loader:
                  sentences, labels = sentences.to(device), labels.to(device)
                  log_ps = model(sentences)
                  test_loss += criterion(log_ps, labels)
                  
                  ps = torch.exp(log_ps)
                  top_p, top_class = ps.topk(1, dim=1)
                  equals = top_class == labels.view(*top_class.shape)
                  accuracy += torch.mean(equals.type(torch.FloatTensor))
                  
          train_losses.append(running_loss/len(train_loader))
          test_losses.append(test_loss/len(test_loader))
          accuracies.append(accuracy / len(test_loader) * 100)

          print("Epoch: ",e+1,"Test Accuracy: {:.3f}".format(accuracy/len(test_loader))*100,'\n')



In [40]:

maxLen = len(max(X_dataset, key=len).split())
maxLen=2000
X_train_indices = dataset_to_index(X_dataset, word_to_index, maxLen)
Y_train_indices=ans_to_index(Y_dataset)
#Y_train_indices=np.asarray(Y_train_indices)
#Y_train_one_hot = np.eye(5)[Y_train_indices.reshape(-1)]
print(len(X_dataset),len(Y_dataset))

maxLen = len(max(X_test, key=len).split())

X_test_indices=dataset_to_index(X_test, word_to_index, maxLen)
Y_test_indices=ans_to_index(Y_test)
hidden_dim=128
output_size=12
batch_size = 16
print(len(X_train_indices),len(Y_train_indices))
#print ('Embedding layer is ', embedding)
#print ('Embedding layer weights ', embedding.weight.shape)

model = NN(embedding, input_dim, hidden_dim, vocab_size, output_size, batch_size)
#criterion = nn.BCEWithLogitsLoss()
criterion=nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.002)
epochs = 10
train_dataset = td.TensorDataset(torch.tensor(X_train_indices).type(torch.LongTensor), torch.tensor(Y_train_indices).type(torch.LongTensor))
train_loader = td.DataLoader(train_dataset, batch_size=batch_size)

test_dataset = torch.utils.data.TensorDataset(torch.tensor(X_test_indices).type(torch.LongTensor), torch.tensor(Y_test_indices).type(torch.LongTensor))
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

train(model, train_loader, criterion, optimizer, epochs)

298 298
298 298
Epoch:  1 Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy: 0.062Test Accuracy:

Epoch:  6 Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accuracy: 0.069Test Accur

In [41]:
test_loss = 0
accuracy = 0
model.eval()
with torch.no_grad():
    for sentences, labels in test_loader:
        sentences, labels = sentences.to(device), labels.to(device)
        ps = model(sentences)
        test_loss += criterion(ps, labels).item()

        # Accuracy
        top_p, top_class = ps.topk(1, dim=1)
        equals = top_class == labels.view(*top_class.shape)
        #print (equals)
        accuracy += torch.mean(equals.type(torch.FloatTensor))
model.train()
print("Test Accuracy:",(accuracy/len(test_loader))*100)
running_loss = 0

Test Accuracy: tensor(6.8750)


In [None]:
import re
emotion_dictionary = {
        1:"Joy",
        2:"Anger",
        3:"Disgust",
        4:"Anticipate",
        5:"Fear",
        6:"Optimism",
        7:"Pessimism",
        8:"Sadness",
        9:"Surprise",
        10:"Trust",
        11:"Neutral",
        0:"Love",   
}
def test_sentence(test_file):
    with open(test_file, 'r') as f:
        file = json.load(f)
    train_data=pd.read_json(test_file)
    length=len(file)
    test_dataset=[]
    
    #print(train_data)
    for i in range(int(0.2*length)):
        text = re.sub(r'^https?:\/\/.*[\r\n]*', '', train_data.iloc[0][i], flags=re.MULTILINE)
        test_dataset.append(text)
    #maxLen = len(max(test_dataset, key=len).split())
    maxLen=1600
    X_test_indices=dataset_to_index(test_dataset,word_to_index,maxLen)
    for sentences in X_test_indices:
        sentences = torch.tensor(X_test_indices).type(torch.LongTensor)
        print(sentences)
        ps = model(sentences)
        top_p, top_class = ps.topk(1, dim=1)
        label = int(top_class[0][0])
        print(label,emotion_dictionary[label])
    #print("\nInput Text: \t"+ input_text +'\nEmotion: \t'+  emotion_dict[label])
    print(label)
    return label
label=test_sentence('train.json')
#print(label,emotion_dictionary[label])

In [None]:
def test_input(input_text):
  
  # Convert the input to the model
  x_test = np.array([input_text])
  X_test_indices = sentences_to_indices(x_test, word_to_index, maxLen)
  sentences = torch.tensor(X_test_indices).type(torch.LongTensor)

  # Get the class label
  ps = model(sentences)
  top_p, top_class = ps.topk(1, dim=1)
  label = int(top_class[0][0])

  print("\nInput Text: \t"+ input_text +'\nEmotion: \t'+  emotion_dict[label])

test_input('')