## Testing a single-head model on a custom dataset

In [None]:
%load_ext autoreload
%autoreload 2
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from ignite.metrics import Accuracy, Loss, Fbeta, recall, precision
from transformers import AdamW, AutoModelForSequenceClassification, AutoTokenizer
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from tqdm.notebook import trange, tqdm
from utils import load_dataset, tokenize_dataset, create_dataloader
import os
import glob
from models import ThreeHeadedMonster
from PIL import Image

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Emotion vocabulary; a label's class id is its position in this list.
ARTEMIS_EMOTIONS = [
    'amusement', 'awe', 'contentment', 'excitement',
    'anger', 'disgust', 'fear', 'sadness', 'something else',
]
EMOTION_ID = dict(zip(ARTEMIS_EMOTIONS, range(len(ARTEMIS_EMOTIONS))))

In [None]:
# Checkpoint name used to fetch the matching tokenizer.
bert_version = 'xlm-roberta-base'
# Pad on the right-hand side of each sequence.
tokenizer = AutoTokenizer.from_pretrained(bert_version, padding_side='right')

# Base folder for the dataset CSVs, and the prefix joined onto each model path.
data_root_path = 'dataset/'
model_root_path = './'

In [None]:
# Collect every checkpoint under three_model/. glob returns matches in an
# arbitrary, filesystem-dependent order, so sort them to keep the evaluation
# (and its printed report) in a reproducible order.
all_models = sorted(glob.glob('three_model/*'))
print(all_models)
# Languages to evaluate; each name selects a test CSV and is passed to the model.
langs = ['english', 'arabic', 'chinese']

In [None]:
# Evaluate every checkpoint on every language's test split and print
# accuracy / precision / recall / F1 / cross-entropy loss for each pair.
for lang in langs:
    # The test data depends only on the language, so load and tokenize it once
    # per language rather than once per (language, checkpoint) pair.
    # NOTE(review): assumes create_dataloader returns an object that can be
    # iterated more than once (e.g. a torch DataLoader) — confirm in utils.
    data_path = os.path.join(data_root_path, f'test_{lang}/test_{lang}.csv')
    sentences, labels = load_dataset(data_path, ARTEMIS_EMOTIONS, split='test')
    tokens, masks = tokenize_dataset(tokenizer, sentences)
    dataloader = create_dataloader(tokens, masks, labels, batch_size=128, mode='test')

    for model_path in all_models:
        # loading the model
        model_path = os.path.join(model_root_path, model_path)
        # basename handles whichever path separator glob produced.
        model_name = os.path.basename(model_path)
        model = ThreeHeadedMonster.load_pretrained(model_path, num_emo_classes=9)
        model.to(device)
        model.eval()

        # evaluation loop
        print(f'========= {model_name} :: {lang} =========')
        # Fresh metric objects per run so nothing carries over between checkpoints.
        metrics = {'Accuracy': Accuracy(),
                   'Precision': precision.Precision(average=True),
                   'Recall': recall.Recall(average=True),
                   'F1': Fbeta(1),
                   'Loss': Loss(F.cross_entropy)
                  }
        for metric in metrics.values():
            metric.reset()
        for batch in tqdm(dataloader, desc='ML'):
            input_ids = batch[0].to(device)
            input_mask = batch[1].to(device)
            # Separate name so the dataset-level `labels` above is not clobbered.
            batch_labels = batch[2].to(device)
            with torch.no_grad():
                outputs = model(input_ids,
                                token_type_ids=None,
                                attention_mask=input_mask,
                                language=lang)
            # The metrics are fed class indices: argmax over dim 1 of the label batch.
            target = batch_labels.argmax(dim=1)
            for metric in metrics.values():
                metric.update((outputs, target))
        for n, metric in metrics.items():
            print(f'   {n}: {metric.compute():.5f}')
        print('==========================================')

## 3-headed monster Analysis

In [None]:
# loading best model
# NOTE: replace the placeholder below with the real checkpoint path before running.
model_path = "PATH TO BEST MODEL"
model = ThreeHeadedMonster.load_pretrained(model_path, num_emo_classes=9)
model.to(device)
# The trailing semicolon suppresses the notebook's echo of the call's return value.
model.eval();

# Tokenizer for the 'xlm-roberta-base' checkpoint, padding on the right.
bert_version = 'xlm-roberta-base'
tokenizer = AutoTokenizer.from_pretrained(bert_version, padding_side='right')

