In [25]:
import random
import re
from collections import defaultdict

import bert
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import tensorflow_datasets as tfds
import torch
import torch.nn as nn
import torch.nn.functional as F
import transformers as T
from sklearn.metrics import classification_report, confusion_matrix
from torch.utils.data import Dataset, DataLoader

In [2]:
# Folder holding the three raw dataset files.
DATA_DIR = "C:/Users/HP/Desktop/AI/Emotions NLP"


def read_lines(path):
    """Return the text file at `path` as a list of lines (split on "\\n").

    The file is opened in a `with` block so the handle is closed as soon as
    the content has been read. Bytes that are not valid UTF-8 are dropped
    (errors="ignore").
    """
    with open(path, "r", encoding="utf8", errors="ignore") as handle:
        return handle.read().split("\n")


train_text = read_lines(f"{DATA_DIR}/train.txt")
val_text = read_lines(f"{DATA_DIR}/val.txt")
test_text = read_lines(f"{DATA_DIR}/test.txt")

# Leftover alias from the original chained assignments (it ended up bound to
# the test lines). No visible cell reads it; kept only for compatibility.
training = test_text

In [3]:
# Doing a first cleaning of the texts

# (pattern, replacement) pairs applied in order by clean_text.
# * Whole-word rules are anchored with \b so they cannot fire inside other
#   words ("time", "give", "significant", "can take", ...).
# * Specific contractions ("won't", "can't", "didn't") come BEFORE the generic
#   "n't" rule, otherwise the generic rule consumes them first.
_CONTRACTION_RULES = tuple(
    (re.compile(pattern), replacement)
    for pattern, replacement in (
        (r"\bi'm\b", "i am"),
        (r"\bim\b", "i am"),
        (r"\bive\b", "i have"),
        (r"\bhe's\b", "he is"),
        (r"\bshe's\b", "she is"),
        (r"\bthat's\b", "that is"),
        (r"\bwhat's\b", "what is"),
        (r"\bwhere's\b", "where is"),
        (r"\bhow's\b", "how is"),
        (r"\bwon't\b", "will not"),
        (r"\bwont\b", "will not"),
        (r"\bwon t\b", "will not"),
        (r"\bdidn't\b", "did not"),
        (r"\bdidnt\b", "did not"),
        (r"\bdidn t\b", "did not"),
        (r"\bcan't\b", "cannot"),
        (r"\bcant\b", "cannot"),
        (r"\bcan t\b", "cannot"),
        (r"n't\b", " not"),
        (r"'ll\b", " will"),
        (r"'ve\b", " have"),
        (r"'re\b", " are"),
        (r"'d\b", " would"),
    )
)

# Punctuation characters stripped at the end of clean_text.
_PUNCTUATION = re.compile(r"[-()\"#/@;:<>{}+=~|.?,!]")


def clean_text(text):
    """Lower-case `text`, expand common English contractions, drop punctuation.

    Args:
        text: The raw string to normalise.

    Returns:
        The normalised string. Apostrophes that are not part of a handled
        contraction are left in place.
    """
    text = text.lower()
    for pattern, replacement in _CONTRACTION_RULES:
        text = pattern.sub(replacement, text)
    return _PUNCTUATION.sub("", text)

In [4]:
def split(text):
    """Turn raw "sentence;label" lines into cleaned sentences and labels.

    Each line is cut at its LAST ";" so that a sentence which itself contains
    a semicolon still yields the trailing field as its label. Lines without
    any ";" (for example the empty string after the final newline) are skipped.

    Args:
        text: Iterable of raw lines.

    Returns:
        (X, y): two parallel lists, the sentences and the labels, both passed
        through clean_text; labels are additionally stripped of surrounding
        whitespace.
    """
    X = []
    y = []
    for line in text:
        sentence, separator, label = line.rpartition(";")
        if not separator:
            # No ";" at all: not a data record.
            continue
        X.append(clean_text(sentence))
        y.append(clean_text(label).strip())

    return X, y

# Parse every raw split into cleaned sentences (X_*) and label strings
# (y_*_unmapped). The labels are still text here; a later cell maps them to
# integer ids through emotions_dict.
X_train, y_train_unmapped = split(train_text)
X_test, y_test_unmapped = split(test_text)
X_val, y_val_unmapped = split(val_text)

