# Sentiment Analysis

### Imports

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
import sklearn
import re
import pandas as pd
import string
import torch
from torch.utils.data import DataLoader
from transformers import (
    BertPreTrainedModel,
    AutoModel,
    AutoTokenizer,
    AutoConfig
)
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np
from torch.optim import AdamW
from tqdm.notebook import tqdm

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(device)

cuda


### Parameters

In [3]:
# Settings for the VAD regression model.
vad_reg = dict(
    model='bert-base-cased',  # alternatives listed by the author: 'roberta-base', 'bert-base-cased'
    batch_size=16,
    max_seq_len=256,
    n_epochs=10,
    learning_rate=3e-05,
    load_checkpoint=False,
)

# Settings used when the regressor is applied to the IMDB reviews.
sentiment = dict(
    # model='bert-base-cased',
    batch_size=16,
    max_seq_len=512,
)

# All data and checkpoint locations are derived from `root_path`.
# root_path = '/content/drive/MyDrive/Colab Notebooks/'
root_path = './'
save_path = f'{root_path}checkpoint/'
emobank_path = f'{root_path}emobank/emobank_split.csv'
imdb_path = f'{root_path}imdb/IMDB Dataset.csv'

### Dataset

In [4]:
class EmobankDataset():
    """Map-style dataset over an EmoBank dataframe.

    Indexing returns ``(text, scores)``: the preprocessed ``text`` column and
    a float32 tensor holding the row's ``V``, ``A`` and ``D`` columns, in that
    order. Only ``__len__`` and ``__getitem__`` are implemented.
    """

    # One capture group matching any single ASCII punctuation character.
    # `re.escape` keeps characters such as `\`, `]`, `^` and `-` literal
    # inside the character class (unescaped, `\]` swallowed the backslash).
    _PUNCT_RE = re.compile(r'([{}])'.format(re.escape(string.punctuation)))
    # Runs of two or more whitespace characters.
    _MULTI_SPACE_RE = re.compile(r'\s{2,}')

    def __init__(self, df):
        """Store the dataframe; it must provide ``V``, ``A``, ``D`` and ``text`` columns."""
        self.df = df

    def __len__(self):
        """Return the number of rows in the dataframe."""
        return self.df.shape[0]

    def _preprocessing_text(self, text):
        """Normalise one sentence for tokenisation.

        Strips surrounding quotes and whitespace, pads every punctuation
        character with spaces (the original comment says this is for BPE),
        and collapses repeated whitespace into a single space.
        """
        t = text.strip('"').strip("'").strip()
        t = self._PUNCT_RE.sub(r' \1 ', t)  # pad punctuation for BPE
        t = self._MULTI_SPACE_RE.sub(' ', t)
        return t.strip()

    def __getitem__(self, idx):
        """Return ``(preprocessed_text, tensor([V, A, D]))`` for positional row ``idx``."""
        row = self.df.iloc[idx]
        # A row mixing text and numbers is an object-dtype Series, so convert
        # the three scores to float32 explicitly before building the tensor.
        scores = row[['V', 'A', 'D']].to_numpy(dtype='float32')
        return self._preprocessing_text(row['text']), torch.tensor(scores)

In [5]:
eb = pd.read_csv(emobank_path)

# Partition on the file's own `split` column, then drop that column so each
# frame only carries what EmobankDataset reads.
train_eb = eb.loc[eb['split'] == "train", :].drop("split", axis=1)
dev_eb = eb.loc[eb['split'] == "dev", :].drop("split", axis=1)
test_eb = eb.loc[eb['split'] == "test", :].drop("split", axis=1)

print(len(train_eb), len(dev_eb), len(test_eb))

# Only the training loader shuffles. Evaluation loaders keep a fixed order so
# that per-batch losses (including the final, smaller batch) are reproducible.
eb_train = EmobankDataset(train_eb)
eb_train_loader = DataLoader(eb_train, batch_size=vad_reg['batch_size'], shuffle=True)
eb_val = EmobankDataset(dev_eb)
eb_val_loader = DataLoader(eb_val, batch_size=vad_reg['batch_size'], shuffle=False)
eb_test = EmobankDataset(test_eb)
eb_test_loader = DataLoader(eb_test, batch_size=vad_reg['batch_size'], shuffle=False)

8062 1000 1000