In [None]:
# Width of the one-hot label vectors built by load_dataset.
emo_classes = 9


def load_dataset(path, batch_size=2048, max_length=128):
    """Read an utterance/emotion CSV and wrap it in a sequential DataLoader.

    Rows containing any missing value are dropped. Emotion names are
    lower-cased, 'other' is mapped to 'something else', and each label is
    one-hot encoded through EMOTION_ID. Utterances are tokenized with the
    module-level `tokenizer`, truncated/padded to `max_length`.

    NOTE(review): this definition rebinds the name `load_dataset` that the
    notebook also imports from `utils` (called there with a different
    signature) — re-running the earlier evaluation cell after this one will
    hit this version instead.

    Args:
        path: CSV file with at least `utterance` and `emotion` columns.
        batch_size: Number of rows per batch (default keeps the original 2048).
        max_length: Token length every utterance is truncated/padded to
            (default keeps the original 128).

    Returns:
        (df, dataloader): the cleaned DataFrame and a DataLoader that yields
        (input_ids, attention_mask, one_hot_labels) in the DataFrame's row order.

    Raises:
        KeyError: if an emotion name is not a key of EMOTION_ID.
    """
    df = pd.read_csv(path)
    df = df.dropna()
    sentences = df['utterance']
    labels = df['emotion'].apply(lambda x: x.lower()).replace('other', 'something else').values

    # One-hot encode: row i gets a 1 in the column of its emotion id.
    labels_pt = torch.zeros((labels.shape[0], emo_classes))
    for i, emo in enumerate(labels):
        labels_pt[i, EMOTION_ID[emo]] = 1

    tokenized = tokenizer(sentences.to_list(), add_special_tokens=True, max_length=max_length,
                          truncation=True, padding='max_length', return_tensors='pt',
                          return_attention_mask=True)

    dataset = TensorDataset(tokenized['input_ids'], tokenized['attention_mask'], labels_pt)
    # Sequential sampling keeps batches aligned with the DataFrame rows.
    dataloader = DataLoader(dataset, sampler=SequentialSampler(dataset), batch_size=batch_size)
    return df, dataloader

In [None]:
# Training split of each language, each as a (DataFrame, DataLoader) pair.
en_df, en_train_dataloader = load_dataset(os.path.join(data_root_path, f'english/train/artemis_preprocessed.csv'))
ar_df, ar_train_dataloader = load_dataset(os.path.join(data_root_path, f'arabic/train/artemis_preprocessed.csv'))
ch_df, ch_train_dataloader = load_dataset(os.path.join(data_root_path, f'chinese/train/artemis_preprocessed.csv'))

In [None]:
# Spanish test CSV; despite the variable name, this is the test file, not a training split.
sp_df, sp_train_dataloader = load_dataset(os.path.join(data_root_path, f'test_spanish/test_spanish.csv'))

In [None]:
# Test split of every language, keyed by language name. Each value is the
# (DataFrame, DataLoader) pair returned by load_dataset.
datasets = ['english', 'arabic', 'chinese', 'spanish']
dataloaders = {
    d: load_dataset(os.path.join(data_root_path, f'test_{d}/test_{d}.csv'))
    for d in datasets
}

In [None]:
def evaluate_model(model, train_dataloader, num_classes=3):
    """Run `model.predict_all` over a dataloader and collect its outputs.

    Args:
        model: Object exposing `predict_all(input_ids=..., token_type_ids=...,
            attention_mask=...)`.
        train_dataloader: DataLoader whose batches carry the input ids at
            position 0 and the attention mask at position 1.
        num_classes: Number of columns in the result (one per value that
            `predict_all` returns for each row).

    Returns:
        A float NumPy array of shape (len(dataset), num_classes); batch `step`
        fills rows [step * batch_size, (step + 1) * batch_size).
    """
    rows_per_batch = train_dataloader.batch_size
    n_rows = len(train_dataloader.dataset)
    all_scores = np.zeros((n_rows, num_classes))

    for step, batch in enumerate(tqdm(train_dataloader)):
        ids = batch[0].to(device)
        mask = batch[1].to(device)
        with torch.no_grad():
            outputs = model.predict_all(
                input_ids=ids,
                token_type_ids=None,
                attention_mask=mask,
            )
        # The slice is clipped at the end of the array, so a shorter final
        # batch lands in exactly the remaining rows.
        first = step * rows_per_batch
        all_scores[first:first + rows_per_batch] = outputs

    return all_scores

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
def get_mode(arr):
    """Return the per-row majority vote of a 2-D array of predictions.

    For each row the most frequent value wins; ties between repeated values go
    to the smallest value (np.unique sorts, np.argmax takes the first maximum).
    When no value occurs more than once, one of the row's values is drawn
    uniformly at random.

    Args:
        arr: 2-D array with one row per sample and one column per voter.

    Returns:
        1-D float array of length `arr.shape[0]` holding the chosen values.
    """
    preds = np.zeros(arr.shape[0])
    for i, row in enumerate(arr):
        v, c = np.unique(row, return_counts=True)
        # Draw the random index from the values actually present instead of a
        # hard-coded 3, so rows with any number of voters stay in bounds.
        idx = np.argmax(c) if np.any(c > 1) else np.random.choice(len(v))
        preds[i] = v[idx]
    return preds