In [5]:
# Emotion name -> integer class id used as the training target.
emotions_dict = {"love":0, "sadness":1, "anger":2, "surprise":3, "joy":4, "fear":5}

# Distinct label strings found in the training split.
y = set(y_train_unmapped)

# Fail early and name the offending labels, instead of a bare KeyError raised
# from the middle of one of the comprehensions below.
unknown_labels = (y | set(y_test_unmapped) | set(y_val_unmapped)) - set(emotions_dict)
if unknown_labels:
    raise KeyError(f"Labels missing from emotions_dict: {sorted(unknown_labels)}")

y_train = [emotions_dict[label] for label in y_train_unmapped]
y_test = [emotions_dict[label] for label in y_test_unmapped]
y_val = [emotions_dict[label] for label in y_val_unmapped]

In [6]:
# Name of the pretrained checkpoint; SentimentClassifier reads this global too.
# NOTE(review): clean_text() lower-cases every sentence, yet this is the
# *cased* checkpoint — confirm that this combination is intended.
model_name = 'bert-base-cased'
# Tokenizer loaded for the same checkpoint.
tokenizer = T.BertTokenizer.from_pretrained(model_name)

In [26]:
# Number of token ids tokenizer.encode returns for each training sentence
# (encode is called with max_length=512).
token_lens = [
    len(tokenizer.encode(text, max_length=512))
    for text in X_train
]

In [7]:
class EmotionDataset(Dataset):
    """Dataset that tokenizes one sentence per item on access.

    Args:
        sentences: Indexable collection of texts.
        targets: Indexable collection of labels, parallel to `sentences`.
        tokenizer: Object exposing `encode_plus` (a Hugging Face tokenizer in
            this notebook).
        max_len: Value forwarded as `max_length` to `encode_plus`.
    """

    def __init__(self, sentences, targets, tokenizer, max_len):
        self.sentences = sentences
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of sentences."""
        return len(self.sentences)

    def __getitem__(self, item):
        """Return the encoded sentence and target stored at index `item`.

        Returns:
            dict with keys 'review_text' (the sentence as str), 'input_ids'
            and 'attention_mask' (flattened tensors from the tokenizer) and
            'targets' (the label as a torch.long tensor).
        """
        # Encode the sentence selected by `item`. The previous code referred
        # to an undefined name `sentences` here, which raised NameError.
        review = str(self.sentences[item])
        target = self.targets[item]

        # NOTE(review): newer transformers releases replace pad_to_max_length
        # with padding='max_length' plus truncation=True — confirm against the
        # installed version before switching.
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            pad_to_max_length=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'review_text': review,
            # return_tensors='pt' adds a leading dimension; flatten drops it so
            # the DataLoader can batch the items itself.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long)
        }

In [8]:
def create_data_loader(text_x, text_y, tokenizer, max_len, batch_size,
                       num_workers=0, shuffle=False):
    """Wrap sentences and labels in an EmotionDataset and return a DataLoader.

    Args:
        text_x: Sentences.
        text_y: Labels, parallel to `text_x`.
        tokenizer: Tokenizer handed to EmotionDataset.
        max_len: Maximum token length handed to EmotionDataset.
        batch_size: Number of examples per batch.
        num_workers: Worker processes used for loading. Defaults to 0 (load
            in the main process): worker processes have to re-import the
            dataset class, which fails or hangs for classes defined inside a
            notebook on platforms that spawn workers (e.g. Windows). Raise it
            when the dataset class lives in an importable module.
        shuffle: Whether to reshuffle the data every epoch (default False,
            i.e. the original sequential order).

    Returns:
        A torch.utils.data.DataLoader over the dataset.
    """
    ds = EmotionDataset(
        sentences=text_x,
        targets=text_y,
        tokenizer=tokenizer,
        max_len=max_len
    )

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers
    )

In [9]:
# Examples per batch.
BATCH_SIZE = 16
# Forwarded to EmotionDataset, which passes it as max_length (together with
# pad_to_max_length=True) to the tokenizer.
# NOTE(review): token_lens is computed earlier but never consulted; if the
# sentences are much shorter than 512 tokens this padding wastes compute — confirm.
MAX_LEN = 512
# One loader per split, all built with the same tokenizer, length and batch size.
train_data_loader = create_data_loader(X_train, y_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(X_val, y_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(X_test, y_test, tokenizer, MAX_LEN, BATCH_SIZE)

In [11]:
# Standalone copy of the pretrained encoder, called directly in the next cell.
# SentimentClassifier loads its own copy from the same checkpoint name.
bert_model = T.BertModel.from_pretrained(model_name)

100%|████████████████████████████████████████████████████████████████| 435779157/435779157 [04:43<00:00, 1536422.88B/s]


In [None]:
# Sanity-check forward pass of the bare encoder on one training sentence.
# `encoding` was never defined anywhere in the notebook, so it is built here
# with the same tokenizer settings EmotionDataset uses.
encoding = tokenizer.encode_plus(
    X_train[0],
    add_special_tokens=True,
    max_length=MAX_LEN,
    return_token_type_ids=False,
    pad_to_max_length=True,
    return_attention_mask=True,
    return_tensors='pt',
)

# Inspection only: no gradients needed.
with torch.no_grad():
    bert_outputs = bert_model(
        input_ids=encoding['input_ids'],
        attention_mask=encoding['attention_mask']
    )

# Index instead of tuple-unpacking: this works both when the model returns a
# plain tuple and when it returns a ModelOutput (unpacking a ModelOutput
# yields its key names, not tensors).
last_hidden_state, pooled_output = bert_outputs[0], bert_outputs[1]

In [14]:
class SentimentClassifier(nn.Module):
    """Pretrained BERT encoder followed by dropout and one linear layer.

    The encoder is loaded from the module-level `model_name` checkpoint.

    Args:
        n_classes: Number of output classes (size of the logit vector).
    """

    def __init__(self, n_classes):
        super(SentimentClassifier, self).__init__()
        self.bert = T.BertModel.from_pretrained(model_name)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the classification logits for a batch.

        Args:
            input_ids: Token ids, forwarded to the encoder.
            attention_mask: Attention mask, forwarded to the encoder.

        Returns:
            The output of the linear layer applied to the (dropped-out)
            pooled encoder output.
        """
        bert_outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Take the second element by index: tuple-unpacking only works when
        # the encoder returns a plain tuple, while indexing also works for a
        # ModelOutput (whose iteration yields key names).
        pooled_output = bert_outputs[1]
        output = self.drop(pooled_output)
        return self.out(output)

