**Sarcasm Target Detection Codebase**

In [1]:
# Upload the dataset workbook ('snippetshindi.xlsx') from the local file system.
# The upload result is iterated as a mapping from file name to file content.
from google.colab import files
uploaded = files.upload()
for uploaded_name, payload in uploaded.items():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=uploaded_name, length=len(payload)))

Saving snippetshindi.xlsx to snippetshindi.xlsx
User uploaded file "snippetshindi.xlsx" with length 41575 bytes


In [2]:
# Mount Google Drive at /gdrive so that the pre-trained embedding file stored
# there ('/gdrive/MyDrive/gloveHindi.txt', see `embedding_file`) can be read.
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [4]:
# Import basic maths and processing libraries.
import codecs
import pickle
from collections import deque

import numpy as np
import pandas as pd

from keras.callbacks import EarlyStopping
from keras.layers import Dense ,LSTM,Bidirectional,concatenate,Input,Flatten
from keras.models import Model
from keras.optimizers import Adam

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

In [5]:
# The workbook has no header row, so the column names are supplied through `names`.
# sheet_name=None makes read_excel return a dict of sheets keyed by sheet name;
# only 'Sheet1' is kept.
df = pd.read_excel(
    "snippetshindi.xlsx",
    sheet_name=None,
    header=None,
    names=['Snippet','target'],
)['Sheet1']

# Location of the pre-trained word vectors on the mounted drive.
embedding_file = '/gdrive/MyDrive/gloveHindi.txt'

In [6]:
# Replaces the punctuation marks ? ! , " - ; . with a space; every other
# character of the string is kept as it is.
def clearLine(line):
  """Return `line` with each of the characters ? ! , " - ; . turned into one space."""
  separators = frozenset(['?', '!', ',', '"', '-', ';', '.'])
  return ''.join(' ' if symbol in separators else symbol for symbol in line)

In [7]:
# Load pre-trained word embeddings from a space-separated text file
def loadEmbed(path=None, dim=100):
    """Read a word-embedding text file into a {word: vector} dictionary.

    Each usable line has the format ``word val1 val2 ... val<dim>``.

    Parameters
    ----------
    path : str, optional
        File to read. Defaults to the notebook-level `embedding_file`.
    dim : int, optional
        Number of vector components per word (default 100, the size the
        rest of the notebook uses).

    Returns
    -------
    dict
        Maps the first token of every usable line to a float32 numpy array
        built from the last `dim` tokens of that line.

    Raises
    ------
    ValueError
        If one of the last `dim` tokens of a line is not a number.
    """
    if path is None:
        path = embedding_file
    print('loading word embeddings...')
    embeddings_index = {}
    skipped = 0
    # `with` closes the file even when parsing fails. The built-in open()
    # splits lines only on \n / \r / \r\n, so unusual Unicode separator
    # characters inside a token do not break a record in two.
    with open(path, encoding='utf-8') as f:
        for line in f:
            values = line.rstrip().rsplit(' ')
            # Blank lines and header/short lines cannot hold word + dim values;
            # skipping them avoids a crash or a wrongly sized vector.
            if len(values) < dim + 1:
                skipped += 1
                continue
            word = values[0]
            embeddings_index[word] = np.asarray(values[-dim:], dtype='float32')
    if skipped:
        print('skipped %s malformed lines' % skipped)
    print('found %s word vectors' % len(embeddings_index))
    return embeddings_index

In [8]:
# Loading pre-trained embeddings
# NOTE(review): a later cell defines a function that is also named `model`.
# Once that cell has run, `model` no longer refers to this dictionary, so the
# cells that index `model[...]` only work when the notebook is run top to bottom.
model=loadEmbed()

loading word embeddings...
found 336023 word vectors


In [9]:
# Stores all distinct (lower-cased) words in the dataset.
all_words = []

# Extract all possible words in the dataset. Lower-casing happens BEFORE the
# de-duplication below, so words that differ only in case collapse to one entry.
for snippet in df['Snippet']:
    all_words.extend(clearLine(snippet).lower().split())

# Keep the first occurrence of every word, preserving order.
all_words = list(dict.fromkeys(all_words))

In [10]:
# Vector representation for every word of the dataset, keyed by the word.
embeddings = {}
for word in all_words:
    # Words without a pre-trained vector fall back to an all-ones vector.
    embeddings[word] = model[word] if word in model else [1]*100

# Special tokens: <pad> is the zero vector, <start> has a single 1 in the
# first position and <end> a single 1 in the last position.
embeddings.update({
    '<pad>': [0]*100,
    '<start>': [1] + [0]*99,
    '<end>': [0]*99 + [1],
})

In [11]:
# Prepare the left context, right context, and candidate word from the dataset.
# Every word of every snippet becomes one row; the rows are lists of string tokens.
# Pre-Processed Data (ppd)

