In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel
from sklearn import metrics

In [None]:
# Seed torch's global RNG before anything else is created.
torch.manual_seed(0)

# Hugging Face checkpoint used for both the tokenizer and the encoder.
model_name = 'cardiffnlp/twitter-roberta-base-emotion-multilabel-latest'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Hyperparameters read by the cells below.
max_len = 128           # token length every text is padded / truncated to
batch_size = 16         # rows per DataLoader batch
epochs = 4              # number of passes requested from the training driver
learning_rate = 1e-5    # step size handed to the optimizer

In [None]:
class Dataset(torch.utils.data.Dataset) :
    """Map-style dataset that yields tokenized text together with its label vector.

    The frame must contain a ``"text"`` column; every column from the second one
    onwards (``df.iloc[:, 1:]``) is treated as a label column.
    Tokenization uses the module-level ``tokenizer`` and ``max_len``.
    """

    def __init__(self, df) :
        # Drop the frame's own index: __getitem__ receives positions 0..len-1,
        # while plain Series[...] access is label based and would raise (or pick
        # the wrong row) for a filtered / re-indexed frame.
        self.text = df["text"].reset_index(drop=True)
        self.targets = df.iloc[:,1:].values

    def __len__(self) :
        """Number of rows in the dataset."""
        return len(self.text)

    def tokenize_text(self, text) :
        """Encode one string, padded / truncated to ``max_len``, as PyTorch tensors."""
        return tokenizer.encode_plus(text,
            add_special_tokens=True,
            max_length=max_len,
            padding="max_length",
            return_token_type_ids=True,
            return_attention_mask=True,
            return_tensors="pt",
            truncation=True)

    def __getitem__(self, index) :
        """Return the encoded text and float label vector for the row at position ``index``."""
        tokenized_text = self.tokenize_text(self.text[index])
        return {
        # return_tensors="pt" adds a leading batch axis; flatten() drops it again
        "input_ids": tokenized_text["input_ids"].flatten(),
        "attention_mask": tokenized_text["attention_mask"].flatten(),
        "token_type_ids": tokenized_text["token_type_ids"].flatten(),
        # explicit float32 conversion instead of the legacy FloatTensor constructor
        "targets": torch.tensor(self.targets[index], dtype=torch.float)
        }

In [None]:
# Read the three splits from the working directory.
train_df = pd.read_csv("./train.csv")
val_df = pd.read_csv("./validation.csv")
test_df = pd.read_csv("./test.csv")

# Wrap every split so that rows are tokenized on access.
train_dataset = Dataset(train_df)
val_dataset = Dataset(val_df)
test_dataset = Dataset(test_df)

# Only the training loader shuffles; validation and test keep file order.
train_data_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True
)
val_data_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False
)
test_data_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False
)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
class Model(torch.nn.Module):
    """Pretrained encoder followed by dropout and a linear classification head."""

    def __init__(self):
        super().__init__()
        # Encoder weights come from the checkpoint named by the module-level ``model_name``.
        self.bert_model = AutoModel.from_pretrained(model_name)
        self.dropout = torch.nn.Dropout(p=0.4)
        # in_features: embedding length (768), out_features: number of labels (3)
        self.linear = torch.nn.Linear(in_features=768, out_features=3)

    def forward(self, input_ids, attention_mask, token_type_ids) :
        """Return the head's raw output (no activation applied) for the encoded batch."""
        encoder_output = self.bert_model(input_ids, attention_mask, token_type_ids)
        pooled = self.dropout(encoder_output.pooler_output)
        return self.linear(pooled)

In [None]:
# Build the classifier; Model.__init__ loads the pretrained encoder.
model = Model()
# Move the parameters to the selected device. As the last expression of the
# cell, this call's return value (the module) is also what the notebook displays.
model.to(device)

In [None]:
# Per-label loss weights, one entry per label column and in column order:
#     weight = 1 - mean(label column)
# so labels that are set less often weigh more in the loss.
# DataFrame.value_counts() is deliberately not used: it counts distinct *rows*
# and sorts them by frequency, so its result is neither aligned with the label
# columns nor guaranteed to have one entry per label.
# NOTE(review): assumes the label columns hold 0/1 indicators - confirm against the CSVs.
class_weights_train = (1 - train_df.iloc[:, 1:].mean()).values
class_weights_train = torch.from_numpy(class_weights_train).float().to(device)