In [21]:
# Seed every random number generator in use so that the classifier-head
# initialisation and dropout are repeatable from a fresh kernel.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# One output logit per entry of emotions_dict.
model = SentimentClassifier(n_classes=len(emotions_dict))

EPOCHS = 10

# Optimizer Adam
# NOTE(review): newer transformers releases deprecate T.AdamW in favour of
# torch.optim.AdamW (whose defaults differ) — confirm against the installed version.
optimizer = T.AdamW(model.parameters(), lr=2e-5)

# Total number of optimizer steps over the whole run. Only needed by the
# linear warm-up schedule (T.get_linear_schedule_with_warmup with
# num_warmup_steps=0, num_training_steps=total_steps), which is currently
# disabled here and in train_epoch.
total_steps = len(train_data_loader) * EPOCHS

# Set the loss function
loss_fn = nn.CrossEntropyLoss()

In [29]:
# Function for a single training iteration
def train_epoch(model, data_loader, loss_fn, optimizer, n_examples):
    """Train `model` for one pass over `data_loader`.

    Args:
        model: Module called as model(input_ids=..., attention_mask=...).
        data_loader: Iterable of batches; each batch is a mapping with the
            keys "input_ids", "attention_mask" and "targets".
        loss_fn: Callable(outputs, targets) returning the loss tensor.
        optimizer: Optimizer over the model parameters.
        n_examples: Denominator used for the accuracy.

    Returns:
        (accuracy, mean_loss): accuracy is a 0-dim double tensor on the CPU
        (correct predictions / n_examples); mean_loss is the mean of the
        per-batch losses, or NaN when the loader yields no batch.
    """
    model = model.train()

    # Send each batch to wherever the model's weights live, so the function
    # works unchanged whether the model sits on the CPU or on an accelerator.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    losses = []
    correct_predictions = 0  # plain int, so an empty loader cannot break the return

    # Drop any gradients left over from earlier work before the first step.
    optimizer.zero_grad()

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds == targets).item()
        losses.append(loss.item())

        # Backward prop
        loss.backward()

        # Gradient Descent (gradient norm clipped to 1.0 before the step)
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        #scheduler.step()
        optimizer.zero_grad()

    accuracy = torch.tensor(correct_predictions, dtype=torch.double) / n_examples
    mean_loss = np.mean(losses) if losses else float("nan")
    return accuracy, mean_loss