ppd = {
  'left_context': [],
  'right_context': [],
  'Candidate_word': [],
  'target_status': [],
}

for row in range(len(df['Snippet'])):
  # Lower-case and clean the snippet, then split it into individual words.
  tokens = clearLine(df['Snippet'][row].lower()).split()

  # The target column holds comma-separated phrases; every single word of
  # every phrase counts as a sarcasm-target word.
  # NOTE(review): the target text is not passed through clearLine, so a target
  # word carrying punctuation other than ',' will not match the cleaned
  # snippet tokens - confirm this is intended.
  target_words = set()
  for phrase in df['target'][row].lower().split(","):
    target_words.update(phrase.split())

  # Choosing each word in the sentence as candidate word, extract the left
  # context, right context and the target label. A separate index name is used
  # so the row counter of the outer loop is not overwritten.
  for pos, word in enumerate(tokens):
    ppd['Candidate_word'].append(word)
    ppd['left_context'].append(["<start>"] + tokens[:pos])
    ppd['right_context'].append(tokens[pos+1:] + ["<end>"])
    ppd['target_status'].append(int(word in target_words))

In [47]:
# Embedding Left Context
# Turn every left-context token into its vector and append <pad> vectors until
# the sequence holds 100 entries (nothing is appended once it has 100 or more).
keras_left_context = []
for context in ppd['left_context']:
    vectors = [embeddings[token] for token in context]
    vectors += [embeddings['<pad>']] * (100 - len(context))
    keras_left_context.append(vectors)

In [48]:
# Embedding Right Context
# Turn every right-context token into its vector and append <pad> vectors until
# the sequence holds 100 entries (nothing is appended once it has 100 or more).
keras_right_context = []
for context in ppd['right_context']:
    vectors = [embeddings[token] for token in context]
    vectors += [embeddings['<pad>']] * (100 - len(context))
    keras_right_context.append(vectors)

In [49]:
# Embedding Candidate Word
# One vector per candidate word, in the same row order as the contexts.
keras_middle = [embeddings[candidate] for candidate in ppd['Candidate_word']]

# 1 when the candidate word is a sarcasm-target word, otherwise 0.
labels = ppd['target_status']

Model


In [50]:
# Function to group together candidate-words belonging to the same line.
def compress():
    """Return one `range` of row indices of `ppd` per sentence.

    A row whose left context holds a single token (only '<start>') is the
    first word of a sentence, so those rows are the boundaries between groups.
    """
    contexts = ppd['left_context']
    # Row 0 always opens the first group, so the scan starts at row 1.
    boundaries = [0]
    boundaries += [row for row in range(1, len(contexts)) if len(contexts[row]) == 1]
    boundaries.append(len(contexts))
    return [range(first, stop) for first, stop in zip(boundaries, boundaries[1:])]

In [51]:
# One range of `ppd` row indices per sentence; indexed by sentence number in
# prep(), de_comp() and the fold split.
comp = compress()

In [52]:
# Training Data, Test Data Preparation

def prep (train_indices, test_indices):
    """Build the numpy inputs and labels for one train/test split.

    `train_indices` and `test_indices` are sentence numbers (indices into
    `comp`). Returns the tuple
    (train_left, train_right, train_middle, train_labels,
     val_left, val_right, val_middle, val_labels).
    """
    print (len(train_indices), len(test_indices))

    def gather(sentence_indices):
        # Ungroup the sentences into the row ids of their candidate words.
        row_ids = []
        for sentence in sentence_indices:
            row_ids.extend(comp[sentence])

        left   = np.array([keras_left_context[row] for row in row_ids])
        right  = np.array([keras_right_context[row] for row in row_ids])
        middle = np.array([keras_middle[row] for row in row_ids])
        target = np.array([labels[row] for row in row_ids])
        # The candidate word becomes a sequence of length one.
        middle = np.expand_dims(middle, axis=1)
        return left, right, middle, target

    train_left, train_right, train_middle, train_labels = gather(train_indices)
    val_left, val_right, val_middle, val_labels = gather(test_indices)

    # Below part only for TD lstm: the candidate word is appended to the left
    # context and prepended to the right context.
    if mode == 'TD':
        train_left  = np.concatenate((train_left, train_middle), axis=1)
        train_right = np.concatenate((train_middle, train_right), axis=1)
        val_left    = np.concatenate((val_left, val_middle), axis=1)
        val_right   = np.concatenate((val_middle, val_right), axis=1)

    return (train_left, train_right, train_middle, train_labels,
            val_left, val_right, val_middle, val_labels)

