# BERT Sarcasm Analysis

https://medium.com/@nguyenduchuyvn/sarcasm-detection-with-machine-learning-92538da893ec

In [1]:
from transformers import BertTokenizer, BertModel
import torch
from torch import nn
from torch.optim import Adam
import torchinfo

from tqdm import tqdm
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, roc_auc_score
from scipy.special import softmax

import sys
from IPython.core import ultratb
sys.excepthook = ultratb.FormattedTB(color_scheme='Linux', call_pdb=False)

tokenizer = BertTokenizer.from_pretrained('prajjwal1/bert-tiny')
device = torch.device("cpu")

  torch.utils._pytree._register_pytree_node(


In [2]:
class Dataset(torch.utils.data.Dataset):

    def __init__(self, X, Y):

        self.labels = np.array(Y)
        self.texts = [tokenizer(text, 
                                padding='max_length', max_length = 512,
                                # truncation=True,
                                return_tensors="pt") for text in X]

    def classes(self):
        return self.labels

    def __len__(self):
        return len(self.labels)

    def get_batch_labels(self, idx):
        # Fetch a batch of labels
        return np.array(self.labels[idx])

    def get_batch_texts(self, idx):
        # Fetch a batch of inputs
        return self.texts[idx]

    def __getitem__(self, idx):

        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)

        return batch_texts, batch_y

In [3]:
class BertClassifier(nn.Module):

    def __init__(self, dropout=0.5):

        super(BertClassifier, self).__init__()

        self.bert = BertModel.from_pretrained('prajjwal1/bert-tiny')
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(128, 2)
        self.relu = nn.ReLU()

    def forward(self, input_id, mask):

        _, pooled_output = self.bert(input_ids= input_id, attention_mask=mask,return_dict=False)
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        final_layer = self.relu(linear_output)

        return final_layer

In [4]:
def train(model, X,Y, learning_rate, epochs, batch_size):

    train = Dataset(X,Y)
    train_dataloader = torch.utils.data.DataLoader(train, batch_size, shuffle=True)

    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr= learning_rate)

    for epoch_num in range(epochs):

        total_acc_train = 0
        total_loss_train = 0

    for train_input, train_label in tqdm(train_dataloader):

        train_label = train_label.to(device)
        mask = train_input['attention_mask'].to(device)
        input_id = train_input['input_ids'].squeeze(1).to(device)

        output = model(input_id, mask)
      
        batch_loss = criterion(output, train_label)
        total_loss_train += batch_loss.item()
      
        acc = (output.argmax(dim=1) == train_label).sum().item()
        total_acc_train += acc

        model.zero_grad()
        batch_loss.backward()
        optimizer.step()
    
    
    print(f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / len(X): .3f} | Train Accuracy: {total_acc_train / len(X): .3f}')


def evaluate(model, X,Y, batch_size):

    test = Dataset(X, Y)
    test_dataloader = torch.utils.data.DataLoader(test, batch_size,shuffle=False)

    device = torch.device("cpu")
    y_pred = []
    
    with torch.no_grad():

        for test_input, test_label in tqdm(test_dataloader):

            test_label = test_label.to(device)
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)
            y_pred.append(output.argmax(dim=1))

    return y_pred

In [5]:
df = pd.read_json("./Sarcasm_Headlines_Dataset_v2.json", lines=True)

X = df['headline']
Y = df['is_sarcastic']

data_split = int(df.shape[0] * 0.75)
X_train, X_test = X[:data_split], X[data_split:]
y_train, y_test = Y[:data_split], Y[data_split:]

In [6]:
print(df.shape[0], 'headlines')
print(df.loc[df['is_sarcastic'] == 0].shape[0], 'serious headlines.')
print(df.loc[df['is_sarcastic'] == 1].shape[0], 'sarcastic headlines.')
df

28619 headlines
14985 serious headlines.
13634 sarcastic headlines.


Unnamed: 0,is_sarcastic,headline,article_link
0,1,thirtysomething scientists unveil doomsday clo...,https://www.theonion.com/thirtysomething-scien...
1,0,dem rep. totally nails why congress is falling...,https://www.huffingtonpost.com/entry/donna-edw...
2,0,eat your veggies: 9 deliciously different recipes,https://www.huffingtonpost.com/entry/eat-your-...
3,1,inclement weather prevents liar from getting t...,https://local.theonion.com/inclement-weather-p...
4,1,mother comes pretty close to using word 'strea...,https://www.theonion.com/mother-comes-pretty-c...
...,...,...,...
28614,1,jews to celebrate rosh hashasha or something,https://www.theonion.com/jews-to-celebrate-ros...
28615,1,internal affairs investigator disappointed con...,https://local.theonion.com/internal-affairs-in...
28616,0,the most beautiful acceptance speech this week...,https://www.huffingtonpost.com/entry/andrew-ah...
28617,1,mars probe destroyed by orbiting spielberg-gat...,https://www.theonion.com/mars-probe-destroyed-...


In [7]:
EPOCHS = 5
batch_size = 32
model = BertClassifier()
LR = 1e-4

