# Импорт всех библиотек

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import json
import numpy as np
import pandas as pd

from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import balanced_accuracy_score

import seaborn as sns
import matplotlib.pyplot as plt

import sklearn.svm
from sklearn.decomposition import PCA

from sklearn.preprocessing import StandardScaler

from pickle import dump as save
from pickle import load as download

import torch 
import transformers
import torch.nn as nn
from torch import cuda
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.optim.lr_scheduler import StepLR
from transformers import RobertaModel, RobertaTokenizer
from torch.utils.data import TensorDataset, DataLoader, WeightedRandomSampler

# Функции
## Для общей работы

In [None]:
def statistic(y_pred_test, y_test, y_pred_train, y_train):
    """Build a train/test table of classification metrics.

    Args:
        y_pred_test: Predicted labels for the test split.
        y_test: True labels for the test split.
        y_pred_train: Predicted labels for the train split.
        y_train: True labels for the train split.

    Returns:
        A DataFrame with columns ``'train'`` and ``'test'`` and one row per
        metric: ``'balanced_accuracy_score'``, ``'accuracy_score'``,
        ``'f1 weighted'``, ``'f1 micro'`` and ``'f1 macro'``.
    """
    score_table = pd.DataFrame(columns=('train', 'test'))

    # (row label, metric function, extra keyword arguments)
    metrics = (
        ('balanced_accuracy_score', balanced_accuracy_score, {}),
        ('accuracy_score', accuracy_score, {}),
        ('f1 weighted', f1_score, {'average': 'weighted'}),
        ('f1 micro', f1_score, {'average': 'micro'}),
        ('f1 macro', f1_score, {'average': 'macro'}),
    )

    for row_name, metric, kwargs in metrics:
        # scikit-learn metrics expect (y_true, y_pred). The order matters for
        # balanced accuracy (per-class recall) and weighted F1 (weights come
        # from the support of the true labels), so the true labels go first.
        train_score = metric(y_train, y_pred_train, **kwargs)
        test_score = metric(y_test, y_pred_test, **kwargs)
        score_table.loc[row_name, :] = (train_score, test_score)

    return score_table

In [None]:
def cm_plot(y_test, y_pred, y_type):
    """Plot a row-normalised confusion matrix as a heatmap.

    Args:
        y_test: True labels.
        y_pred: Predicted labels.
        y_type: Text inserted into the plot title.

    Returns:
        Whatever ``plt.show()`` returns.
    """
    cm = confusion_matrix(y_test, y_pred)

    # Normalise each row by its total (number of true samples of that class).
    # A class that only occurs in the predictions has a zero row total; such
    # rows are left at 0.0 instead of dividing by zero.
    row_totals = cm.sum(axis=1, keepdims=True)
    cmn = np.divide(cm.astype('float'), row_totals,
                    out=np.zeros(cm.shape, dtype=float),
                    where=row_totals != 0)

    # The font scale has to be configured before the figure is drawn,
    # otherwise it only takes effect for plots created afterwards.
    sns.set(font_scale=1.3)
    fig, ax = plt.subplots(figsize=(5, 5))
    # Draw on the axes created above rather than on the implicit current axes.
    sns.heatmap(cmn, cmap='Blues', annot=True, fmt='.2f', ax=ax)
    ax.set_title(f'Confusion Matrix of {y_type}')

    return plt.show()

## Классы данных

In [None]:
class Dataset_audio(Dataset):
    """Indexable dataset of feature rows with optional labels.

    Args:
        x: Indexable collection of feature rows.
        y: Optional indexable collection of labels aligned with ``x``.
    """

    def __init__(self, x, y=None):
        self.x = x
        self.y = y

    def __len__(self):
        """Return the number of feature rows."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the sample at ``idx``.

        Returns:
            ``(features, label)`` when labels were supplied, otherwise only
            ``features``. ``features`` is the selected row wrapped in one
            extra leading dimension.
        """
        features = np.array([self.x[idx]])
        # ``y`` is optional in the constructor, so an unlabeled dataset must
        # not try to index ``None``.
        if self.y is None:
            return features
        return features, self.y[idx]