In [53]:
def de_comp(arr, test_indices):
    """Split the flat per-word sequence `arr` back into per-sentence lists.

    For every sentence number in `test_indices`, as many items as that
    sentence has words (`len(comp[...])`) are taken from the front of `arr`.
    """
    pending = deque(arr)
    return [
        [pending.popleft() for _ in range(len(comp[sentence]))]
        for sentence in test_indices
    ]

In [54]:
def accuracy (pred, labels, test_indices):
    """Print word-level and sentence-level scores for one set of predictions.

    Parameters
    ----------
    pred : list
        `pred[0]` holds the model scores, one per candidate word.
    labels : array-like
        0/1 gold labels, in the same order as the scores.
    test_indices : list
        Sentence numbers (indices into `comp`) the scores belong to, in order.

    Returns
    -------
    tuple
        (pred_d, labels_d): thresholded predictions and gold labels, each
        regrouped into one list per test sentence.
    """
    scores = np.asarray(pred[0]).reshape(-1)
    labels = np.asarray(labels)

    # Threshold calculation for binary classification problem: the mean score
    # of the words whose gold label is 1.
    # NOTE(review): the threshold is derived from the labels of the very data
    # being scored - confirm this is the intended evaluation protocol.
    positive = labels == 1.0
    if positive.any():
        threshold = float(scores[positive].mean())
    else:
        # No positive word at all: fall back to the usual sigmoid cut-off
        # instead of dividing by zero.
        threshold = 0.5

    pred_th = np.where(scores <= threshold, 0, 1)
    print ("Number of Test sentences : {}".format(len(test_indices)))
    error = pred_th-labels
    error_d  = de_comp(error,test_indices)
    labels_d = de_comp(labels,test_indices)
    pred_d   = de_comp(pred_th,test_indices)

    em_cnt = 0
    ds_cnt = 0
    for err in error_d:
        # Count wrong words with absolute values: a false positive (+1) and a
        # false negative (-1) must not cancel into a fake exact match.
        wrong = int(np.sum(np.abs(err)))
        if wrong == 0:
            em_cnt += 1
        ds_cnt += float(len(err)-wrong)/len(err)

    # Per-sentence F1, summed here and averaged over the sentences below.
    mic_f1 = 0
    for lab, pre in zip(labels_d,pred_d):
        tp = 0
        fp = 0
        fn = 0
        for i,j in zip(lab,pre):
            if (int(i) == 1) and (int(j) == 0):
                fn += 1
            elif (int(i) == 0) and (int(j) == 1):
                fp += 1
            elif (int(i) == 1) and (int(j) == 1):
                tp += 1
        denominator = 2*tp + fn + fp
        # A sentence without any gold or predicted target contributes 0.
        if denominator:
            mic_f1 += float(2*tp) / denominator

    # Confusion counts pooled over all words of all test sentences.
    gold = labels.astype(int)
    TP = int(np.sum((gold == 1) & (pred_th == 1)))
    TN = int(np.sum((gold == 0) & (pred_th == 0)))
    FP = int(np.sum((gold == 0) & (pred_th == 1)))
    FN = int(np.sum((gold == 1) & (pred_th == 0)))
    print ("TP = {}, TN = {},FP = {}, FN = {}".format(TP,TN,FP,FN))

    pooled_denominator = 2*TP + FP + FN
    F1 = float(2*TP)/pooled_denominator if pooled_denominator else 0.0
    # Guard against an empty test fold; every sum above is 0 in that case.
    num_test = len(test_indices) or 1
    EM = float(em_cnt)/num_test
    DS = float(ds_cnt)/num_test
    uF1= float(mic_f1)/num_test
    # NOTE(review): the value printed as "Micro F1" is the mean of per-sentence
    # F1 scores and the one printed as "Macro F1 Score" comes from the pooled
    # counts - confirm the labels match the intended definitions.
    print ("EM Accuracy : {}".format(EM))
    print ("DS Accuracy : {}".format(DS))
    print ("Micro F1    : {}".format(uF1))
    print ("Macro F1 Score = {}".format(F1))
    return (pred_d, labels_d)

In [55]:
# Tuned hyper-parameters
batch_size  = 64   # samples per gradient update passed to fit()
num_epochs  = 30   # number of fit/evaluate rounds per fold
hidden_size = 32   # LSTM units per context encoder
layer_size  = 16   # units of the dense layer before the output

# Encoder variant:
#   'Uni' : Unidirectional LSTM
#   'Bi'  : Bidirectional LSTM
#   'TD'  : Target-dependent LSTM
mode = 'Uni'