In [6]:
class ImdbDataset():
    """Map-style dataset over an IMDB review dataframe.

    Indexing returns ``(text, label)``: the preprocessed ``review`` column and
    the row's ``sentiment`` value wrapped in a tensor. Only ``__len__`` and
    ``__getitem__`` are implemented.
    """

    # HTML tags and character entities (pattern unchanged from the original).
    _HTML_RE = re.compile('<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')
    # One capture group matching any single ASCII punctuation character.
    # `re.escape` keeps characters such as `\`, `]`, `^` and `-` literal
    # inside the character class (unescaped, `\]` swallowed the backslash).
    _PUNCT_RE = re.compile(r'([{}])'.format(re.escape(string.punctuation)))
    # Runs of two or more whitespace characters.
    _MULTI_SPACE_RE = re.compile(r'\s{2,}')

    def __init__(self, df):
        """Store the dataframe; it must provide ``review`` and ``sentiment`` columns."""
        self.df = df

    def __len__(self):
        """Return the number of rows in the dataframe."""
        return self.df.shape[0]

    def _preprocessing_text(self, text):
        """Normalise one review for tokenisation.

        Strips surrounding quotes and whitespace, removes HTML tags and
        entities, pads every punctuation character with spaces (the original
        comment says this is for BPE), and collapses repeated whitespace.
        """
        t = text.strip('"').strip("'").strip()
        # Markup must go BEFORE punctuation padding: once padded, `&amp;`
        # reads `& amp ;` and no longer matches the entity pattern. Replacing
        # with a space keeps words on either side of a tag from fusing.
        t = self._HTML_RE.sub(' ', t)
        t = self._PUNCT_RE.sub(r' \1 ', t)  # pad punctuation for BPE
        t = self._MULTI_SPACE_RE.sub(' ', t)
        return t.strip()

    def __getitem__(self, idx):
        """Return ``(preprocessed_review, tensor(sentiment))`` for positional row ``idx``."""
        row = self.df.iloc[idx]
        return self._preprocessing_text(row['review']), torch.tensor(row['sentiment'])

In [7]:
imdb = pd.read_csv(imdb_path)

# Encode the string labels as integers (positive -> 1, negative -> 0).
str_to_bool = {"positive": 1, "negative": 0}
imdb["sentiment"] = imdb["sentiment"].map(str_to_bool)

# NOTE: despite the `_df` suffix these two hold index labels, not dataframes;
# the split is stratified on the sentiment label with a fixed seed.
train_imdb_df, test_imdb_df = train_test_split(
    imdb.index,
    stratify=imdb.sentiment,
    random_state=42,
    test_size=0.1,
)

print(len(train_imdb_df), len(test_imdb_df))

imdb_train = ImdbDataset(imdb.loc[train_imdb_df])
imdb_train_loader = DataLoader(imdb_train, batch_size=sentiment['batch_size'], shuffle=True)

# The held-out loader keeps a fixed order so extracted test features are
# written in the same order on every run.
imdb_test = ImdbDataset(imdb.loc[test_imdb_df])
imdb_test_loader = DataLoader(imdb_test, batch_size=sentiment['batch_size'], shuffle=False)

45000 5000


# Model - Emobank/VAD-regression

### Model

In [8]:
class PretrainedLMModel(BertPreTrainedModel):
    """Pre-trained encoder with a linear head on its pooled output.

    The encoder is loaded from the checkpoint named by ``vad_reg['model']``.
    The head is a single linear layer with ``label_num * 3`` outputs.
    """

    def __init__(self, config):
        """Build the encoder, dropout, LM projection and output head from ``config``."""
        super().__init__(config)
        self.config = config
        self.pre_trained_lm = AutoModel.from_pretrained(vad_reg['model'])

        hidden_size = self.config.hidden_size

        # Dropout on the pooled output, plus a bias-free projection from
        # hidden states to vocabulary size.
        self.dropout = nn.Dropout(self.config.hidden_dropout_prob)
        self.projection_lm = nn.Linear(hidden_size, self.config.vocab_size, bias=False)
        self.activation = nn.Sigmoid()  # not applied in forward()

        # Classification/regression head.
        self.label_num = 1
        self.head = nn.Linear(hidden_size, self.label_num * 3)

    def forward(
            self,
            input_ids=None,
            attention_mask=None,
            n_epoch=None,
            token_type_ids=None,
            position_ids=None,
            head_mask=None,
            inputs_embeds=None,
        ):
        """Run the encoder and the head.

        ``n_epoch`` is accepted but not used. Returns a tuple
        ``(lm_logits, logits, preds)`` where ``lm_logits`` is the vocabulary
        projection of the hidden states, ``logits`` is the raw head output on
        the dropped-out pooled output, and ``preds`` is ``relu(logits)``.
        """
        # With return_dict=False the encoder output is unpacked as a pair:
        #   hidden states: (batch_size, seq_len, embed_dim)
        #   pooled_output: (batch_size, embed_dim)
        hidden_states, pooled_output = self.pre_trained_lm(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            return_dict=False,
        )

        lm_logits = self.projection_lm(hidden_states)

        # Head over the pooled ([CLS]) output; these layers are randomly
        # initialised rather than loaded from the checkpoint.
        logits = self.head(self.dropout(pooled_output))

        return lm_logits, logits, F.relu(logits)