In [None]:
# Score each head on its own, plus the majority vote, on every test set.
for d_name, (dff, dl) in dataloaders.items():
    sp_emotions = evaluate_model(model, dl)
    modes = get_mode(sp_emotions)
    gold = dff["emotion_label"].values
    print(f'========== {d_name} ==========')
    # Columns 0, 1, 2 of the predictions are reported as the English, Arabic
    # and Chinese scores respectively.
    for col, head in enumerate(('english', 'arabic', 'chinese')):
        print(f' {head} score: {accuracy_score(gold, sp_emotions[:,col])}')
    print(f' mode score: {accuracy_score(gold, modes)}')

In [None]:
def get_mode(arr):
    """Pick one value per row of a 2-D prediction array by majority vote.

    The most frequent value in a row is returned; if several repeated values
    tie, the smallest one is used. If every entry in the row is distinct, one
    of them is chosen uniformly at random.

    Args:
        arr: 2-D array, rows are samples and columns are voters.

    Returns:
        1-D float array with one chosen value per row.
    """
    preds = np.zeros(arr.shape[0])
    for i, row in enumerate(arr):
        v, c = np.unique(row, return_counts=True)
        # Size the random draw by the number of distinct values in the row; a
        # fixed 3 indexes out of range when the row has fewer than 3 voters.
        idx = np.argmax(c) if np.any(c > 1) else np.random.choice(len(v))
        preds[i] = v[idx]
    return preds

In [None]:
# Accuracy of prediction column 0 (reported as the English score) against sp_df's labels.
# NOTE(review): sp_emotions is whatever the last iteration of the scoring loop
# over `dataloaders` left behind — confirm it corresponds to sp_df.
print(f' english score: {accuracy_score(sp_df["emotion_label"].values, sp_emotions[:,0])}')

In [None]:
# Accuracy of prediction column 1 (reported as the Arabic score) against sp_df's labels.
print(f' arabic score: {accuracy_score(sp_df["emotion_label"].values, sp_emotions[:,1])}')

In [None]:
# Accuracy of prediction column 2 (reported as the Chinese score) against sp_df's labels.
print(f' chinese score: {accuracy_score(sp_df["emotion_label"].values, sp_emotions[:,2])}')

## Heat Maps

In [None]:
# Predictions on each language's training split; each result has one row per
# utterance and (with the default num_classes) three columns.
en_emotions = evaluate_model(model, en_train_dataloader)
ar_emotions = evaluate_model(model, ar_train_dataloader)
ch_emotions = evaluate_model(model, ch_train_dataloader)

In [None]:
# Store prediction columns 0/1/2 next to each utterance as en_emo / ar_emo / ch_emo.
en_df[['en_emo', 'ar_emo', 'ch_emo']] = en_emotions
ar_df[['en_emo', 'ar_emo', 'ch_emo']] = ar_emotions
ch_df[['en_emo', 'ar_emo', 'ch_emo']] = ch_emotions

In [None]:
# to_csv does not create missing folders, so make sure the output directory
# exists before writing the annotated DataFrames.
os.makedirs('monster_datasets', exist_ok=True)
en_df.to_csv('monster_datasets/monster_english.csv', index=False)
ar_df.to_csv('monster_datasets/monster_arabic.csv', index=False)
ch_df.to_csv('monster_datasets/monster_chinese.csv', index=False)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# Emotion vocabulary, re-declared alongside this cell's imports; a label's
# class id is its position in the list.
ARTEMIS_EMOTIONS = [
    'amusement', 'awe', 'contentment', 'excitement',
    'anger', 'disgust', 'fear', 'sadness', 'something else',
]
EMOTION_ID = dict(zip(ARTEMIS_EMOTIONS, range(len(ARTEMIS_EMOTIONS))))