In [None]:
class SentimentData(Dataset):
    """Dataset that tokenizes utterances and returns them with their labels.

    Args:
        dataframe: pandas DataFrame providing the ``Utterance``,
            ``Utterance_ID`` and ``Dialogue_ID`` columns.
        y: pandas Series of targets aligned row-by-row with ``dataframe``.
        tokenizer: Object exposing ``encode_plus`` (a Hugging Face tokenizer
            in this file).
        max_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, dataframe, y, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = pd.concat([dataframe, y], axis=1)
        self.text = dataframe.Utterance
        self.targets = y
        self.max_len = max_len
        self.utt = dataframe.Utterance_ID
        self.dia = dataframe.Dialogue_ID

    def __len__(self):
        """Return the number of utterances."""
        return len(self.text)

    def __getitem__(self, index):
        """Encode the utterance at position ``index``.

        Returns:
            A dict with the tensors ``'ids'``, ``'mask'``,
            ``'token_type_ids'`` and ``'targets'`` plus the utterance and
            dialogue identifiers under ``'utt'`` and ``'dia'``.
        """
        # Samplers hand out positions 0..len-1, so index positionally; plain
        # ``series[index]`` is label-based and breaks on a non-default index.
        text = str(self.text.iloc[index])
        # Collapse every run of whitespace into a single space.
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            # ``padding='max_length'`` is the supported spelling of the
            # deprecated ``pad_to_max_length=True``.
            padding='max_length',
            # Truncation was already applied implicitly (with a warning)
            # because ``max_length`` is set; request it explicitly.
            truncation=True,
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.float),
            'utt': self.utt.iloc[index],
            'dia': self.dia.iloc[index]
        }

## Нейронная сеть (НС)

In [None]:
class RobertaClass(torch.nn.Module):
    """Module built around a pretrained ``roberta-base`` encoder.

    NOTE(review): the remaining layers and the whole forward computation are
    redacted in this file ("secret"), so the class is not runnable as shown.
    Other code in this file calls ``model.pre_classifier`` and reads
    ``pooler_output`` from ``model.l1``; presumably ``pre_classifier`` is
    defined in the redacted part -- confirm against the full source.
    """
    def __init__(self):
        super(RobertaClass, self).__init__()
        # Encoder obtained through RobertaModel.from_pretrained.
        self.l1 = RobertaModel.from_pretrained("roberta-base")
        # secret :) -- further layer definitions were removed by the author.

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Forward pass; the body was removed by the author.

        NOTE(review): ``output`` is not assigned anywhere in the visible code,
        so calling this as written raises ``NameError``.
        """
        
        # secret :)) -- forward computation was removed by the author.
        
        return output

# Работа с моделями
## Аудио

In [None]:
# Read the openSMILE feature tables; the 'file' column becomes the row index.
X_train = pd.read_csv('/kaggle/input/pooling/train_openSMILE.csv', header=0, index_col='file', sep=',')
X_val = pd.read_csv('/kaggle/input/pooling/dev_openSMILE.csv', header=0, index_col='file', sep=',')
X_test = pd.read_csv('/kaggle/input/pooling/test_openSMILE.csv', header=0, index_col='file', sep=',')

# Emotion names and the integer codes substituted for them (name i -> code i).
_EMOTION_NAMES = ['anger', 'disgust', 'fear', 'sadness', 'neutral', 'joy', 'surprise']
_EMOTION_CODES = list(range(len(_EMOTION_NAMES)))

# ``pop`` removes the 'target' column from the feature table in place and
# returns it, so each X_* keeps only features while y_* holds encoded labels.
y_train = X_train.pop('target').replace(_EMOTION_NAMES, _EMOTION_CODES)
y_test = X_test.pop('target').replace(_EMOTION_NAMES, _EMOTION_CODES)
y_val = X_val.pop('target').replace(_EMOTION_NAMES, _EMOTION_CODES)

In [None]:
# Load the fitted preprocessing objects and the audio classifier.
# ``download`` is this file's alias for ``pickle.load``.
# NOTE: unpickling can execute arbitrary code -- only load files from a
# trusted source.
# The ``with`` blocks guarantee each file is closed; the previous ``f.close``
# (without parentheses) only referenced the method and never called it.
with open('/kaggle/input/roberta-meld/scaler.pickle', 'rb') as f:
    scaler = download(f)

with open('/kaggle/input/roberta-meld/PCA.pickle', 'rb') as f:
    pca = download(f)

with open('/kaggle/input/roberta-meld/SVC_model_audio_36score.pickle', 'rb') as f:
    classifier = download(f)

<function BufferedReader.close>