In [23]:
def eval_model(model, data_loader, loss_fn, n_examples):
    """Evaluate `model` on `data_loader` without updating any weights.

    Args:
        model: Module called as model(input_ids=..., attention_mask=...).
        data_loader: Iterable of batches; each batch is a mapping with the
            keys "input_ids", "attention_mask" and "targets".
        loss_fn: Callable(outputs, targets) returning the loss tensor.
        n_examples: Denominator used for the accuracy.

    Returns:
        (accuracy, mean_loss): accuracy is a 0-dim double tensor on the CPU
        (correct predictions / n_examples); mean_loss is the mean of the
        per-batch losses, or NaN when the loader yields no batch.
    """
    model = model.eval()

    # Send each batch to wherever the model's weights live.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    losses = []
    correct_predictions = 0  # plain int, so an empty loader cannot break the return

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            # Get model outputs
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )

            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets).item()
            losses.append(loss.item())

    accuracy = torch.tensor(correct_predictions, dtype=torch.double) / n_examples
    mean_loss = np.mean(losses) if losses else float("nan")
    return accuracy, mean_loss

In [None]:
# Per-epoch metrics (plain floats) and the best validation accuracy so far.
history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):

    # Show details
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    print("-" * 10)

    train_acc, train_loss = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        len(X_train)
    )

    print(f"Train loss {train_loss} accuracy {train_acc}")

    # Get model performance (accuracy and loss) on the validation split.
    # (The loader name was misspelled `va_data_loader`, a NameError.)
    val_acc, val_loss = eval_model(
        model,
        val_data_loader,
        loss_fn,
        len(X_val)
    )

    print(f"Val   loss {val_loss} accuracy {val_acc}")
    print()

    # Store plain Python floats rather than tensors so the history can be
    # plotted or serialised regardless of the device the metrics came from.
    val_acc_value = float(val_acc)
    history['train_acc'].append(float(train_acc))
    history['train_loss'].append(float(train_loss))
    history['val_acc'].append(val_acc_value)
    history['val_loss'].append(float(val_loss))

    # If we beat prev performance, keep a checkpoint of the weights
    if val_acc_value > best_accuracy:
        torch.save(model.state_dict(), 'best_model_state.bin')
        best_accuracy = val_acc_value

In [None]:
# Accuracy per epoch for the training and validation splits
fig, ax = plt.subplots()
ax.plot(history['train_acc'], label='train accuracy')
ax.plot(history['val_acc'], label='validation accuracy')

# Title, axis labels, legend and a fixed 0..1 accuracy range
ax.set_title('Training history')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend()
ax.set_ylim([0, 1]);

In [None]:
# Accuracy on the held-out test split. eval_model takes exactly
# (model, data_loader, loss_fn, n_examples); the previous call also passed an
# undefined `device` and used the undefined `df_test` for the example count.
test_acc, test_loss = eval_model(
  model,
  test_data_loader,
  loss_fn,
  len(X_test)
)

test_acc.item()