class_weights_validation = (1 - val_df.iloc[:, 1:].mean()).values
class_weights_validation = torch.from_numpy(class_weights_validation).float().to(device)

class_weights_test = (1 - test_df.iloc[:, 1:].mean()).values
class_weights_test = torch.from_numpy(class_weights_test).float().to(device)

In [None]:
def loss_function(output_values, target_values, weights):
    """Weighted binary cross-entropy computed directly on raw model outputs.

    ``weights`` is passed as the ``weight`` argument of ``BCEWithLogitsLoss``;
    the criterion's default reduction is used.
    """
    criterion = torch.nn.BCEWithLogitsLoss(weight=weights)
    return criterion(output_values, target_values)

# Adam over every parameter of the model (encoder and classification head).
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# function for training in an epoch
def train_for_one_epoch(model, training_loader, optimizer) :
    """Run one optimisation pass over ``training_loader`` and print loss and macro F1.

    Args:
        model: module called as ``model(input_ids, attention_mask, token_type_ids)``
            and returning raw scores (logits).
        training_loader: iterable of batches with the keys ``"input_ids"``,
            ``"attention_mask"``, ``"token_type_ids"`` and ``"targets"``.
        optimizer: optimizer that updates ``model``'s parameters.

    Uses the notebook-level ``device``, ``loss_function`` and ``class_weights_train``.
    """
    predictions = []
    labels = []
    model.train()
    training_loss = 0
    # loop for every batch
    for batch_data in training_loader :
        optimizer.zero_grad()
        input_ids = batch_data["input_ids"].to(device, dtype = torch.long)
        attention_mask = batch_data["attention_mask"].to(device, dtype = torch.long)
        token_type_ids = batch_data["token_type_ids"].to(device, dtype = torch.long)
        targets = batch_data["targets"].to(device, dtype = torch.float)
        # find model output (raw logits)
        calculated_labels = model(input_ids, attention_mask, token_type_ids)
        # find loss aka difference of output and target labels
        loss = loss_function(calculated_labels, targets, class_weights_train)
        # collect target labels and predicted probabilities; the sigmoid is needed
        # because the 0.5 threshold below is a probability threshold, not a logit one
        labels.extend(targets.detach().cpu().numpy().tolist())
        predictions.extend(torch.sigmoid(calculated_labels).detach().cpu().numpy().tolist())
        # compute the gradient of the loss with respect to each weight
        loss.backward()
        # adjust the weights by the gradients collected in the backward pass.
        optimizer.step()
        training_loss += loss.item()

    # every accumulated term is already a batch mean, so average over the
    # number of batches (not over the number of rows) to get the loss per batch
    training_loss /= len(training_loader)
    # prediction for a label is 1 if its probability is >= 0.5. Otherwise it's 0
    predictions = np.array(predictions) >= 0.5

    # find f1 score (macro)
    f1_macro = metrics.f1_score(labels, predictions, average="macro")
    print("\t training: " + f"loss: {training_loss:.3f}, F1 macro: {f1_macro:.3f}")

In [None]:
# function for validating in an epoch
def validate_for_one_epoch(model, validation_loader) :
    """Evaluate ``model`` on ``validation_loader`` and print loss and macro F1.

    Args:
        model: module called as ``model(input_ids, attention_mask, token_type_ids)``
            and returning raw scores (logits).
        validation_loader: iterable of batches with the keys ``"input_ids"``,
            ``"attention_mask"``, ``"token_type_ids"`` and ``"targets"``.

    Runs in eval mode without gradient tracking. Uses the notebook-level
    ``device``, ``loss_function`` and ``class_weights_validation``.
    """
    predictions = []
    labels = []
    model.eval()
    validation_loss = 0
    with torch.no_grad() :
        # loop for every batch
        for batch_data in validation_loader :
            input_ids = batch_data["input_ids"].to(device, dtype = torch.long)
            attention_mask = batch_data["attention_mask"].to(device, dtype = torch.long)
            token_type_ids = batch_data["token_type_ids"].to(device, dtype = torch.long)
            targets = batch_data["targets"].to(device, dtype = torch.float)
            # find model output (raw logits)
            calculated_labels = model(input_ids, attention_mask, token_type_ids)
            # find loss aka difference of output and target labels
            validation_loss += loss_function(calculated_labels, targets, class_weights_validation).item()
            # collect target labels and predicted probabilities; the sigmoid is needed
            # because the 0.5 threshold below is a probability threshold, not a logit one
            labels.extend(targets.cpu().numpy().tolist())
            predictions.extend(torch.sigmoid(calculated_labels).cpu().numpy().tolist())

    # every accumulated term is already a batch mean, so average over the
    # number of batches (not over the number of rows) to get the loss per batch
    validation_loss /= len(validation_loader)

    # prediction for a label is 1 if its probability is >= 0.5. Otherwise it's 0
    predictions = np.array(predictions) >= 0.5

    # find f1 score (macro)
    f1_macro = metrics.f1_score(labels, predictions, average="macro")
    print("\t validation: " + f"loss: {validation_loss:.3f}, F1 macro: {f1_macro:.3f}")