In [9]:
#Eval metrics : MSE-Loss, Pearson Correlation (pearsonr)

from scipy.stats import pearsonr

def compute_vad_eval_metric(predictions, labels):
    """Compute a per-column Pearson correlation between predictions and labels.

    Both arguments are tensors of identical size. Columns are paired in order
    with the keys ``v_cor``, ``a_cor`` and ``d_cor``; each value is whatever
    ``scipy.stats.pearsonr`` returns for that column pair.
    """
    assert predictions.size() == labels.size()
    # Transpose so that iteration walks columns rather than samples.
    pred_columns = predictions.cpu().detach().numpy().T
    label_columns = labels.cpu().detach().numpy().T
    names = ("v_cor", 'a_cor', 'd_cor')
    return {
        name: pearsonr(pred_col, label_col)
        for pred_col, label_col, name in zip(pred_columns, label_columns, names)
    }

def predict(model, tokenizer, criterion, dataloader, return_loss=True, max_seq_len=vad_reg['max_seq_len']):
    """Run ``model`` over every batch of ``dataloader`` and collect its outputs.

    Args:
        model: callable returning ``(lm_logits, logits, preds)`` when given
            ``input_ids`` and ``attention_mask`` keyword arguments.
        tokenizer: tokenizer exposing ``batch_encode_plus``.
        criterion: loss function applied to ``(preds, labels)``; only used
            when ``return_loss`` is true.
        dataloader: iterable yielding ``(text_batch, labels)`` pairs.
        return_loss: whether to compute a loss value per batch.
        max_seq_len: length every sequence is padded or truncated to.

    Returns:
        ``(total_preds, total_labels, total_losses)``. The first two are the
        batch outputs concatenated on the CPU. ``total_losses`` is a
        ``FloatTensor`` with one entry per batch when ``return_loss`` is true,
        otherwise an empty list.

    This function neither switches the model to eval mode nor disables
    gradients; callers are expected to do both. Inputs are moved to the
    module-level ``device``.
    """
    total_preds = []
    total_labels = []
    total_losses = []
    for text, labels in dataloader:
        encoded = tokenizer.batch_encode_plus(
            text,                          # Sentences to encode.
            add_special_tokens=True,       # Add '[CLS]' and '[SEP]'
            max_length=max_seq_len,        # Pad & truncate all sentences.
            padding='max_length',
            return_attention_mask=True,    # Construct attn. masks.
            return_tensors='pt',           # Return pytorch tensors.
            truncation=True,
        )
        # Use the model that was passed in; the original called the global
        # `net`, silently ignoring the `model` argument.
        _, logits, preds = model(
            input_ids=encoded['input_ids'].to(device),
            attention_mask=encoded['attention_mask'].to(device),
        )

        if return_loss:
            # Score the rectified predictions, matching the training loop
            # (which optimises relu(logits)) and the returned `preds`.
            loss = criterion(preds, labels.to(device))
            total_losses.append(loss.cpu().detach())

        total_preds.append(preds.cpu().detach())
        total_labels.append(labels.cpu().detach())

    total_preds = torch.cat(total_preds)
    total_labels = torch.cat(total_labels)

    if return_loss:
        total_losses = torch.FloatTensor(total_losses)

    return total_preds, total_labels, total_losses