In [None]:
# Reload the annotated DataFrames from disk, replacing the in-memory
# en_df / ar_df / ch_df.
en_df = pd.read_csv('monster_datasets/monster_english.csv')
ar_df = pd.read_csv('monster_datasets/monster_arabic.csv')
ch_df = pd.read_csv('monster_datasets/monster_chinese.csv')

In [None]:
def get_report(df, l1, l2):
    """Compare two heads' predictions: confusion heat map plus a text report.

    The `<l1>_emo` column is treated as the reference and `<l2>_emo` as the
    prediction. The confusion matrix is normalised per predicted class
    (`normalize='pred'`), shown as a heat map, and followed by a printed
    classification report.

    Args:
        df: DataFrame holding the `<l1>_emo` and `<l2>_emo` class-id columns.
        l1: Column prefix of the reference head (e.g. 'en').
        l2: Column prefix of the compared head (e.g. 'ar').
    """
    # Pin the label set to every known class. Without it the matrix only spans
    # the classes present in the data, and the 9-name index / target_names
    # below no longer match when some class is missing (e.g. on a subset).
    class_ids = list(range(len(ARTEMIS_EMOTIONS)))
    # Class ids may come back from CSV as floats; use plain ints so they match class_ids.
    y_true, y_pred = df[l1+'_emo'].astype(int), df[l2+'_emo'].astype(int)
    cm = confusion_matrix(y_true, y_pred, labels=class_ids, normalize='pred').round(3)
    df_cm = pd.DataFrame(cm, index=[l1+'_'+i for i in ARTEMIS_EMOTIONS],
                         columns=[l2+'_'+i for i in ARTEMIS_EMOTIONS])
    plt.figure(figsize=(12,9))
    sns.heatmap(df_cm, annot=True)
    plt.show()
    print(classification_report(y_true, y_pred, labels=class_ids, target_names=ARTEMIS_EMOTIONS))

In [None]:
# Stack the three annotated DataFrames row-wise (original row indices are kept).
all_df = pd.concat([en_df,ar_df,ch_df])

In [None]:
# Selection used by the cells below: which DataFrame to analyse, and which two
# prediction columns (`<l1>_emo` vs `<l2>_emo`) to compare.
lang_df = en_df
l1 = 'en'
l2 = 'ar'

In [None]:
# Confusion heat map (in percent, normalised per predicted class) between the
# two selected heads, saved as '<l1>_<l2>.svg'.
# Pin the label set so the matrix is always 9x9 and matches the emotion names
# even when some class never occurs in the selected data.
class_ids = list(range(len(ARTEMIS_EMOTIONS)))
# Class ids may come back from CSV as floats; use plain ints so they match class_ids.
y_true, y_pred = lang_df[l1+'_emo'].astype(int), lang_df[l2+'_emo'].astype(int)
cm = confusion_matrix(y_true, y_pred, labels=class_ids, normalize='pred').round(2) * 100
df_cm = pd.DataFrame(cm, index=[i for i in ARTEMIS_EMOTIONS],
                     columns=[i for i in ARTEMIS_EMOTIONS])
fig = plt.figure(figsize=(16,13))
sns.set(font_scale=1)
sns.heatmap(df_cm, annot=True, annot_kws={"size": 28}, fmt='.0f')
plt.xticks(fontsize=36, rotation=90)
plt.yticks(fontsize=36, rotation=0)
# Save before showing: depending on the backend, show() may release the figure,
# so writing the file first is the safe order.
fig.savefig(f'{l1}_{l2}.svg')
plt.show()

In [None]:
# loading the art genre
genres = pd.read_csv('dataset/wiki_art_genre_class.csv')
# adding art genre to the dataframe
# (merge defaults to an inner join: rows of lang_df without a matching
# art_style/painting pair in `genres` are dropped)
df = pd.merge(lang_df, genres, on=['art_style', 'painting'])

In [None]:
# Number of rows per genre in the merged DataFrame; used later as the
# denominator for per-genre rates.
all_genre_counts = df['genre'].value_counts()

In [None]:
# Emotion pair to look for: emo1 is matched against the l1 head, emo2 against the l2 head.
emo1 = 'amusement'
emo2 = 'disgust'