torchinfo.summary(model, dtypes=['torch.IntTensor'])

Layer (type:depth-idx)                                  Param #
BertClassifier                                          --
├─BertModel: 1-1                                        --
│    └─BertEmbeddings: 2-1                              --
│    │    └─Embedding: 3-1                              3,906,816
│    │    └─Embedding: 3-2                              65,536
│    │    └─Embedding: 3-3                              256
│    │    └─LayerNorm: 3-4                              256
│    │    └─Dropout: 3-5                                --
│    └─BertEncoder: 2-2                                 --
│    │    └─ModuleList: 3-6                             396,544
│    └─BertPooler: 2-3                                  --
│    │    └─Linear: 3-7                                 16,512
│    │    └─Tanh: 3-8                                   --
├─Dropout: 1-2                                          --
├─Linear: 1-3                                           258
├─ReLU: 1-4                 

In [8]:
train(model, X_train, y_train, LR, EPOCHS, batch_size)

100%|████████████████████████████████████████████████████████████████████████████████| 671/671 [10:31<00:00,  1.06it/s]


Epochs: 5 | Train Loss:  0.013 | Train Accuracy:  0.803


In [9]:
y_pred = evaluate(model, X_test,y_test, batch_size)

y_pred_ = torch.cat(y_pred, dim=0)

y_pred_ = y_pred_.cpu().detach().numpy()

100%|████████████████████████████████████████████████████████████████████████████████| 224/224 [02:20<00:00,  1.59it/s]


In [10]:
print(classification_report(y_test.values, y_pred_))
print(roc_auc_score(y_test, y_pred_))

              precision    recall  f1-score   support

           0       0.83      0.93      0.88      3745
           1       0.91      0.79      0.85      3410

    accuracy                           0.86      7155
   macro avg       0.87      0.86      0.86      7155
weighted avg       0.87      0.86      0.86      7155

0.8596055738051518


In [11]:
# torch.save(model.state_dict(), 'sarcasm.model")

## Load and Predict

In [12]:
model = BertClassifier().to(device)
model.load_state_dict(torch.load("sarcasm.model"))

torchinfo.summary(model, dtypes=['torch.IntTensor'])

Layer (type:depth-idx)                                  Param #
BertClassifier                                          --
├─BertModel: 1-1                                        --
│    └─BertEmbeddings: 2-1                              --
│    │    └─Embedding: 3-1                              3,906,816
│    │    └─Embedding: 3-2                              65,536
│    │    └─Embedding: 3-3                              256
│    │    └─LayerNorm: 3-4                              256
│    │    └─Dropout: 3-5                                --
│    └─BertEncoder: 2-2                                 --
│    │    └─ModuleList: 3-6                             396,544
│    └─BertPooler: 2-3                                  --
│    │    └─Linear: 3-7                                 16,512
│    │    └─Tanh: 3-8                                   --
├─Dropout: 1-2                                          --
├─Linear: 1-3                                           258
├─ReLU: 1-4                 

In [13]:
def single_batch_predict(x):
    with torch.no_grad():
        texts = tokenizer(x, padding='max_length', max_length = 512, return_tensors="pt")
        
        mask = texts['attention_mask'].to(device)
        input_id = texts['input_ids'].squeeze(1).to(device) # squueze remove all dimensions size 1
        print('mask')
        print('\tmask.shape =', mask.shape)
        print('\tmask[0][:20] =', mask[0][:20])
        print('input_id')
        print('\tinput_id.shape =', input_id.shape)
        print('\tinput_id[0][:20] =', input_id[0][:20])
        
        output = model(input_id, mask)
        print('output =', output)
        
        return output
    
labels = ['NO', 'YES']
prediction = single_batch_predict("student need no more than 3 hours of sleep, UIUC researchers suggest")
prediction = softmax(prediction[0])
print('label =', labels[prediction.argmax()])
print('score =', prediction[prediction.argmax()])

mask
	mask.shape = torch.Size([1, 512])
	mask[0][:20] = tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
input_id
	input_id.shape = torch.Size([1, 512])
	input_id[0][:20] = tensor([  101,  3076,  2342,  2053,  2062,  2084,  1017,  2847,  1997,  3637,
         1010, 21318, 14194,  6950,  6592,   102,     0,     0,     0,     0])
output = tensor([[0.2917, 2.5287]])
label = YES
score = 0.90352714


In [14]:
def single_batch_train(x, y):
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=1e-4)
    
    texts = tokenizer(x, padding='max_length', max_length = 512, return_tensors="pt")
    mask = texts['attention_mask'].to(device)
    input_id = texts['input_ids'].squeeze(1).to(device) # squueze remove all dimensions size 1
    
    output = model(input_id, mask)

    batch_loss = criterion(output, torch.tensor(y))
    
    model.zero_grad()
    batch_loss.backward() # calculate gradients and store then in the model's tensors
    optimizer.step() # optimize model parameters using the stored gradients

# single_batch_train(['employee returns from vacation refreshed, ready to waste time'], [0])