In [None]:
def get_audio_prob(data):
    """Compute per-class probabilities for every row of an audio feature table.

    The rows are passed through the module-level ``scaler``, ``pca`` and
    ``classifier`` objects (in that order).

    Args:
        data: DataFrame of features, one row per audio file. Each index label
            is sliced with ``[:-4]`` to form the output name, so labels are
            expected to be strings ending in a four-character suffix.

    Returns:
        A DataFrame with the columns ``'name'``, one column per emotion, and
        ``'targets'`` (name of the highest-probability class).
    """
    # NOTE(review): mapping the argmax position through this dict assumes the
    # classifier's probability columns are ordered as codes 0..6 -- confirm.
    targets = {0: 'anger', 1: 'disgust', 2: 'fear', 3: 'sadness', 4: 'neutral', 5: 'joy',
                                               6: 'surprise'}
    columns = ['name', 'anger', 'disgust', 'fear', 'sadness', 'neutral', 'joy',
               'surprise', 'targets']

    # Nothing to transform: return an empty table with the expected columns
    # instead of handing an empty array to the estimators.
    if len(data.index) == 0:
        return pd.DataFrame(columns=columns)

    # Transform and classify all rows in one pass instead of row by row.
    features = scaler.transform(np.asarray(data))
    features = pca.transform(features)
    probs = classifier.predict_proba(features)
    best_classes = probs.argmax(axis=1)

    rows = [
        [index[:-4], *row_probs, targets[int(best)]]
        for index, row_probs, best in zip(data.index, probs, best_classes)
    ]

    # Build the frame once; growing a DataFrame with ``.loc`` per row copies
    # the table on every insert. ``dtype=object`` keeps the column dtypes the
    # row-wise construction produced.
    return pd.DataFrame(rows, columns=columns, dtype=object)

In [None]:
# Audio class probabilities for each split, computed in the order dev, train, test.
probs_Audio_dev, probs_Audio_train, probs_Audio_test = (
    get_audio_prob(features) for features in (X_val, X_train, X_test)
)

In [None]:
# Use the utterance name as the row index of every probability table.
probs_Audio_dev, probs_Audio_train, probs_Audio_test = (
    table.set_index(keys='name')
    for table in (probs_Audio_dev, probs_Audio_train, probs_Audio_test)
)

# Write one CSV per split into the working directory.
probs_Audio_train.to_csv(path_or_buf='probs_audio_train.csv')
probs_Audio_dev.to_csv(path_or_buf='probs_audio_dev.csv')
probs_Audio_test.to_csv(path_or_buf='probs_audio_test.csv')

## Как вытащить роберту и словарь

In [None]:
# Tokenizer loaded for the 'roberta-base' checkpoint; ``truncation`` and
# ``do_lower_case`` are forwarded to ``from_pretrained`` as keyword arguments.
tokenizer = RobertaTokenizer.from_pretrained(
    'roberta-base',
    truncation=True,
    do_lower_case=True,
)

Downloading (…)olve/main/vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

Downloading (…)olve/main/merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

In [None]:
# Load the fine-tuned model object from disk.
# NOTE: ``torch.load`` unpickles the file, which can execute arbitrary code --
# only load checkpoints from a trusted source.
# ``map_location`` places the weights on the GPU when one is available and on
# the CPU otherwise, so a checkpoint saved from a GPU session still loads on a
# CPU-only machine.
# NOTE(review): recent PyTorch releases default ``weights_only`` to True, which
# rejects fully pickled modules; pass ``weights_only=False`` if loading fails --
# confirm against the installed version.
model = torch.load(
    '/kaggle/input/pooling/pytorch_roberta_sentiment_65-32.bin',
    map_location=torch.device('cuda' if cuda.is_available() else 'cpu'),
)

model.eval()
# Trailing ``None`` keeps the notebook cell from echoing the module repr.
None

In [None]:
# Read the utterance tables for the three splits.
train = pd.read_csv('/kaggle/input/pooling/train_sent_emo.csv', delimiter=',', header=0)
val = pd.read_csv('/kaggle/input/pooling/dev_sent_emo.csv', delimiter=',', header=0)
test = pd.read_csv('/kaggle/input/pooling/test_sent_emo.csv', delimiter=',', header=0)

# Emotion names and the integer codes substituted for them (name i -> code i).
_EMOTION_NAMES = ['anger', 'disgust', 'fear', 'sadness', 'neutral', 'joy', 'surprise']
_EMOTION_CODES = list(range(len(_EMOTION_NAMES)))
# Columns that are removed from every table after the labels are extracted.
_DROPPED_COLUMNS = ['Sr No.', 'Speaker', 'Emotion', 'Sentiment', 'Season', 'Episode', 'StartTime', 'EndTime']