In [None]:
# Rows where the l1 head predicts emo1 while the l2 head predicts emo2,
# reduced to the columns needed for manual inspection.
ttdf = df[(df[f'{l1}_emo']==EMOTION_ID[emo1])
                           &(df[f'{l2}_emo']==EMOTION_ID[emo2])][['utterance', 'image_file', 'genre',
                                                                 'en_emo', 'ar_emo', 'ch_emo']]

In [None]:
# Narrow the disagreement rows to a single genre.
ttdf = ttdf[ttdf['genre']=='nude_painting']

In [None]:
# NOTE: this overwrites `df` in place, so every later cell that reads `df`
# only sees portrait rows.
df = df[df['genre']=='portrait']

In [None]:
# Keep rows where en_emo is one of the first four classes (amusement, awe,
# contentment, excitement) while ar_emo is one of the remaining classes.
# Like the previous filter, this overwrites `df`.
df = df[(df['en_emo'] < 4)& (df['ar_emo'] >= 4)]

In [None]:
# Inspect one row of `df` by position: its utterance, the three prediction
# columns, and the image (opened as the cell's last expression).
idx = 44
print(df.iloc[idx]['utterance'])
print(df.iloc[idx][['en_emo', 'ar_emo', 'ch_emo']])
Image.open(df.iloc[idx]['image_file'])

In [None]:
# Same inspection of `df` for another row position.
idx = 34
print(df.iloc[idx]['utterance'])
print(df.iloc[idx][['en_emo', 'ar_emo', 'ch_emo']])
Image.open(df.iloc[idx]['image_file'])

In [None]:
# Inspect one row of `ttdf`: utterance, prediction columns, and image.
# NOTE: with the assignment below commented out, `idx` keeps whatever value
# the most recently run cell gave it, which may be out of range for `ttdf`.
# idx = 15
print(ttdf.iloc[idx]['utterance'])
print(ttdf.iloc[idx][['en_emo', 'ar_emo', 'ch_emo']])
Image.open(ttdf.iloc[idx]['image_file'])

In [None]:
# Inspect row 11 of `ttdf`: utterance, prediction columns, and image.
idx = 11
print(ttdf.iloc[idx]['utterance'])
print(ttdf.iloc[idx][['en_emo', 'ar_emo', 'ch_emo']])
Image.open(ttdf.iloc[idx]['image_file'])

In [None]:
# Inspect row 10 of `ttdf`: utterance, prediction columns, and image.
idx = 10
print(ttdf.iloc[idx]['utterance'])
print(ttdf.iloc[idx][['en_emo', 'ar_emo', 'ch_emo']])
Image.open(ttdf.iloc[idx]['image_file'])

In [None]:
# Bar chart of how the ar_emo == 'awe' / ch_emo == 'fear' disagreement is
# distributed over genres, as each genre's share of the summed per-genre rates.
conflict_genre_counts = df[(df['ar_emo']==EMOTION_ID['awe'])
                           &(df['ch_emo']==EMOTION_ID['fear'])]['genre'].value_counts()
#    &(df['utterance'].str.contains('nud'))]
# The division aligns on genre, so genres with no conflicting rows come out as
# NaN; count them as a rate of zero so they cannot poison the total below.
zz = (conflict_genre_counts / all_genre_counts).fillna(0).sort_values(ascending=False)
# Normalise with Series.sum(); the builtin sum() returns NaN as soon as one
# entry is NaN, which turned every bar height into NaN.
plt.bar([z.replace('_', ' ') for z in zz.index], zz.values / zz.sum())
plt.xticks(fontsize=18, rotation=90)
plt.yticks(fontsize=18, rotation=0)
plt.show()

In [None]:
# Per-genre rate of rows where ar_emo is 'contentment' while ch_emo is
# 'sadness', relative to all_genre_counts.
conflict_genre_counts = df[(df['ar_emo']==EMOTION_ID['contentment'])
                           &(df['ch_emo']==EMOTION_ID['sadness'])]['genre'].value_counts()
# Optional extra condition on the utterance text, currently disabled:
#    &(df['utterance'].str.contains('nud'))]
# The division aligns on genre; genres with no such rows come out as NaN.
zz = (conflict_genre_counts / all_genre_counts).sort_values(ascending=False)
plt.bar(list(zz.index), zz.values)
plt.xticks(rotation=90)
plt.show()

In [None]:
# Heat map and classification report comparing en_emo with ar_emo over the
# combined DataFrame.
get_report(all_df, 'en', 'ar')