In [None]:
# function for testing
def test(model, test_loader) :
    """Evaluate ``model`` on ``test_loader`` and print per-label and macro F1.

    Args:
        model: module called as ``model(input_ids, attention_mask, token_type_ids)``
            and returning raw scores (logits).
        test_loader: iterable of batches with the keys ``"input_ids"``,
            ``"attention_mask"``, ``"token_type_ids"`` and ``"targets"``.

    Label names for the report are read from ``train_df.columns[1:]``; the
    notebook-level ``device`` is used for tensor placement.
    """
    predictions = []
    labels = []

    model.eval()
    with torch.no_grad() :
        for batch_data in test_loader :
            input_ids = batch_data["input_ids"].to(device, dtype = torch.long)
            attention_mask = batch_data["attention_mask"].to(device, dtype = torch.long)
            token_type_ids = batch_data["token_type_ids"].to(device, dtype = torch.long)
            targets = batch_data["targets"].to(device, dtype = torch.float)
            # find model output (raw logits)
            calculated_labels = model(input_ids, attention_mask, token_type_ids)
            # collect target labels and predicted probabilities; the sigmoid is needed
            # because the 0.5 threshold below is a probability threshold, not a logit one
            labels.extend(targets.cpu().numpy().tolist())
            predictions.extend(torch.sigmoid(calculated_labels).cpu().numpy().tolist())

    # prediction for a label is 1 if its probability is >= 0.5. Otherwise it's 0
    predictions = np.array(predictions) >= 0.5

    # find f1 scores for each class and macro
    f1_scores = metrics.f1_score(labels, predictions, average=None)
    f1_macro = metrics.f1_score(labels, predictions, average="macro")

    print("\t testing: " + f"F1 scores: {dict(zip(train_df.columns[1:].tolist(), f1_scores))}")
    print(f"\t F1 macro: {f1_macro}")

In [None]:
# function for training, validating for n_epochs and ultimately testing
def train_for_n_epochs(model, training_loader, validation_loader, test_loader, optimizer, n_epochs):
    """Train and validate ``model`` for ``n_epochs`` epochs, then test it once.

    Args:
        model: model to optimise; it is updated in place.
        training_loader: batches used by ``train_for_one_epoch``.
        validation_loader: batches used by ``validate_for_one_epoch``.
        test_loader: batches used by ``test`` after the last epoch.
        optimizer: optimizer that updates ``model``'s parameters.
        n_epochs: number of train + validate rounds to run.

    Returns:
        The same ``model`` object after training.
    """
    for epoch in range(n_epochs) :
        print(f"epoch {epoch + 1}")
        train_for_one_epoch(model, training_loader, optimizer)
        validate_for_one_epoch(model, validation_loader)
        print()
    # Test and return only after the loop: doing either inside it would end
    # training after the first epoch regardless of n_epochs.
    test(model, test_loader)
    return model

In [None]:
# Run the training / validation / test driver on the three loaders and keep
# the model object it returns.
trained_model = train_for_n_epochs(
    model=model,
    training_loader=train_data_loader,
    validation_loader=val_data_loader,
    test_loader=test_data_loader,
    optimizer=optimizer,
    n_epochs=epochs,
)

In [None]:
import contractions
import re
import string
import emoji
import json
from collections import Counter