# Replace the '\x92' character in the utterances with a plain apostrophe.
for _frame in (train, val, test):
    _frame['Utterance'] = _frame['Utterance'].str.replace('\x92','\'')

# Encode the 'Emotion' column of each split as integer labels.
y_train, y_val, y_test = (
    frame['Emotion'].replace(_EMOTION_NAMES, _EMOTION_CODES)
    for frame in (train, val, test)
)

# Drop the label and metadata columns in place.
for _frame in (train, val, test):
    _frame.drop(_DROPPED_COLUMNS, axis=1, inplace=True)

# Use the GPU when one is available, otherwise fall back to the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Run settings; the actual entries were removed by the author ("secret").
# Other cells in this file look up parameters['max_len'] and
# parameters['batch_size'], so at least those two keys must be supplied
# before those cells can run.
parameters = { 
    #secret
             }

In [None]:
# Wrap every split and its labels in a SentimentData dataset; all three share
# the tokenizer and the configured 'max_len'.
train_set, val_set, test_set = (
    SentimentData(frame, labels, tokenizer, parameters['max_len'])
    for frame, labels in ((train, y_train), (val, y_val), (test, y_test))
)

In [None]:
# The three loaders use identical settings: the configured batch size,
# no shuffling, and num_workers=0. Each split still gets its own dict object.
train_params, val_params, test_params = (
    dict(batch_size=parameters['batch_size'], shuffle=False, num_workers=0)
    for _ in range(3)
)

train_loader = DataLoader(dataset=train_set, **train_params)
val_loader = DataLoader(dataset=val_set, **val_params)
test_loader = DataLoader(dataset=test_set, **test_params)