def evaluate(model, tokenizer, criterion, dataloader):
    """Return ``(mean batch loss, VAD metrics)`` for ``model`` on ``dataloader``.

    Delegates to ``predict`` with its default arguments, averages the
    per-batch losses, and scores the predictions with
    ``compute_vad_eval_metric``.
    """
    preds, labels, batch_losses = predict(model, tokenizer, criterion, dataloader)
    return torch.mean(batch_losses), compute_vad_eval_metric(preds, labels)




### Training

In [10]:
# Config, tokenizer and model all come from the same checkpoint name.
_checkpoint_name = vad_reg['model']
config = AutoConfig.from_pretrained(_checkpoint_name)
tokenizer = AutoTokenizer.from_pretrained(_checkpoint_name)
net = PretrainedLMModel(config)
net = net.to(device)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.decoder.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [11]:
import os

EPOCHS = vad_reg['n_epochs']

criterion = nn.MSELoss()
optimizer = AdamW(
    net.parameters(),
    lr=vad_reg["learning_rate"],
    betas=(0.9, 0.98),
    eps=1e-06,
    #correct_bias=False,
)

# Make sure the checkpoint directory exists BEFORE training starts, so that
# the torch.save below cannot fail on a missing folder once training is done.
os.makedirs(save_path, exist_ok=True)

net.train()
for epoch in tqdm(range(EPOCHS)):
    for text, labels in tqdm(eb_train_loader, total=len(eb_train_loader), leave=False):
        optimizer.zero_grad()
        encoded = tokenizer.batch_encode_plus(
            text,                                # Sentences to encode.
            add_special_tokens=True,             # Add '[CLS]' and '[SEP]'
            max_length=vad_reg['max_seq_len'],   # Pad & truncate all sentences.
            padding='max_length',
            return_attention_mask=True,          # Construct attn. masks.
            return_tensors='pt',                 # Return pytorch tensors.
            truncation=True,
        )

        # forward
        input_ids = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)
        _, logits, preds = net(input_ids=input_ids, attention_mask=attention_mask)

        # compute loss on the rectified head output
        loss = criterion(F.relu(logits), labels.to(device))

        # backward
        loss.backward()

        # optimizer step (no LR scheduler is used)
        optimizer.step()

# Save only the model weights; the file name encodes model name and epochs.
torch.save({
    'model' : net.state_dict(),
}, save_path+'vad_reg_model_{}_{}.pth'.format(vad_reg['model'],EPOCHS))

  0%|          | 0/10 [00:00<?, ?it/s]

  0%|          | 0/504 [00:00<?, ?it/s]

### Load Checkpoint

In [None]:
if vad_reg['load_checkpoint'] :
    # The file name mirrors the one written after training.
    model_path = save_path+'vad_reg_model_{}_{}.pth'.format(vad_reg['model'],vad_reg['n_epochs'])
    # map_location lets a checkpoint saved on a GPU be restored on whatever
    # device this session runs on (e.g. a CPU-only machine).
    chk = torch.load(model_path, map_location=device)
    net.load_state_dict(chk['model'])

In [None]:
# Evaluate on the EmoBank test split in eval mode with gradients disabled.
net.eval()
with torch.no_grad():
    eval_loss, eval_metrics = evaluate(net, tokenizer, criterion, eb_test_loader)

print(eval_loss)
print(eval_metrics)

### Apply model on IMDB dataset

In [None]:
# Train set: run the regressor over the IMDB training reviews.
with torch.no_grad():
    total_preds, total_labels, _ = predict(
        net,
        tokenizer,
        criterion,
        imdb_train_loader,
        return_loss=False,
        max_seq_len=sentiment['max_seq_len'],
    )

# Convert to NumPy for scikit-learn and cache both arrays on disk.
X_train = total_preds.detach().numpy()
y_train = total_labels.detach().numpy()

np.save(save_path+'imdb_X_train.npy', X_train)
np.save(save_path+'imdb_y_train.npy', y_train)

In [None]:
# Test set: run the regressor over the held-out IMDB reviews.
with torch.no_grad():
    total_preds, total_labels, _ = predict(
        net,
        tokenizer,
        criterion,
        imdb_test_loader,
        return_loss=False,
        max_seq_len=sentiment['max_seq_len'],
    )