In [56]:
# Define the model and run the model for given epochs
def model(train_l,train_r,train_m,train_labels,test_l,test_r,test_m,test_labels,test_indices):
    """Build, train and evaluate the context/candidate network for one fold.

    Parameters
    ----------
    train_l, train_r, train_m : numpy arrays
        Left-context, right-context and candidate-word inputs for training.
    train_labels : numpy array
        0/1 label per training row.
    test_l, test_r, test_m, test_labels
        The same four arrays for the held-out sentences.
    test_indices : list
        Sentence numbers of the held-out fold, passed on to `accuracy`.

    Returns
    -------
    The trained Keras model. After every epoch the held-out fold is scored
    and the metrics are printed by `accuracy`.
    """
    # Take the input shapes from the data instead of hard-coding (100, 100):
    # in 'TD' mode prep() adds the candidate word to both contexts, which
    # makes them one timestep longer.
    x = Input(shape=train_l.shape[1:])
    y = Input(shape=train_r.shape[1:])
    z = Input(shape=train_m.shape[1:])

    def encoder():
        # 'Bi' halves the units per direction; the wrapper's default merge
        # mode concatenates both directions, so the output width is unchanged.
        if mode == 'Bi':
            return Bidirectional(LSTM(hidden_size//2, return_sequences=False))
        return LSTM(hidden_size, return_sequences=False)

    left_out = encoder()(x)
    if mode == 'TD':
        # The candidate word is already part of both contexts, so no separate
        # encoder is built for it; `z` stays in the input list so that the
        # fit/predict calls keep the same three-input signature.
        right_out = encoder()(y)
        out = concatenate([left_out, right_out], axis=-1)
    else:
        middle = encoder()(z)
        right_out = encoder()(y)
        out = concatenate([left_out, middle, right_out], axis=-1)

    out = Dense(layer_size, activation='relu')(out)
    output = Dense(1, activation='sigmoid')(out)

    # A distinct local name keeps this function's own name usable inside it.
    net = Model(inputs=[x,y,z], outputs=output)
    # `learning_rate` replaces the deprecated `lr` keyword; 1e-4 == 10e-5.
    net.compile(optimizer=Adam(learning_rate=1e-4),loss='binary_crossentropy',metrics=['accuracy'])
    print ("Starting Epochs")
    for i in range(num_epochs):
        # One epoch at a time so the held-out fold can be scored after each.
        net.fit([train_l,train_r,train_m],train_labels,batch_size=batch_size, epochs=1,verbose=0)
        print('***************************************************************')
        print ("predicting_ Epoch : {}".format(i))
        pred_val = [net.predict([test_l,test_r,test_m])]
        accuracy (pred_val, test_labels,test_indices)
    return net

In [57]:
# Dataset division for 3-fold cross validation
indices = list(range(len(comp)))
# A private, seeded generator makes the folds identical on every
# Restart & Run All without touching numpy's global random state.
np.random.RandomState(42).shuffle(indices)

# Cut points at 33% and 66% of the sentences (same slicing as before).
first_cut = int(0.33*len(indices))
second_cut = int(0.66*len(indices))
bins = [
    indices[:first_cut],
    indices[first_cut:second_cut],
    indices[second_cut:],
]

In [58]:
# Run the three folds: two bins train the model, the remaining bin tests it.
for i in range (3):
    train_bins = bins[i%3] + bins[(i+1)%3]
    test_bin = bins[(i+2)%3]
    print ("Fold {}".format(i+1))
    # Report the sizes of the bins actually used in THIS fold.
    print (len(train_bins),len(test_bin))
    train_left,train_right,train_middle,train_labels,val_left,val_right,val_middle,val_labels = prep (train_bins, test_bin)
    Sar_model=model(train_left,train_right,train_middle,train_labels,val_left,val_right,val_middle,val_labels,test_bin)
    # NOTE(review): every fold writes to the same file, so only the weights of
    # the last fold remain on disk - confirm that this is intended.
    Sar_model.save_weights("Bert Tweets Aug.h5")
    print("Saved model to disk")

Fold 1
147 77
147 77
Starting Epochs
***************************************************************
predicting_ Epoch : 0
Number of Test sentences : 77
TP = 95, TN = 1170,FP = 1094, FN = 65
EM Accuracy : 0.06493506493506493
DS Accuracy : 0.5451067649481633
Micro F1    : 0.15379407817784077
Macro F1 Score = 0.14084507042253522
***************************************************************
predicting_ Epoch : 1
Number of Test sentences : 77
TP = 92, TN = 1244,FP = 1020, FN = 68
EM Accuracy : 0.025974025974025976
DS Accuracy : 0.5409092650766281
Micro F1    : 0.14820563049824637
Macro F1 Score = 0.14465408805031446
***************************************************************
predicting_ Epoch : 2
Number of Test sentences : 77
TP = 98, TN = 1203,FP = 1061, FN = 62
EM Accuracy : 0.06493506493506493
DS Accuracy : 0.48766412319655633
Micro F1    : 0.14128377230758607
Macro F1 Score = 0.14859742228961334
***************************************************************
predicting_ Epoch : 3