In [None]:
class SentimentData(Dataset):
    """Label-free dataset that tokenizes raw texts.

    Args:
        text: Indexable collection of strings.
        tokenizer: Object exposing ``encode_plus`` (a Hugging Face tokenizer
            in this file).
        max_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, text, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.text = text
        self.max_len = max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.text)

    def __getitem__(self, index):
        """Encode the text at ``index``.

        Returns:
            A dict with the tensors ``'ids'``, ``'mask'`` and
            ``'token_type_ids'`` plus the original string under ``'text'``.
        """
        raw_text = self.text[index]
        # Collapse every run of whitespace into a single space.
        text = " ".join(raw_text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            # Truncation was already applied implicitly (with a warning)
            # because ``max_length`` is set; request it explicitly.
            truncation=True,
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            # Return only this sample's text; returning ``self.text`` handed
            # the entire collection back with every single item.
            'text': raw_text
        }

In [None]:
from scipy.special import softmax

In [None]:
def get_probs(model, loader):
    """Run the text model over a loader and tabulate class probabilities.

    Args:
        model: Module called as ``model(ids, mask, token_type_ids)`` that
            returns one row of class scores per sample.
        loader: Iterable of batches; each batch is a dict with the keys
            ``'ids'``, ``'mask'``, ``'token_type_ids'``, ``'dia'`` and
            ``'utt'``.

    Returns:
        A DataFrame with the columns ``'name'`` (``dia<D>_utt<U>``), one
        probability column per emotion, and ``'targets'`` (name of the
        highest-scoring class).
    """
    model.eval()
    device = 'cuda' if cuda.is_available() else 'cpu'
    # The inputs are moved to ``device`` below, so the model has to be there too.
    model.to(device)
    targets = {0: 'anger', 1: 'disgust', 2: 'fear', 3: 'sadness', 4: 'neutral', 5: 'joy',
                                               6: 'surprise'}
    columns = ['name', 'anger', 'disgust', 'fear', 'sadness', 'neutral', 'joy',
               'surprise', 'targets']
    rows = []

    with torch.no_grad():
        for data in loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)

            scores = model(ids, mask, token_type_ids).cpu().numpy()
            # Softmax and argmax run per sample (axis 1), so batches of any
            # size work, not only batch size 1.
            probs = softmax(scores, axis=1)
            best_classes = scores.argmax(axis=1)

            dialogue_ids = torch.as_tensor(data['dia']).reshape(-1).tolist()
            utterance_ids = torch.as_tensor(data['utt']).reshape(-1).tolist()

            for dia, utt, row_probs, best in zip(dialogue_ids, utterance_ids,
                                                 probs, best_classes):
                name = 'dia' + str(int(dia)) + '_utt' + str(int(utt))
                rows.append([name, *row_probs, targets[int(best)]])

    # Build the frame once; growing a DataFrame with ``.loc`` per row copies
    # the table on every insert. ``dtype=object`` keeps the column dtypes the
    # row-wise construction produced.
    return pd.DataFrame(rows, columns=columns, dtype=object)


In [None]:
# Text class probabilities for each split, computed in the order dev, train, test.
probs_Text_val, probs_Text_train, probs_Text_test = (
    get_probs(model, split_loader)
    for split_loader in (val_loader, train_loader, test_loader)
)

In [None]:
# Use the utterance name as the row index of every probability table.
probs_Text_val, probs_Text_train, probs_Text_test = (
    table.set_index(keys='name')
    for table in (probs_Text_val, probs_Text_train, probs_Text_test)
)

# Write one CSV per split into the working directory.
probs_Text_train.to_csv(path_or_buf='probs_text_train.csv')
probs_Text_val.to_csv(path_or_buf='probs_text_dev.csv')
probs_Text_test.to_csv(path_or_buf='probs_text_test.csv')

In [None]:
def get_emb(model, loader):
    """Extract two embeddings per utterance from the text model.

    Args:
        model: Module exposing ``l1`` (called with ``input_ids``,
            ``attention_mask`` and ``token_type_ids``; its result must have a
            ``pooler_output`` attribute) and ``pre_classifier`` (applied to
            that pooled output).
        loader: Iterable of batches; each batch is a dict with the keys
            ``'ids'``, ``'mask'``, ``'token_type_ids'``, ``'targets'``,
            ``'dia'`` and ``'utt'``.

    Returns:
        A pair ``(dict_out_pooler, dict_out_dim)``. Both map the name
        ``dia<D>_utt<U>`` to ``{'emb': ndarray, 'target': emotion name}``;
        the first holds the pooled encoder output, the second the output of
        ``pre_classifier``.
    """
    dict_out_pooler = {}
    dict_out_dim = {}
    model.eval()
    device = 'cuda' if cuda.is_available() else 'cpu'
    # The inputs are moved to ``device`` below, so the model has to be there too.
    model.to(device)
    targets = {0: 'anger', 1: 'disgust', 2: 'fear', 3: 'sadness', 4: 'neutral', 5: 'joy',
                                               6: 'surprise'}

    with torch.no_grad():
        for data in loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)

            pooler_outp = model.l1(input_ids=ids, attention_mask=mask,
                                   token_type_ids=token_type_ids).pooler_output

            # One row per sample in the batch.
            emb_pooler = np.array(pooler_outp.tolist())
            emb_out_dim = np.array(model.pre_classifier(pooler_outp).tolist())

            # Flatten the per-sample metadata so batches of any size work,
            # not only batch size 1.
            labels = torch.as_tensor(data['targets']).reshape(-1).long().tolist()
            dialogue_ids = torch.as_tensor(data['dia']).reshape(-1).tolist()
            utterance_ids = torch.as_tensor(data['utt']).reshape(-1).tolist()

            for row, (dia, utt, label) in enumerate(
                    zip(dialogue_ids, utterance_ids, labels)):
                name = 'dia' + str(int(dia)) + '_utt' + str(int(utt))
                dict_out_pooler[name] = {'emb': emb_pooler[row], 'target': targets[label]}
                dict_out_dim[name] = {'emb': emb_out_dim[row], 'target': targets[label]}

    return dict_out_pooler, dict_out_dim


In [None]:
# Embedding dictionaries for each split, computed in the order dev, train, test.
# Every call yields the pair (pooled-output dict, pre_classifier-output dict).
(dev_pooler, dev_dim), (train_pooler, train_dim), (test_pooler, test_dim) = (
    get_emb(model, split_loader)
    for split_loader in (val_loader, train_loader, test_loader)
)

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


In [None]:
# Persist the embedding dictionaries with np.save, one file per split.
# First the dictionaries built from the pooled encoder output ...
np.save(file='dev_text_emb_768', arr=dev_pooler)
np.save(file='train_text_emb_768', arr=train_pooler)
np.save(file='test_text_emb_768', arr=test_pooler)

# ... then the dictionaries built from the pre_classifier output.
np.save(file='dev_text_emb_512', arr=dev_dim)
np.save(file='train_text_emb_512', arr=train_dim)
np.save(file='test_text_emb_512', arr=test_dim)