# Convert to NumPy for scikit-learn and cache both arrays on disk.
X_test = total_preds.detach().numpy()
y_test = total_labels.detach().numpy()

np.save(save_path+'imdb_X_test.npy', X_test)
np.save(save_path+'imdb_y_test.npy', y_test)

# scikit-learn Classifiers

### Load Checkpoint

In [None]:
# Reload the cached features/labels so this section can run without the model.
X_train, y_train = np.load(save_path+'imdb_X_train.npy'), np.load(save_path+'imdb_y_train.npy')
X_test, y_test = np.load(save_path+'imdb_X_test.npy'), np.load(save_path+'imdb_y_test.npy')

In [None]:
# Sanity check: array shapes, then the first ten feature rows and labels.
_shapes = (X_train.shape, y_train.shape, X_test.shape, y_test.shape)
print(*_shapes)
print(X_train[:10])
print(y_train[:10])

### Parameters

In [None]:
# Hyper-parameters shared by the classifier cells below, grouped by model.
clf_params = dict(
    # KNN
    n_neighbors=10,       # number of neighbours
    # Naive-Bayes
    alpha=1.0,            # smoothing
    # SVM
    C=1.0,                # regularization strength
    kernel='rbf',         # one of 'linear', 'poly', 'rbf', 'sigmoid', 'precomputed'
    degree=3,             # only used by the 'poly' kernel
    gamma='scale',        # 'scale' or 'auto'
    # RF
    n_estimators=100,     # number of trees
    max_samples=0.632,    # bootstrap subset size
    # LDA
    solver='svd',         # one of 'svd', 'lsqr', 'eigen'
    shrinkage=None,       # author's note: None with 'svd'; None or 'auto' with 'lsqr'/'eigen'
)

### KNN

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Fit k-nearest-neighbours on the training features, then report on the test split.
clf_KNN = KNeighborsClassifier(n_neighbors=clf_params['n_neighbors']).fit(X_train, y_train)
y_pred = clf_KNN.predict(X_test)
print(classification_report(y_test, y_pred))

### Naive-Bayes

In [None]:
from sklearn.naive_bayes import MultinomialNB

# Fit multinomial Naive Bayes with the configured smoothing, then report on the test split.
# NOTE(review): MultinomialNB expects non-negative features; the cached features
# appear to be relu outputs, which would satisfy that — confirm.
clf_NB = MultinomialNB(alpha=clf_params['alpha']).fit(X_train, y_train)
y_pred_NB = clf_NB.predict(X_test)
print(classification_report(y_test, y_pred_NB))

### SVM

In [None]:
from sklearn.svm import SVC

# Pull the SVM settings out of the shared parameter dict by name.
_svm_kwargs = {key: clf_params[key] for key in ('C', 'kernel', 'degree', 'gamma')}
clf_SVM = SVC(random_state=0, **_svm_kwargs).fit(X_train, y_train)

y_pred_SVM = clf_SVM.predict(X_test)
print(classification_report(y_test, y_pred_SVM))

### Decision Tree

In [None]:
from sklearn.tree import DecisionTreeClassifier

# Fit a decision tree with a fixed seed, then report on the test split.
clf_DT = DecisionTreeClassifier(random_state=0).fit(X_train, y_train)
y_pred_DT = clf_DT.predict(X_test)
print(classification_report(y_test, y_pred_DT))

### Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

# Bootstrap-aggregated forest; each tree sees `max_samples` of the training rows.
# A fixed random_state makes the bootstrap draws (and thus the report)
# reproducible, consistent with the SVM and decision-tree cells.
clf_RF = RandomForestClassifier(n_estimators=clf_params['n_estimators'],
                                bootstrap=True,
                                max_samples=clf_params['max_samples'],
                                random_state=0,
                                )

clf_RF.fit(X_train,y_train)

y_pred_RF = clf_RF.predict(X_test)
print(classification_report(y_test, y_pred_RF))

### LDA (Linear Discriminant Analysis)

In [None]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Fit LDA with the configured solver/shrinkage, then report on the test split.
clf_LDA = LinearDiscriminantAnalysis(
    solver=clf_params['solver'],
    shrinkage=clf_params['shrinkage'],
).fit(X_train, y_train)

y_pred_LDA = clf_LDA.predict(X_test)
print(classification_report(y_test, y_pred_LDA))