In [None]:
def get_predictions(model, data_loader):
    """Run `model` over `data_loader` and collect its predictions.

    Args:
        model: Module called as model(input_ids=..., attention_mask=...).
        data_loader: Iterable of batches; each batch is a mapping with the
            keys "review_text", "input_ids", "attention_mask" and "targets".

    Returns:
        (review_texts, predictions, prediction_probs, real_values):
        the input texts as a list, the arg-max class per example, the
        softmax of the model outputs per example, and the true targets.
        The three tensors are on the CPU; they are empty when the loader
        yields no batch.
    """
    model = model.eval()

    # Send each batch to wherever the model's weights live (the previous code
    # relied on a global `device` that is never defined).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    review_texts = []
    batch_predictions = []
    batch_probs = []
    batch_targets = []

    with torch.no_grad():
        for d in data_loader:
            texts = d["review_text"]
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            # Get outputs
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            # The model returns raw logits; normalise them so the returned
            # `prediction_probs` really are probabilities.
            probs = torch.softmax(outputs, dim=1)

            review_texts.extend(texts)
            batch_predictions.append(preds.cpu())
            batch_probs.append(probs.cpu())
            batch_targets.append(targets.cpu())

    if not batch_predictions:
        # Nothing to concatenate: return empty tensors instead of raising.
        return (
            review_texts,
            torch.empty(0, dtype=torch.long),
            torch.empty(0, 0),
            torch.empty(0, dtype=torch.long),
        )

    predictions = torch.cat(batch_predictions)
    prediction_probs = torch.cat(batch_probs)
    real_values = torch.cat(batch_targets)

    return review_texts, predictions, prediction_probs, real_values

In [None]:
# Collect texts, predicted classes, per-class model outputs and true labels
# for the test split.
# NOTE: this rebinds `y_test` from the list of label ids built earlier to the
# tensor of true labels returned by get_predictions.
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(
    model,
    test_data_loader
)

In [None]:
# Emotion names ordered by their integer id, so class_names[i] names label i
# (`class_names` was never defined in the notebook).
class_names = sorted(emotions_dict, key=emotions_dict.get)

# Passing `labels` pins the row order to the ids, so `target_names` stays
# aligned even if some class never appears in y_test / y_pred.
print(classification_report(
    y_test,
    y_pred,
    labels=list(range(len(class_names))),
    target_names=class_names,
))

In [None]:
def show_confusion_matrix(confusion_matrix):
    """Draw `confusion_matrix` as an annotated heatmap with labelled axes.

    Cells are annotated with integer counts on a "Blues" colour map; y tick
    labels are kept horizontal and x tick labels are rotated by 30 degrees.
    """
    axes = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")

    y_axis, x_axis = axes.yaxis, axes.xaxis
    y_axis.set_ticklabels(y_axis.get_ticklabels(), rotation=0, ha='right')
    x_axis.set_ticklabels(x_axis.get_ticklabels(), rotation=30, ha='right')

    axes.set_ylabel('True sentiment')
    axes.set_xlabel('Predicted sentiment')

# Emotion names ordered by their integer id, so class_names[i] names label i
# (`class_names` was never defined in the notebook).
class_names = sorted(emotions_dict, key=emotions_dict.get)

# `labels` forces one row/column per class id, so the matrix always matches
# the len(class_names) x len(class_names) frame built from it.
cm = confusion_matrix(y_test, y_pred, labels=list(range(len(class_names))))
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

In [None]:
review_text = "I love completing my todos! Best app ever!!!"

# Emotion names ordered by their integer id, so class_names[i] names label i
# (`class_names` was never defined in the notebook).
class_names = sorted(emotions_dict, key=emotions_dict.get)

# Apply the same clean_text preprocessing the training sentences went through
# before tokenizing.
encoded_review = tokenizer.encode_plus(
    clean_text(review_text),
    max_length=MAX_LEN,
    add_special_tokens=True,
    return_token_type_ids=False,
    pad_to_max_length=True,
    return_attention_mask=True,
    return_tensors='pt',
)

# Run on whatever device the model's weights live on (`device` was never
# defined in the notebook).
device = next(model.parameters()).device
input_ids = encoded_review['input_ids'].to(device)
attention_mask = encoded_review['attention_mask'].to(device)

# Inference only: disable dropout and gradient tracking.
model.eval()
with torch.no_grad():
    output = model(input_ids, attention_mask)
_, prediction = torch.max(output, dim=1)

print(f'Review text: {review_text}')
print(f'Sentiment  : {class_names[prediction.item()]}')