In [None]:
# function for getting sentiment category of a text
def get_category(model, text) :
    """Return the sentiment label the model scores highest for ``text``.

    The text is cleaned by a small preprocessing pipeline, tokenized with the
    notebook-level ``tokenizer`` / ``max_len`` and scored by ``model`` on
    ``device`` in eval mode without gradient tracking.

    Args:
        model: module called as ``model(input_ids, attention_mask, token_type_ids)``
            and returning one row of three raw scores.
        text: the sentence to classify.

    Returns:
        ``"positive"``, ``"negative"`` or ``"neutral"``.
        NOTE(review): the label order is hard-coded here; it is assumed to match
        the order of the label columns the model was trained on - confirm.

    Note:
        ``./chat_words.txt`` (a JSON mapping) is read on every call.
    """
    # every ASCII punctuation mark except the apostrophe and the hyphen
    punctuation_marks = string.punctuation.replace("'","").replace("-","")
    def remove_punctuation(text):
        # replace each listed mark with a single space
        return text.translate(str.maketrans(punctuation_marks, " "*len(punctuation_marks)))

    def convert_lower(text) :
        return text.lower()

    with open("./chat_words.txt", "r") as f :
        chat_words = json.load(f)

    def expand_chat_words(text) :
        # swap whitespace-separated tokens that have an entry in the mapping
        return " ".join([chat_words[i] if i in chat_words else i for i in text.split()])

    def expand_contractions(text):
        return contractions.fix(text)

    def substitute_laugh(text) :
        # NOTE(review): this rewrites every "ha" substring, including inside
        # ordinary words. Left unchanged because the preprocessing applied to the
        # training data is not visible here - confirm before tightening the pattern.
        return re.sub("ha", "laugh ", text)

    def remove_non_latin_emoji(text):
        # keep only the characters listed in the class: Latin letters, hyphen,
        # space, apostrophe and the given emoji code-point ranges
        return re.sub(r"[^A-Za-z- '\U0001F300-\U0001F64F\U0001F680-\U0001F6FF\U0001F910-\U0001F96B\U0001F980-\U0001F9E0]", '', text)

    def preprocessing_pipeline(text) :
        text = text.replace("[NAME]", "").replace("[RELIGION]", "")
        text = remove_punctuation(text)
        text = re.sub(r'[’‘´`“”"]', "'", text)
        text = convert_lower(text)
        text = substitute_laugh(text)
        text = expand_chat_words(text)
        text = expand_contractions(text)
        text = remove_non_latin_emoji(text)
        return text

    t = tokenizer.encode_plus(preprocessing_pipeline(text),
                                add_special_tokens=True,
                                max_length=max_len,
                                padding="max_length",
                                return_token_type_ids=True,
                                return_attention_mask=True,
                                return_tensors="pt",
                                truncation=True)

    # Each tensor must come from its own key of the encoding; feeding
    # token_type_ids for all three would hide the text from the model.
    input_ids = t["input_ids"].to(device, dtype = torch.long)
    attention_mask = t["attention_mask"].to(device, dtype = torch.long)
    token_type_ids = t["token_type_ids"].to(device, dtype = torch.long)

    # Score deterministically: disable dropout for the call, skip gradient
    # bookkeeping, then put the model back into whatever mode it was in.
    was_training = model.training
    model.eval()
    try :
        with torch.no_grad() :
            calculated_labels = model(input_ids, attention_mask, token_type_ids)
    finally :
        model.train(was_training)

    scores = torch.sigmoid(calculated_labels).cpu().numpy().tolist()[0]
    results = dict(zip(["positive", "negative", "neutral"], scores))

    return max(results, key=results.get)

In [None]:
def create_sentimental_profile(model, sentences_personA, sentences_personB) :
    """Return the mean share of non-negative sentences across two speakers.

    Each sentence is labelled with ``get_category``. A speaker's intensity is
    the fraction of their sentences labelled ``"positive"`` or ``"neutral"``;
    the result is the arithmetic mean of the two intensities.

    Args:
        model: model forwarded to ``get_category``.
        sentences_personA: iterable of sentences from the first speaker.
        sentences_personB: iterable of sentences from the second speaker.

    Returns:
        A float in ``[0, 1]``. A speaker with no sentences contributes an
        intensity of ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    def non_negative_intensity(sentences) :
        # get sentiments for every sentence of one person
        categories = [get_category(model, item) for item in sentences]
        if not categories :
            # nothing to rate: avoid dividing by zero
            return 0.0
        # find frequencies of sentiments, then the non-negative share
        frequencies = Counter(categories)
        return (frequencies["positive"] + frequencies["neutral"])/len(categories)

    intensity_personA = non_negative_intensity(sentences_personA)
    intensity_personB = non_negative_intensity(sentences_personB)

    # find interaction intensity as arithmetic mean
    return (intensity_personA + intensity_personB)/2