In [1]:
train = open("train.txt").readlines()
test = open("test.txt").readlines()

In [2]:
import numpy as np
train = np.array(list(map(lambda x: x.rstrip('\n'),train)))
test = np.array(list(map(lambda x: x.rstrip('\n'),test)))

In [3]:
xTrain = np.array(list(map(lambda x: x.split(';')[0], train)))
yTrain = np.array(list(map(lambda x: x.split(';')[1], train)))

xTest = np.array(list(map(lambda x: x.split(';')[0], test)))
yTest = np.array(list(map(lambda x: x.split(';')[1], test)))
yTest

array(['sadness', 'sadness', 'sadness', ..., 'joy', 'joy', 'fear'],
      dtype='<U8')

In [4]:
labelToNumber = {
    'anger':0,
    'fear':1,
    'sadness':2,
    'joy':3,
    'love':4,
    'surprise':5
}
yTrainNumber = np.array(list(map(lambda x: labelToNumber[x],yTrain)))
yTestNumber = np.array(list(map(lambda x: labelToNumber[x],yTest)))

In [5]:
# Word2Vec
import gensim.downloader as api
wv = api.load('glove-twitter-200')

In [6]:
xTrainSplit = np.array(list(map(lambda x: x.split(' '),xTrain)))
xTestSplit = np.array(list(map(lambda x: x.split(' '),xTest)))
xTrainSplit[0]

  xTrainSplit = np.array(list(map(lambda x: x.split(' '),xTrain)))
  xTestSplit = np.array(list(map(lambda x: x.split(' '),xTest)))


['i', 'didnt', 'feel', 'humiliated']

In [7]:
# Embedd
def embeddWord(word):
    if (word in wv):
        return wv[word][0:200]
    else:
        return [0]*200
def embeddSentence(sentence):
    return np.array(list(map(lambda x: embeddWord(x),sentence)))
embeddXTrain = np.array(list(map(lambda x: embeddSentence(x),xTrainSplit)))
embeddXTest = np.array(list(map(lambda x: embeddSentence(x),xTestSplit)))

  embeddXTrain = np.array(list(map(lambda x: embeddSentence(x),xTrainSplit)))
  embeddXTest = np.array(list(map(lambda x: embeddSentence(x),xTestSplit)))


In [8]:
size = 100
def pad(arr):
    if (len(arr)>size): return arr[0:size]
    elif len(arr)<size: return np.append(arr, np.array((size-len(arr))*[[0]*200]),axis=0)
    return arr

In [9]:
xTrainPad = np.array(list(map(lambda x: pad(x),embeddXTrain)))
xTestPad = np.array(list(map(lambda x: pad(x),embeddXTest)))

In [10]:
xTrainPad.shape

(16000, 100, 200)

In [11]:
from keras.layers import Layer
import keras.backend as K
class attention(Layer):
    
    """This class implents the attention mechanism layer: We will define a class named Attention as a derived class of the Layer class. We need to define four functions as per the Keras custom layer generation rule. 
    These are build(),call (), compute_output_shape() and get_config()."""
    def __init__(self,**kwargs):
        super(attention,self).__init__(**kwargs)
        
        
    """Inside build (), we will define our weights and biases, i.e., Wa and B . 
    If the previous LSTM layer’s output shape is (None, 32, 100)
    then our output weight should be (100, 1) and bias should be (100, 1) dimensional."""
    def build(self,input_shape):
        self.w=self.add_weight(name="att_weight",shape=(input_shape[-1],1),initializer="normal")
        self.b=self.add_weight(name="att_bias",shape=(input_shape[1],1),initializer="zeros")
        super(attention,self).build(input_shape)
        
        
    """Inside call (), we will write the main logic of Attention. We simply must create a Multi-Layer Perceptron (MLP). 
    Therefore, we will take the dot product of weights and inputs followed by the addition of bias terms. 
    After that, we apply a ‘tanh’ followed by a softmax layer. This softmax gives the alignment scores. 
    Its dimension will be the number of hidden states in the LSTM, i.e., 32 in this case. 
    Taking its dot product along with the hidden states will provide the context vector:"""    
    def call(self,x):
        et=K.squeeze(K.tanh(K.dot(x,self.w)+self.b), axis=-1)
        at=K.softmax(et)
        at=K.expand_dims(at,axis=-1)
        output=x*at
        return K.sum(output,axis=1)
    
    def compute_output_shape(self,input_shape):
        return (input_shape[0],input_shape[-1])
    "The get_config() method collects the input shape and other information about the model."""
    def get_config(self):
        return super(attention,self).get_config() 

In [16]:
import tensorflow as tf
from tensorflow.keras import layers
model = tf.keras.Sequential()
model.add(tf.keras.Input(shape=(100,200),dtype=tf.float32))

model.add(layers.Bidirectional(layers.LSTM(128,return_sequences=True)))
model.add(layers.Conv1D(64,3))
model.add(layers.MaxPooling1D())

model.add(layers.Bidirectional(layers.LSTM(128,return_sequences=True)))

model.add(layers.Dense(256))
model.add(layers.Dense(256))
model.add(attention())

model.add(layers.Dense(6,activation="softmax"))
model.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])

In [15]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional_2 (Bidirecti  (None, 100, 256)          336896    
 onal)                                                           
                                                                 
 conv1d_1 (Conv1D)           (None, 98, 64)            49216     
                                                                 
 max_pooling1d (MaxPooling1  (None, 49, 64)            0         
 D)                                                              
                                                                 
 bidirectional_3 (Bidirecti  (None, 49, 256)           197632    
 onal)                                                           
                                                                 
 dense_3 (Dense)             (None, 49, 256)           65792     
                                                      

In [17]:
# Try weighting the word embeddings that relate to sentiment analysis
weightedWords = open('SentimentWords','r').readlines()
weightedWords = np.array(list(map(lambda x: x.rstrip('\n'),weightedWords)))

In [18]:
count = 0
total = 0
for i in range(len(xTrainSplit)):
    for k in range(len(xTrainSplit[i])):
        total+=1
        if (xTrainSplit[i][k] in weightedWords):
            count+=1
            xTrainPad[i][k]*=2
count/total

0.1009290389061537

In [19]:
count = 0
total = 0
for i in range(len(xTestSplit)):
    for k in range(len(xTestSplit[i])):
        total+=1
        if (xTestSplit[i][k] in weightedWords):
            count+=1
            xTestPad[i][k]*=2
count/total

0.10055340920956458

In [20]:
model.fit(np.array(list(xTrainPad)),np.array(list(yTrainNumber)),
          validation_data=(np.array(list(xTestPad)),np.array(list(yTestNumber))),epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x19ccb861d50>