In [None]:
from datasets import load_dataset, Dataset
import evaluate
from sklearn.model_selection import train_test_split
import torch
from tqdm import tqdm
import os
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, f1_score, precision_recall_fscore_support
from torch.utils.data import DataLoader, WeightedRandomSampler
import torch.nn as nn
import pandas as pd
import numpy as np
# Seed NumPy's global RNG so NumPy-based randomness is repeatable.
# NOTE(review): no torch.manual_seed() call is visible in this part of the notebook, so
# PyTorch weight initialisation may still differ between runs -- confirm.
np.random.seed(101) 
from transformers import AutoTokenizer,BertForSequenceClassification, BertTokenizer, DistilBertModel, AutoModelForSequenceClassification, AutoConfig
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import seaborn as sns
# Tokenizer of the 'SamLowe/roberta-base-go_emotions' checkpoint; the feature-extraction
# cell below uses it to encode each song's lyrics.
tokenizer = AutoTokenizer.from_pretrained('SamLowe/roberta-base-go_emotions')

In [None]:
from sklearn.model_selection import KFold
from accelerate import Accelerator
# Cross-validation setup: shuffled folds with a fixed seed so the splits are repeatable.
num_folds = 5
kfold = KFold(
    n_splits=num_folds,
    shuffle=True,
    random_state=3001,
)

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Load the lyrics corpus from a local JSON file. The rows are read through
# dataset['train'] in the feature-extraction cell below.
dataset = load_dataset('json', data_files='lyrics_final.json')
dataset

In [None]:
# Per-song features, all collected in dataset order.
input_ids = []        # token-id tensor per song (unpadded, truncated to 256 tokens)
attention_masks = []  # attention-mask tensor per song
valence = []          # class label: 0 = below q1, 1 = in between, 2 = at or above q3
choi_rep = []         # flattened Choi representation per song
sp_id = []            # song id, also the stem of the per-song .npy files

# Valence thresholds separating the three classes.
# NOTE(review): presumably the first/third quartiles of the valence scores -- confirm.
q1 = 4.2298
q3 = 6.0036

# NOTE(review): absolute, machine-specific path; make it configurable before sharing.
choi_dir = 'D:/311511053/muse_music4all/choi_representaion'

for i in tqdm(range(len(dataset['train']))):
    # Fetch the row once instead of re-indexing the dataset for every field.
    row = dataset['train'][i]

    input_encodings = tokenizer(str(row["lyrics"]), max_length=256, truncation=True)
    input_ids.append(torch.tensor(input_encodings['input_ids']))
    attention_masks.append(torch.tensor(input_encodings['attention_mask']))

    # Bucket the continuous valence score into three classes.
    v_class = row['valence']
    if v_class < q1:
        valence.append(0)
    elif v_class >= q3:
        valence.append(2)
    else:
        valence.append(1)

    # `song_id` rather than `id`, so the built-in id() is not shadowed notebook-wide.
    song_id = row['id']
    sp_id.append(song_id)

    # One .npy file per song; flatten so every song contributes a 1-D feature vector.
    # (A disabled earlier variant concatenated the five segment files
    # choi_representaion_all/<id>_1.npy ... <id>_5.npy instead.)
    choi = np.load(os.path.join(choi_dir, song_id + '.npy'))
    choi_rep.append(choi.flatten())

In [None]:
# Stack the per-song vectors into a single ndarray first: creating a tensor directly from a
# Python list of ndarrays is very slow (PyTorch emits a UserWarning recommending this).
choi_rep = torch.tensor(np.array(choi_rep))
valence = torch.tensor(valence)

Train convNet

In [None]:
import torch.nn.functional as F
class ConvNet(nn.Module):
    """Stack of (Conv2d -> ELU) x num_nin_layers -> MaxPool2d blocks with a 3-class head.

    Args:
        num_conv_layers: number of conv blocks to build.
        nums_feat_maps: per-block feature-map counts (before scaling).
        feat_scale_factor: multiplier applied to every entry of ``nums_feat_maps``.
        conv_sizes: per-block ``(kh, kw)`` kernel sizes.
        pool_sizes: per-block max-pool kernel sizes.
        dropout_conv: dropout probability appended after each block; 0.0 disables it.
        input_shape: only ``input_shape[0]`` is read, as the number of input channels.
        num_nin_layers: number of Conv2d+ELU pairs inside each block.
        conv_until: index of the last block to build (inclusive). ``None`` builds all
            blocks.

    The attribute names ``conv_layers`` and ``linear`` are relied on by the training and
    loading code in this notebook, so they must not change.
    """

    def __init__(self, num_conv_layers, nums_feat_maps, feat_scale_factor,
                 conv_sizes, pool_sizes, dropout_conv, input_shape,
                 num_nin_layers=1, conv_until=None):
        super(ConvNet, self).__init__()

        if conv_until is None:
            # This index is never reached by the loop below, so every block is built.
            conv_until = num_conv_layers

        # Track the channel count flowing into the next conv. The previous code reused the
        # current block's width as in_channels, which is only right when all blocks share
        # the same number of feature maps.
        in_channels = input_shape[0]
        layers = []
        for conv_idx in range(num_conv_layers):
            n_feat_here = int(nums_feat_maps[conv_idx] * feat_scale_factor)
            kernel = conv_sizes[conv_idx]
            # k // 2 padding keeps the spatial size unchanged for odd kernel sizes.
            padding = (kernel[0] // 2, kernel[1] // 2)
            for _ in range(num_nin_layers):
                layers.append(nn.Conv2d(in_channels, n_feat_here, kernel_size=kernel, padding=padding))
                in_channels = n_feat_here
                #layers.append(nn.BatchNorm2d(n_feat_here))
                layers.append(nn.ELU(alpha=1.0))

            layers.append(nn.MaxPool2d(kernel_size=pool_sizes[conv_idx]))
            if dropout_conv != 0.0:
                layers.append(nn.Dropout(dropout_conv))
            if conv_idx == conv_until:
                break

        self.conv_layers = nn.Sequential(*layers)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # The head consumes the channel count of the last conv (32 in this notebook's config).
        self.linear = nn.Sequential(
            nn.Linear(in_channels, 32),
            nn.ReLU(),
            nn.Linear(32, 3)
        )

    def forward(self, mel):
        """Return class logits of shape (batch, 3) for a (batch, channels, H, W) input."""
        x = self.conv_layers(mel)
        x = self.pool(x)           # global average pool -> (batch, channels, 1, 1)
        x = x.flatten(1)           # -> (batch, channels)
        x = self.linear(x)
        return x

In [None]:
# Pretrained ConvNet weights stored as a pickled object array; the conversion cell below
# reads conv kernels and biases from convNet_weight[1] (Keras layout, per its comment).
# NOTE(review): allow_pickle=True executes the pickle loader -- only load this file from a
# trusted source.
convNet_weight = np.load('convNet_weight_5layers.npy',allow_pickle=True)
convNet_weight

In [None]:
# Build the 5-block ConvNet and initialise its conv stack from the pretrained weights.
poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
args = [5,                                          # number of conv blocks
        [32, 32, 32, 32, 32],                       # feature maps per block
        1.0,                                        # feature-map scale factor
        [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)],   # conv kernel sizes
        poolings,                                   # max-pool sizes
        0.0,                                        # conv dropout
        (1, 1, 96, 1360)]                           # input shape (only [0] is read)
model = ConvNet(*args, conv_until = 5)
pytorch_state_dict = model.state_dict()
count_c = 0
count_b = 0
for name in list(pytorch_state_dict.keys()):
    if 'linear' in name:
        continue  # the classification head keeps its random initialisation
    # Convert weights from Keras to PyTorch format.
    # NOTE(review): the stride of 6 presumably skips the arrays stored between consecutive
    # conv kernels/biases (e.g. batch-norm parameters) -- confirm against the weight file.
    if 'weight' in name:
        # (kh, kw, in, out) -> (out, in, kh, kw)
        pytorch_state_dict[name] = torch.from_numpy(np.transpose(convNet_weight[1][0+count_c*6], (3,2,0,1)))
        count_c += 1
    elif 'bias' in name:
        pytorch_state_dict[name] = torch.from_numpy(np.asarray(convNet_weight[1][1+count_b*6]))
        count_b += 1

# Rebinding entries of the dict returned by state_dict() does not touch the model's
# parameters; the converted tensors only take effect once they are loaded back.
model.load_state_dict(pytorch_state_dict)


In [None]:
class TrainDataset(torch.utils.data.Dataset):
    """Map-style dataset that loads one mel-spectrogram file per song id.

    Note: a later cell in this notebook defines another ``TrainDataset`` (for Choi feature
    vectors) that shadows this one; re-run this cell before the ConvNet k-fold cell.

    Args:
        id: indexable collection of song ids; ``<mel_dir>/<id>.npy`` must exist for each.
        label: indexable collection of labels aligned with ``id``.
        mel_dir: directory holding the ``.npy`` files. Defaults to ``'choi_mel'``, the
            location that was previously hard-coded.
    """

    def __init__(self, id, label, mel_dir='choi_mel'):
        self.id = id
        self.labels = label
        self.mel_dir = mel_dir

    def __getitem__(self, index):
        """Return ``(mel, label)`` where ``mel`` is a tensor reshaped to (1, 96, 1360)."""
        mel = np.load(os.path.join(self.mel_dir, self.id[index] + '.npy'))
        # Add a leading channel axis; the file must hold exactly 96 * 1360 values.
        mel = torch.tensor(mel.reshape(1, 96, 1360))
        label = self.labels[index]
        return mel, label

    def __len__(self):
        return len(self.id)


In [None]:
def train_convNet(train_dataloader, val_dataloader,fold, num_epochs = 10, validation = True, save_file = 'valence_convNet_3class', train_batch_size = 8, learning_rate = 5e-5):
    """Train the 5-block ConvNet on one cross-validation fold and checkpoint it.

    The conv stack is initialised from the notebook-level ``convNet_weight`` array, then the
    whole model is trained with AdamW and cross-entropy on the notebook-level ``device``.
    Every checkpoint is stored as two pickled modules under ``<save_file>/fold_<fold>/``:
    ``<stem>1.pt`` holds ``model.conv_layers`` and ``<stem>2.pt`` holds ``model.linear``.

    Args:
        train_dataloader: yields ``(mel, label)`` batches for training.
        val_dataloader: yields ``(mel, label)`` batches for validation (any batch size).
        fold: fold identifier used in the checkpoint directory name.
        num_epochs: number of passes over ``train_dataloader``.
        validation: when True, evaluate after every epoch and keep the
            ``model_best_loss*`` / ``model_best_acc*`` checkpoints.
        save_file: root directory for checkpoints (created if missing).
        train_batch_size: no longer used; kept so existing callers keep working. Training
            accuracy is computed from the number of samples actually seen.
        learning_rate: AdamW learning rate.

    Returns:
        dict with per-epoch lists under ``'train_loss'``, ``'train_accuracy'``,
        ``'val_loss'`` and ``'val_accuracy'``; the two ``val_*`` lists stay empty when
        ``validation`` is False.
    """
    train_history_loss = []
    train_history_acc = []
    val_history_loss = []
    val_history_acc = []

    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,                                          # number of conv blocks
            [32, 32, 32, 32, 32],                       # feature maps per block
            1.0,                                        # feature-map scale factor
            [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)],   # conv kernel sizes
            poolings,                                   # max-pool sizes
            0.0,                                        # conv dropout
            (1, 1, 96, 1360)]                           # input shape (only [0] is read)
    model = ConvNet(*args, conv_until = 5)

    # Convert the pretrained conv kernels/biases from Keras to PyTorch layout.
    # NOTE(review): the stride of 6 presumably skips the arrays stored between consecutive
    # conv kernels/biases in the weight file -- confirm.
    pytorch_state_dict = model.state_dict()
    count_c = 0
    count_b = 0
    for name in list(pytorch_state_dict.keys()):
        if 'linear' in name:
            continue  # the classification head keeps its random initialisation
        if 'weight' in name:
            # (kh, kw, in, out) -> (out, in, kh, kw)
            pytorch_state_dict[name] = torch.from_numpy(np.transpose(convNet_weight[1][0+count_c*6], (3,2,0,1)))
            count_c += 1
        elif 'bias' in name:
            pytorch_state_dict[name] = torch.from_numpy(np.asarray(convNet_weight[1][1+count_b*6]))
            count_b += 1
    # Rebinding entries of the state dict does not modify the model; the converted tensors
    # must be loaded explicitly or the pretrained weights are silently discarded.
    model.load_state_dict(pytorch_state_dict)
    model = model.to(device)

    # Create the checkpoint directory once instead of before every save.
    fold_dir = save_file + '/fold_{}'.format(fold)
    os.makedirs(fold_dir, exist_ok=True)

    def save_checkpoint(stem):
        """Pickle the conv stack and the head as <stem>1.pt and <stem>2.pt."""
        torch.save(model.conv_layers, fold_dir + '/' + stem + '1.pt')
        torch.save(model.linear, fold_dir + '/' + stem + '2.pt')

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    loss_function = torch.nn.CrossEntropyLoss()
    best_val_loss = np.inf
    best_val_accuracy = 0
    for epoch in range(num_epochs):
        total_loss = 0
        batch_id = 0
        correct = 0
        seen = 0
        print(f"Epoch: {epoch + 1}",'training')
        model.train()  # validation below switches to eval mode, so reset every epoch
        for batch in train_dataloader:
            mel = batch[0].to(device).to(torch.float32)
            labels = batch[1].to(device)
            outputs = model(mel = mel)
            loss = loss_function(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _,predict_label = torch.max(outputs,1)
            correct += (predict_label==labels).sum().item()
            seen += labels.size(0)
            print('batch:', batch_id, '/',str(len(train_dataloader)), 'loss:', loss.item(), end='\r')
            batch_id += 1
        average_loss = total_loss / len(train_dataloader)
        train_accuracy = correct / seen
        train_history_loss.append(average_loss)
        train_history_acc.append(train_accuracy)
        print(f"Loss: {average_loss:.4f}", 'accuracy:', train_accuracy)

        if validation:
            print('validation')
            model.eval()
            prediction = []
            ans = []
            val_loss = 0.0
            batch_id = 0
            correct = 0
            with torch.no_grad():
                for batch in val_dataloader:
                    mel = batch[0].to(device).to(torch.float32)
                    labels = batch[1].to(device)
                    output = model(mel = mel)
                    loss = loss_function(output, labels)
                    # Weight by batch size so the result is a per-sample mean for any
                    # batch size (identical to the old value when batch_size == 1).
                    val_loss += loss.item() * labels.size(0)
                    _,predict_label = torch.max(output,1)
                    correct += (predict_label==labels).sum().item()
                    prediction.extend(predict_label.cpu().tolist())
                    ans.extend(labels.cpu().tolist())
                    print('batch:', batch_id, '/',str(len(val_dataloader)), 'loss:', loss.item(), end='\r')
                    batch_id += 1
            val_loss /= len(ans)
            val_accuracy = correct / len(ans)
            val_history_loss.append(val_loss)
            val_history_acc.append(val_accuracy)
            print('loss:', val_loss)
            print('accuracy:',val_accuracy)
            print('f1 score:', f1_score(ans, prediction, average='macro'))
            if val_loss <= best_val_loss:
                best_val_loss = val_loss
                save_checkpoint('model_best_loss')
            if val_accuracy >= best_val_accuracy:
                best_val_accuracy = val_accuracy
                save_checkpoint('model_best_acc')

        # Same files as before: every epoch for the first 30, then every 5th epoch.
        if epoch < 30 or epoch % 5 == 4:
            save_checkpoint('model_epoch_' + str(epoch+1) + '_')

    save_checkpoint('model_final')
    return {
        'train_loss': train_history_loss,
        'train_accuracy': train_history_acc,
        'val_loss': val_history_loss,
        'val_accuracy': val_history_acc,
    }
def test_convNet(test_dataloader,fold, load_best = 'loss', load_file = 'valence_convNet_3class'):
    """Evaluate a saved ConvNet checkpoint of one fold on ``test_dataloader``.

    Args:
        test_dataloader: yields ``(mel, label)`` batches (any batch size).
        fold: fold identifier used in the checkpoint directory name.
        load_best: ``'loss'`` loads ``model_best_loss*``, ``'accuracy'`` loads
            ``model_best_acc*``, anything else loads ``model_final*``.
        load_file: root directory that holds the ``fold_<fold>`` checkpoint folders.

    Returns:
        ``(accuracy, f1, prec_recall, conf_m)`` where ``f1`` is the per-class F1 array,
        ``prec_recall`` is the tuple from ``precision_recall_fscore_support`` and
        ``conf_m`` is ``confusion_matrix(prediction, ans)`` -- i.e. rows index the
        predicted class and columns the true class.
    """
    print('testing')
    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,                                          # number of conv blocks
            [32, 32, 32, 32, 32],                       # feature maps per block
            1.0,                                        # feature-map scale factor
            [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)],   # conv kernel sizes
            poolings,                                   # max-pool sizes
            0.0,                                        # conv dropout
            (1, 1, 96, 1360)]                           # input shape (only [0] is read)

    if load_best == 'loss':
        stem = 'model_best_loss'
    elif load_best == 'accuracy':
        stem = 'model_best_acc'
    else:
        stem = 'model_final'
    fold_dir = load_file + "/fold_{}".format(fold)

    # conv_until=0 builds only a minimal shell; both sub-modules are replaced by the
    # pickled modules right away.
    # NOTE(review): these files are whole pickled modules. Recent PyTorch releases default
    # torch.load to weights_only=True, which rejects them -- check the installed version.
    model = ConvNet(*args, conv_until = 0)
    # map_location lets a checkpoint saved on GPU be evaluated on a CPU-only machine.
    model.conv_layers = torch.load(fold_dir + '/' + stem + '1.pt', map_location=device)
    model.linear = torch.load(fold_dir + '/' + stem + '2.pt', map_location=device)
    model = model.to(device)
    model.eval()

    prediction = []
    ans = []
    batch_id = 0
    correct = 0
    with torch.no_grad():
        for batch in test_dataloader:
            mel = batch[0].to(device).to(torch.float32)
            labels = batch[1].to(device)
            output = model(mel = mel)
            _,predict_label = torch.max(output,1)
            correct += (predict_label==labels).sum().item()
            # extend/tolist instead of append/item so batches larger than 1 also work
            prediction.extend(predict_label.cpu().tolist())
            ans.extend(labels.cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')
    # Divide by the number of samples, not the number of batches.
    accuracy = correct / len(ans)
    f1 = f1_score(ans, prediction, average=None)
    prec_recall = precision_recall_fscore_support(ans, prediction)
    conf_m = confusion_matrix(prediction,ans)
    return accuracy, f1, prec_recall,  conf_m

In [None]:
# Per-fold test metrics; the report and plotting cells below read these lists by name.
test_accuracy = []
conf_m = []
confusion_matrixs = []
test_macro_f1 = []
test_f1_0, test_f1_1, test_f1_2 = [], [], []
test_precision_0, test_precision_1, test_precision_2 = [], [], []
test_recall_0, test_recall_1, test_recall_2 = [], [], []
history = []
batch_size = 32

# NOTE: this cell needs the mel-spectrogram TrainDataset (constructed with `id=`), not the
# Choi-feature TrainDataset that a later cell defines under the same name.
for i, (train, test) in tqdm(enumerate(kfold.split(sp_id, valence))):

    print('Fold {}:'.format(i+1))

    # Carve a 10% validation split out of this fold's training portion.
    train_id, val_id, train_label, val_label = train_test_split(
        np.array(sp_id, dtype="object")[train],
        valence[train],
        test_size=0.1,
        random_state=42,
    )

    # NOTE(review): shuffle=False means training batches keep the same order every epoch.
    train_dataset = TrainDataset(id=train_id, label=train_label)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

    val_dataset = TrainDataset(id=val_id, label=val_label)
    val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False)

    test_id = np.array(sp_id, dtype="object")[test]
    test_dataset = TrainDataset(id=test_id, label=valence[test])
    test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    his = train_convNet(train_dataloader, val_dataloader, i+1, num_epochs=20,
                        train_batch_size=batch_size, learning_rate=5e-4)
    history.append(his)

    accuracy, f1, prec_recall, confusion_m = test_convNet(test_dataloader, i+1)
    print("test accuracy:",accuracy,"f1 score:",f1)
    test_accuracy.append(accuracy)

    # prec_recall[0] holds per-class precision and prec_recall[1] per-class recall.
    per_class_logs = (
        (test_f1_0, test_precision_0, test_recall_0),
        (test_f1_1, test_precision_1, test_recall_1),
        (test_f1_2, test_precision_2, test_recall_2),
    )
    for cls, (f1_log, precision_log, recall_log) in enumerate(per_class_logs):
        f1_log.append(f1[cls])
        precision_log.append(prec_recall[0][cls])
        recall_log.append(prec_recall[1][cls])
    confusion_matrixs.append(confusion_m)

In [None]:
# Print every fold's metrics, then the summed confusion matrix and the fold averages.
total_conf = np.zeros_like(confusion_matrixs[0])
for i, fold_accuracy in enumerate(test_accuracy):
    print("Fold {}:".format(i+1))
    print("Confusion Matrix:")
    print(confusion_matrixs[i])
    total_conf += confusion_matrixs[i]
    print('accuracy:', fold_accuracy)
    print('recall 0:',test_recall_0[i],',recall 1:',test_recall_1[i],',recall 2:',test_recall_2[i])
    print('precision 0:',test_precision_0[i],',precision 1:',test_precision_1[i],',precision 2:',test_precision_2[i])
    print('F1 score 0:', test_f1_0[i], 'F1 score 1:', test_f1_1[i],',f1 2:',test_f1_2[i])

print("Total Confusion Matrix:\n",total_conf)
print("Avg accuracy:",np.array(test_accuracy).mean())
print(
    "Avg recall 0:",np.array(test_recall_0).mean(),
    ",Avg recall 1:",np.array(test_recall_1).mean(),
    ",Avg recall 2:",np.array(test_recall_2).mean(),
)
print(
    "Avg precision 0:",np.array(test_precision_0).mean(),
    ",Avg precision 1:",np.array(test_precision_1).mean(),
    ",Avg precision 2:",np.array(test_precision_2).mean(),
)
print(
    "Avg f1 score 0:",np.array(test_f1_0).mean(),
    ",Avg f1 score 1:",np.array(test_f1_1).mean(),
    ",Avg f1 score 2:",np.array(test_f1_2).mean(),
)

In [None]:
# Generate two 20-point decaying curves from fixed constants: each step subtracts half of
# the current rate, and the rate itself shrinks by 10% per step.
# NOTE(review): these values are computed from constants, not taken from `history`;
# confirm this cell is still needed.
train_loss, val_loss = [], []
t_0, t_rate = 0.98, 0.7
v_0, v_rate = 1.12, 0.7
for i in range(20):
    train_loss.append(t_0)
    val_loss.append(v_0)
    t_0, v_0 = t_0 - t_rate*0.5, v_0 - v_rate*0.5
    t_rate, v_rate = t_rate * 0.9, v_rate * 0.9
In [None]:
# One loss panel per fold, side by side. The panel count follows len(history) instead of
# five copy-pasted blocks, so the cell keeps working if the number of folds changes.
n_panels = max(len(history), 1)
fig = plt.figure(figsize = (4 * n_panels, 3))  # 20 x 3 for the usual five folds
for fold_idx, fold_history in enumerate(history):
    plt.subplot(1, n_panels, fold_idx + 1)
    plt.plot(fold_history['train_loss'])
    plt.plot(fold_history['val_loss'])
    plt.title('loss history')
    plt.ylabel('loss')
    plt.xlabel('epochs')
    plt.legend(['train', 'val'], loc='upper left')

In [None]:
# One accuracy panel per fold, side by side. The panel count follows len(history) instead
# of five copy-pasted blocks, so the cell keeps working if the number of folds changes.
n_panels = max(len(history), 1)
fig = plt.figure(figsize = (4 * n_panels, 3))  # 20 x 3 for the usual five folds
for fold_idx, fold_history in enumerate(history):
    plt.subplot(1, n_panels, fold_idx + 1)
    plt.plot(fold_history['train_accuracy'])
    plt.plot(fold_history['val_accuracy'])
    plt.title('accuracy history')
    plt.ylabel('accuracy')
    plt.xlabel('epochs')
    plt.legend(['train', 'val'], loc='upper left')

In [None]:
class get_intermediate_output(nn.Module):
    """ConvNet variant whose forward pass stops after the first linear layer of the head.

    The layers are built exactly like ``ConvNet`` (same attribute names ``conv_layers``,
    ``pool`` and ``linear``) so that saved sub-modules can be assigned onto an instance;
    only ``forward`` differs, returning the output of ``linear[0]`` instead of the logits.

    Args:
        num_conv_layers: number of conv blocks to build.
        nums_feat_maps: per-block feature-map counts (before scaling).
        feat_scale_factor: multiplier applied to every entry of ``nums_feat_maps``.
        conv_sizes: per-block ``(kh, kw)`` kernel sizes.
        pool_sizes: per-block max-pool kernel sizes.
        dropout_conv: dropout probability appended after each block; 0.0 disables it.
        input_shape: only ``input_shape[0]`` is read, as the number of input channels.
        num_nin_layers: number of Conv2d+ELU pairs inside each block.
        conv_until: index of the last block to build (inclusive). ``None`` builds all
            blocks.
    """

    def __init__(self, num_conv_layers, nums_feat_maps, feat_scale_factor,
                 conv_sizes, pool_sizes, dropout_conv, input_shape,
                 num_nin_layers=1, conv_until=None):
        super(get_intermediate_output, self).__init__()

        if conv_until is None:
            # This index is never reached by the loop below, so every block is built.
            conv_until = num_conv_layers

        # Track the channel count flowing into the next conv. The previous code reused the
        # current block's width as in_channels, which is only right when all blocks share
        # the same number of feature maps.
        in_channels = input_shape[0]
        layers = []
        for conv_idx in range(num_conv_layers):
            n_feat_here = int(nums_feat_maps[conv_idx] * feat_scale_factor)
            kernel = conv_sizes[conv_idx]
            # k // 2 padding keeps the spatial size unchanged for odd kernel sizes.
            padding = (kernel[0] // 2, kernel[1] // 2)
            for _ in range(num_nin_layers):
                layers.append(nn.Conv2d(in_channels, n_feat_here, kernel_size=kernel, padding=padding))
                in_channels = n_feat_here
                #layers.append(nn.BatchNorm2d(n_feat_here))
                layers.append(nn.ELU(alpha=1.0))

            layers.append(nn.MaxPool2d(kernel_size=pool_sizes[conv_idx]))
            if dropout_conv != 0.0:
                layers.append(nn.Dropout(dropout_conv))
            if conv_idx == conv_until:
                break

        self.conv_layers = nn.Sequential(*layers)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # The head consumes the channel count of the last conv (32 in this notebook's config).
        self.linear = nn.Sequential(
            nn.Linear(in_channels, 32),
            nn.ReLU(),
            nn.Linear(32, 3)
        )

    def forward(self, mel):
        """Return the (batch, 32) activations of ``linear[0]`` (before the ReLU)."""
        x = self.conv_layers(mel)
        x = self.pool(x)           # global average pool -> (batch, channels, 1, 1)
        x = x.flatten(1)           # -> (batch, channels)
        x = self.linear[0](x)
        return x

In [None]:
def intermediate_output(test_dataloader,fold, load_best = 'loss', load_file = 'valence_convNet_3class'):
    """Collect the first-linear-layer activations of a saved ConvNet for every sample.

    Args:
        test_dataloader: yields ``(mel, label)`` batches (any batch size).
        fold: fold identifier used in the checkpoint directory name.
        load_best: ``'loss'`` loads ``model_best_loss*``, ``'accuracy'`` loads
            ``model_best_acc*``, anything else loads ``model_final*``.
        load_file: root directory that holds the ``fold_<fold>`` checkpoint folders.

    Returns:
        ``(layer_output, ans)``: a list with one CPU feature tensor per sample and the
        matching list of integer labels.
    """
    print('testing')
    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,                                          # number of conv blocks
            [32, 32, 32, 32, 32],                       # feature maps per block
            1.0,                                        # feature-map scale factor
            [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)],   # conv kernel sizes
            poolings,                                   # max-pool sizes
            0.0,                                        # conv dropout
            (1, 1, 96, 1360)]                           # input shape (only [0] is read)

    if load_best == 'loss':
        stem = 'model_best_loss'
    elif load_best == 'accuracy':
        stem = 'model_best_acc'
    else:
        stem = 'model_final'
    fold_dir = load_file + "/fold_{}".format(fold)

    # conv_until=0 builds only a minimal shell; both sub-modules are replaced by the
    # pickled modules right away.
    # NOTE(review): these files are whole pickled modules. Recent PyTorch releases default
    # torch.load to weights_only=True, which rejects them -- check the installed version.
    model = get_intermediate_output(*args, conv_until = 0)
    # map_location lets a checkpoint saved on GPU be used on a CPU-only machine.
    model.conv_layers = torch.load(fold_dir + '/' + stem + '1.pt', map_location=device)
    model.linear = torch.load(fold_dir + '/' + stem + '2.pt', map_location=device)
    model = model.to(device)
    model.eval()

    layer_output = []
    ans = []
    batch_id = 0
    with torch.no_grad():
        for batch in test_dataloader:
            mel = batch[0].to(device).to(torch.float32)
            labels = batch[1]
            output = model(mel = mel)
            # Keep every row of the batch; the old code kept only output[0] and called
            # .item() on the labels, which is only valid for batch_size == 1.
            layer_output.extend(output.cpu())
            ans.extend(labels.cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')

    return layer_output, ans

In [None]:
# Project the features in `nplayer_output` to 2-D with t-SNE (fixed random_state).
# NOTE(review): `nplayer_output` is not defined in any visible cell -- presumably a NumPy
# array built from the `layer_output` list returned by intermediate_output(); confirm and
# define it explicitly so the notebook survives Restart & Run All.
lyr_conv_tsne = TSNE(n_components=2, init='random', random_state=5, verbose=1).fit_transform(nplayer_output)

In [None]:
# Scatter the 2-D t-SNE embedding, coloured by class label.
# NOTE(review): `ans` is not defined at notebook level in the visible cells -- presumably
# the label list returned by intermediate_output(); confirm.
tsne_first, tsne_second = lyr_conv_tsne[:, 0], lyr_conv_tsne[:, 1]
df = pd.DataFrame()
df["y"] = ans[:1000]
df["comp-1"] = tsne_first
df["comp-2"] = tsne_second
# NOTE(review): `markers` is never passed to the plot call below.
markers = {"0": "s", "1": "X",  "2": "O"}
tsne_axes = sns.scatterplot(
    data=df,
    x="comp-1",
    y="comp-2",
    hue=df.y.tolist(),
    palette=sns.color_palette("hls", 3),
)
tsne_axes.set(title="convNet T-SNE projection")

Train Choi

In [None]:
class TrainDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each Choi feature vector with its class label."""

    def __init__(self, choi_rep, label):
        # Both containers are indexed with the same integer position.
        self.labels = label
        self.choi_rep = choi_rep

    def __len__(self):
        # Dataset size is the number of feature vectors.
        return len(self.choi_rep)

    def __getitem__(self, index):
        # Returns the (features, label) pair stored at `index`.
        return self.choi_rep[index], self.labels[index]


In [None]:
class ChoiModel(nn.Module):
    """Two-layer classifier: 32 input features -> 32 hidden units (ReLU) -> 3 logits.

    Dropout with p=0.5 sits between the hidden block and the output layer. The attribute
    names ``linear1`` and ``linear2`` are what the training code saves, so they are fixed.
    """

    def __init__(self):
        super().__init__()
        hidden_block = nn.Sequential(nn.Linear(32, 32), nn.ReLU())
        self.linear1 = hidden_block
        self.dropout = nn.Dropout(0.5)
        self.linear2 = nn.Linear(32, 3)

    def forward(self, choi_rep):
        """Map a (batch, 32) feature tensor to (batch, 3) logits."""
        hidden = self.dropout(self.linear1(choi_rep))
        return self.linear2(hidden)


In [None]:
def train_choi(train_dataloader, val_dataloader,fold, num_epochs = 10, validation = True, save_file = 'valence_choi', train_batch_size = 32, learning_rate = 5e-5):
    """Train a ``ChoiModel`` on one cross-validation fold and checkpoint it.

    The model is trained with AdamW and cross-entropy on the notebook-level ``device``.
    Every checkpoint is stored as two pickled modules under ``<save_file>/fold_<fold>/``:
    ``<stem>1.pt`` holds ``model.linear1`` and ``<stem>2.pt`` holds ``model.linear2``.

    Args:
        train_dataloader: yields ``(choi_rep, label)`` batches for training.
        val_dataloader: yields ``(choi_rep, label)`` batches for validation (any batch
            size).
        fold: fold identifier used in the checkpoint directory name.
        num_epochs: number of passes over ``train_dataloader``.
        validation: when True, evaluate after every epoch and keep the
            ``model_best_loss*`` / ``model_best_acc*`` checkpoints.
        save_file: root directory for checkpoints (created if missing).
        train_batch_size: no longer used; kept so existing callers keep working. Training
            accuracy is computed from the number of samples actually seen.
        learning_rate: AdamW learning rate.

    Returns:
        dict with per-epoch lists under ``'train_loss'``, ``'train_accuracy'``,
        ``'val_loss'`` and ``'val_accuracy'``; the two ``val_*`` lists stay empty when
        ``validation`` is False.
    """
    train_history_loss = []
    train_history_acc = []
    val_history_loss = []
    val_history_acc = []

    model = ChoiModel().to(device)

    # Create the checkpoint directory once instead of before every save.
    fold_dir = save_file + '/fold_{}'.format(fold)
    os.makedirs(fold_dir, exist_ok=True)

    def save_checkpoint(stem):
        """Pickle the hidden block and the output layer as <stem>1.pt and <stem>2.pt."""
        torch.save(model.linear1, fold_dir + '/' + stem + '1.pt')
        torch.save(model.linear2, fold_dir + '/' + stem + '2.pt')

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    loss_function = torch.nn.CrossEntropyLoss()
    best_val_loss = np.inf
    best_val_accuracy = 0
    for epoch in range(num_epochs):
        total_loss = 0
        batch_id = 0
        correct = 0
        seen = 0
        print(f"Epoch: {epoch + 1}",'training')
        model.train()  # validation below switches to eval mode, so reset every epoch
        for batch in train_dataloader:
            choi_rep = batch[0].to(device).to(torch.float32)
            labels = batch[1].to(device)
            outputs = model(choi_rep = choi_rep)
            loss = loss_function(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _,predict_label = torch.max(outputs,1)
            correct += (predict_label==labels).sum().item()
            seen += labels.size(0)
            print('batch:', batch_id, '/',str(len(train_dataloader)), 'loss:', loss.item(), end='\r')
            batch_id += 1
        average_loss = total_loss / len(train_dataloader)
        train_accuracy = correct / seen
        train_history_loss.append(average_loss)
        train_history_acc.append(train_accuracy)
        print(f"Loss: {average_loss:.4f}", 'accuracy:', train_accuracy)

        if validation:
            print('validation')
            model.eval()
            prediction = []
            ans = []
            val_loss = 0.0
            batch_id = 0
            correct = 0
            with torch.no_grad():
                for batch in val_dataloader:
                    choi_rep = batch[0].to(device).to(torch.float32)
                    labels = batch[1].to(device)
                    output = model(choi_rep = choi_rep)
                    loss = loss_function(output, labels)
                    # Weight by batch size so the result is a per-sample mean for any
                    # batch size (identical to the old value when batch_size == 1).
                    val_loss += loss.item() * labels.size(0)
                    _,predict_label = torch.max(output,1)
                    correct += (predict_label==labels).sum().item()
                    prediction.extend(predict_label.cpu().tolist())
                    ans.extend(labels.cpu().tolist())
                    print('batch:', batch_id, '/',str(len(val_dataloader)), 'loss:', loss.item(), end='\r')
                    batch_id += 1
            val_loss /= len(ans)
            val_accuracy = correct / len(ans)
            val_history_loss.append(val_loss)
            val_history_acc.append(val_accuracy)
            print('loss:', val_loss)
            print('accuracy:',val_accuracy)
            print('f1 score:', f1_score(ans, prediction, average='macro'))
            if val_loss <= best_val_loss:
                best_val_loss = val_loss
                save_checkpoint('model_best_loss')
            if val_accuracy >= best_val_accuracy:
                best_val_accuracy = val_accuracy
                save_checkpoint('model_best_acc')

        # Same files as before: every epoch for the first 10, then every 5th epoch.
        if epoch < 10 or epoch % 5 == 4:
            save_checkpoint('model_epoch_' + str(epoch+1) + '_')

    save_checkpoint('model_final')
    return {
        'train_loss': train_history_loss,
        'train_accuracy': train_history_acc,
        'val_loss': val_history_loss,
        'val_accuracy': val_history_acc,
    }
def test_choi(test_dataloader,fold, load_best = 'loss', load_file = 'valence_choi'):
    """Evaluate a saved Choi-feature classifier of one fold on a test dataloader.

    Args:
        test_dataloader: yields batches whose element 0 is the feature tensor and
            element 1 the integer class labels. Any batch size is accepted.
        fold: fold number; checkpoints are read from ``<load_file>/fold_<fold>/``.
        load_best: ``'loss'`` loads ``model_best_loss{1,2}.pt``, ``'accuracy'`` loads
            ``model_best_acc{1,2}.pt``, any other value loads ``model_final{1,2}.pt``.
        load_file: root directory of the saved checkpoints.

    Returns:
        ``(accuracy, f1, prec_recall, conf_m)`` where ``accuracy`` is the fraction of
        correctly classified samples, ``f1`` the per-class F1 array, ``prec_recall``
        the tuple returned by ``precision_recall_fscore_support`` and ``conf_m`` a
        confusion matrix whose rows are predicted labels and columns true labels
        (the arguments are passed as ``(prediction, ans)``).
    """
    print('testing')
    # Checkpoint stem per selection criterion; unknown values fall back to the final weights.
    if load_best == 'loss':
        stem = 'model_best_loss'
    elif load_best == 'accuracy':
        stem = 'model_best_acc'
    else:
        stem = 'model_final'
    prefix = load_file + "/fold_{}/".format(fold) + stem
    model = ChoiModel().to(device)
    # The checkpoints hold whole pickled sub-modules; map_location keeps them loadable
    # on a machine without the device they were saved from.
    # NOTE(review): on PyTorch >= 2.6 torch.load defaults to weights_only=True, which
    # rejects pickled modules - pass weights_only=False there or switch to state_dicts.
    model.linear1 = torch.load(prefix + '1.pt', map_location=device)
    model.linear2 = torch.load(prefix + '2.pt', map_location=device)
    model.eval()
    prediction = []
    ans = []
    batch_id = 0
    correct = 0
    with torch.no_grad():
        for batch in test_dataloader:
            choi_rep = batch[0].to(device).to(torch.float32)
            labels = batch[1].to(device).reshape(-1)
            output = model(
                choi_rep = choi_rep
            )
            _,predict_label = torch.max(output,1)
            correct += (predict_label==labels).sum().item()
            # extend() instead of .item() so batches larger than one sample work too.
            prediction.extend(predict_label.cpu().tolist())
            ans.extend(labels.cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')
    # Normalise by the number of samples, not the number of batches.
    accuracy = correct / len(ans) if ans else 0.0
    f1 = f1_score(ans, prediction, average=None)
    prec_recall = precision_recall_fscore_support(ans, prediction)
    conf_m = confusion_matrix(prediction,ans)
    return accuracy, f1, prec_recall,  conf_m

In [None]:
# Cross-validate the Choi-feature classifier: one train / validate / test round per
# KFold split. The per-fold results land in the lists below, which the reporting
# cell reads by name.
test_accuracy = []
conf_m = []
confusion_matrixs = []
test_macro_f1 = []
test_f1_0, test_f1_1, test_f1_2 = [], [], []
test_precision_0, test_precision_1, test_precision_2 = [], [], []
test_recall_0, test_recall_1, test_recall_2 = [], [], []
history = []
batch_size = 64
for i, (train, test) in tqdm(enumerate(kfold.split(choi_rep, valence))):

    print('Fold {}:'.format(i+1))
    # Scaling statistics come from this fold's training indices only and are reused
    # unchanged for the held-out test indices.
    scaler = StandardScaler().fit(choi_rep[train])
    train_chois, val_coala, train_label, val_label = train_test_split(
        scaler.transform(choi_rep[train]), valence[train], test_size=0.1, random_state=42
    )
    test_chois = scaler.transform(choi_rep[test])

    # NOTE(review): the training loader is not shuffled - confirm this is intended.
    train_dataset = TrainDataset(choi_rep=train_chois, label = train_label)
    train_dataloader =  DataLoader(train_dataset,batch_size=batch_size,shuffle=False)

    # Validation and test run one sample per batch.
    val_dataset = TrainDataset(choi_rep=val_coala, label = val_label)
    val_dataloader =  DataLoader(val_dataset,batch_size=1,shuffle=False)
    test_dataset = TrainDataset(choi_rep=test_chois, label = valence[test])
    test_dataloader =  DataLoader(test_dataset,batch_size=1,shuffle=False)

    his = train_choi(train_dataloader, val_dataloader,i+1,num_epochs=200, train_batch_size = batch_size, learning_rate=5e-4)
    history.append(his)
    accuracy, f1, prec_recall, confusion_m = test_choi(test_dataloader,i+1)
    print("test accuracy:",accuracy,"f1 score:",f1)
    test_accuracy.append(accuracy)
    # prec_recall[0] holds per-class precision, prec_recall[1] per-class recall.
    for cls, bucket in enumerate((test_f1_0, test_f1_1, test_f1_2)):
        bucket.append(f1[cls])
    for cls, bucket in enumerate((test_precision_0, test_precision_1, test_precision_2)):
        bucket.append(prec_recall[0][cls])
    for cls, bucket in enumerate((test_recall_0, test_recall_1, test_recall_2)):
        bucket.append(prec_recall[1][cls])
    confusion_matrixs.append(confusion_m)

In [None]:
# Print the results of every fold, then the summed confusion matrix and the
# cross-fold means of accuracy, recall, precision and F1.
total_conf = np.zeros_like(confusion_matrixs[0])
for i in range(len(test_accuracy)):
    print("Fold {}:".format(i+1))
    print("Confusion Matrix:")
    print(confusion_matrixs[i])
    total_conf += confusion_matrixs[i]
    print('accuracy:', test_accuracy[i])
    print('recall 0:',test_recall_0[i],
          ',recall 1:',test_recall_1[i],
          ',recall 2:',test_recall_2[i])
    print('precision 0:',test_precision_0[i],
          ',precision 1:',test_precision_1[i],
          ',precision 2:',test_precision_2[i])
    print('F1 score 0:', test_f1_0[i],
          'F1 score 1:', test_f1_1[i],
          ',f1 2:',test_f1_2[i])
print("Total Confusion Matrix:\n",total_conf)
print("Avg accuracy:",np.mean(test_accuracy))
print("Avg recall 0:",np.mean(test_recall_0),
      ",Avg recall 1:",np.mean(test_recall_1),
      ",Avg recall 2:",np.mean(test_recall_2))
print("Avg precision 0:",np.mean(test_precision_0),
      ",Avg precision 1:",np.mean(test_precision_1),
      ",Avg precision 2:",np.mean(test_precision_2))
print("Avg f1 score 0:",np.mean(test_f1_0),
      ",Avg f1 score 1:",np.mean(test_f1_1),
      ",Avg f1 score 2:",np.mean(test_f1_2))

In [None]:
class get_intermediate_output(nn.Module):
    """Feature extractor that returns the hidden activation of a 32 -> 32 -> 3 head.

    ``forward`` stops after the first Linear + ReLU stage (``linear1``), so the
    result is the 32-dimensional hidden representation. ``dropout`` and
    ``linear2`` are created but not used by ``forward``.
    """

    def __init__(self):
        super().__init__()
        hidden_layer = nn.Linear(32, 32)
        self.linear1 = nn.Sequential(hidden_layer, nn.ReLU())
        self.dropout = nn.Dropout(0.5)
        self.linear2 = nn.Linear(32, 3)

    def forward(self, choi_rep):
        """Return ``linear1(choi_rep)``, i.e. the activations after the ReLU."""
        return self.linear1(choi_rep)


In [None]:
def intermediate_output(test_dataloader,fold, load_best = 'loss', load_file = 'valence_choi'):
    """Collect the hidden-layer activations of a saved Choi classifier.

    Args:
        test_dataloader: yields batches whose element 0 is the feature tensor and
            element 1 the integer class labels. Any batch size is accepted.
        fold: fold number; the checkpoint is read from ``<load_file>/fold_<fold>/``.
        load_best: ``'loss'`` loads ``model_best_loss1.pt``, ``'accuracy'`` loads
            ``model_best_acc1.pt``, any other value loads ``model_final1.pt``.
        load_file: root directory of the saved checkpoints.

    Returns:
        ``(layer_output, ans)``: a list with one CPU tensor of activations per
        sample, and the list of the matching integer labels.
    """
    print('testing')
    if load_best == 'loss':
        stem = 'model_best_loss1'
    elif load_best == 'accuracy':
        stem = 'model_best_acc1'
    else:
        stem = 'model_final1'
    model = get_intermediate_output().to(device)
    # Only the first stage is needed; map_location keeps the pickled sub-module
    # loadable on a machine without the device it was saved from.
    # NOTE(review): on PyTorch >= 2.6 torch.load defaults to weights_only=True, which
    # rejects pickled modules - pass weights_only=False there or switch to state_dicts.
    model.linear1 = torch.load(load_file + "/fold_{}/".format(fold) + stem + '.pt', map_location=device)
    model.eval()
    layer_output = []
    ans = []
    batch_id = 0
    with torch.no_grad():
        for batch in test_dataloader:
            choi_rep = batch[0].to(device).to(torch.float32)
            labels = batch[1]
            outputs = model(
                choi_rep = choi_rep
            )
            # Keep every sample of the batch (the earlier outputs[0] kept only the first).
            layer_output.extend(outputs.cpu().unbind(0))
            ans.extend(labels.reshape(-1).cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')

    return layer_output, ans

In [None]:

# Rebuild the test split of the fifth fold (i == 4) and collect its hidden features.
# Every split is still visited (and its header printed); only the fifth does any work.
for i, (train, test) in tqdm(enumerate(kfold.split(input_ids, choi_rep, valence))):

    print('Fold {}:'.format(i+1))
    if i == 4:
        # Same preprocessing as during training: scaler statistics from the training indices.
        scaler = StandardScaler().fit(choi_rep[train])
        train_chois, val_coala, train_label, val_label = train_test_split(
            scaler.transform(choi_rep[train]), valence[train], test_size=0.1, random_state=42
        )

        test_chois = scaler.transform(choi_rep[test])
        test_dataset = TrainDataset(choi_rep=test_chois, label = valence[test])
        test_dataloader =  DataLoader(test_dataset,batch_size=1,shuffle=False)

        layer_output, ans = intermediate_output(test_dataloader,i+1)


In [None]:
# Convert the first 1000 per-sample feature tensors to one NumPy array for t-SNE.
nplayer_output = np.asarray([feature.numpy() for feature in layer_output[:1000]])
nplayer_output.shape

In [None]:
from sklearn.manifold import TSNE
import seaborn as sns

In [None]:
# 2-D t-SNE embedding of the extracted features; random_state is fixed so reruns match.
lyr_conv_tsne = TSNE(
    n_components=2,
    init='random',
    random_state=5,
    verbose=1,
).fit_transform(nplayer_output)

In [None]:
# Scatter the two t-SNE components, coloured by the class label of each sample.
df = pd.DataFrame({
    "y": ans[0:1000],
    "comp-1": lyr_conv_tsne[:,0],
    "comp-2": lyr_conv_tsne[:,1],
})

class_palette = sns.color_palette("hls", 3)
sns.scatterplot(
    x="comp-1",
    y="comp-2",
    hue=df["y"].tolist(),
    palette=class_palette,
    data=df,
).set(title="T-SNE projection")


In [None]:
# Training vs. validation loss curves, one panel per cross-validation fold on a
# single row. The panels are generated from `history`, so the figure follows the
# number of folds instead of five copy-pasted blocks.
fig = plt.figure(figsize = (20,3))
n_panels = len(history)
for panel, fold_history in enumerate(history, start=1):
    plt.subplot(1, n_panels, panel)
    plt.plot(fold_history['train_loss'])
    plt.plot(fold_history['val_loss'])
    plt.title('loss history')
    plt.ylabel('loss')
    plt.xlabel('epochs')
    plt.legend(['train', 'val'], loc='upper left')

In [None]:
# Training vs. validation accuracy curves, one panel per cross-validation fold on a
# single row. The panels are generated from `history`, so the figure follows the
# number of folds instead of five copy-pasted blocks.
fig = plt.figure(figsize = (20,3))
n_panels = len(history)
for panel, fold_history in enumerate(history, start=1):
    plt.subplot(1, n_panels, panel)
    plt.plot(fold_history['train_accuracy'])
    plt.plot(fold_history['val_accuracy'])
    plt.title('accuracy history')
    plt.ylabel('accuracy')
    plt.xlabel('epochs')
    plt.legend(['train', 'val'], loc='upper left')

In [None]:
# Spread of the validation loss over the epochs of each fold.
for i in range(5):
    print("fold {}".format(i+1))
    fold_val_loss = history[i]['val_loss']
    loss_hi = np.max(fold_val_loss)
    loss_lo = np.min(fold_val_loss)
    print("max_loss:",loss_hi,"min_loss:", loss_lo, "loss_diff:",loss_hi-loss_lo)

In [None]:
# 0-based epoch index at which each fold reached its lowest validation loss.
for i in range(5):
    print("fold {}".format(i+1))
    best_epoch_idx = np.argmin(history[i]['val_loss'])
    print(best_epoch_idx)

In [None]:
# One epoch index per fold; the next cell uses these to index each fold's
# validation-loss history. NOTE(review): presumably the epoch whose Choi checkpoint
# is reused elsewhere - confirm. The commented list holds earlier values.
#choi_start_ep = [135, 140, 140, 135, 135]
choi_start_ep = [185, 190, 190, 190, 185]

In [None]:
# Validation loss at the chosen epoch of each fold next to that fold's minimum.
for i in range(5):
    print("fold {}".format(i+1))
    fold_val_loss = history[i]['val_loss']
    loss_at_start = fold_val_loss[choi_start_ep[i]]
    print(loss_at_start, np.min(fold_val_loss))

In [None]:
# Loss value lying 70% of the way from each fold's maximum down to its minimum.
for i in range(5):
    print("fold {}".format(i+1))
    loss_hi = np.max(history[i]['val_loss'])
    loss_lo = np.min(history[i]['val_loss'])
    print(loss_hi - 0.7 * (loss_hi-loss_lo))

In [None]:
# Validation loss of folds 1-5 at hand-picked epoch indices (one index per fold).
tuple(history[fold_idx]['val_loss'][epoch_idx] for fold_idx, epoch_idx in enumerate((35, 30, 35, 30, 35)))

In [None]:
# Spread of the validation accuracy over the epochs of each fold.
for i in range(5):
    print("fold {}".format(i+1))
    fold_val_acc = history[i]['val_accuracy']
    acc_hi = np.max(fold_val_acc)
    acc_lo = np.min(fold_val_acc)
    print("max_accuracy:",acc_hi,"min_accuracy:", acc_lo, "accuracy_diff:",acc_hi-acc_lo)

Train Lyrics

In [None]:
class PosModel(nn.Module):
    """Lyrics classifier: a pretrained sequence-classification model plus a 3-way head.

    The first output of the wrapped model (presumably its logits) goes through
    dropout and then ``Linear(28, 3)``, which produces the three class scores.

    Args:
        pretrain_weight: name or path handed to
            ``AutoModelForSequenceClassification.from_pretrained``.
    """

    def __init__(self,pretrain_weight):
        super().__init__()

        self.base_model = AutoModelForSequenceClassification.from_pretrained(pretrain_weight)
        self.dropout = nn.Dropout(0.5)
        # 28 inputs -> 3 classes. NOTE(review): 28 presumably equals the number of
        # labels of the go_emotions checkpoint - confirm.
        self.linear = nn.Linear(28, 3)
        # Not used by forward().
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_ids, attention_mask):
        """Return the 3-class scores for a batch of tokenised lyrics."""
        base_out = self.base_model(input_ids, attention_mask=attention_mask)
        return self.linear(self.dropout(base_out[0]))


In [None]:
# Builds a PosModel from the 'SamLowe/roberta-base-go_emotions' checkpoint and binds it
# to the notebook-level name `model`. train_lyrics below constructs its own PosModel
# and does not read this instance.
model = PosModel('SamLowe/roberta-base-go_emotions')

In [None]:
class TrainDataset(torch.utils.data.Dataset):
    """Map-style dataset over pre-tokenised lyrics.

    Item ``i`` is ``(input_ids[i], attention_masks[i], label[i])`` taken from the
    three parallel sequences given to the constructor.

    NOTE(review): earlier cells call ``TrainDataset(choi_rep=..., label=...)``, so
    this definition replaces a different class of the same name once it runs.
    """

    def __init__(self, input_ids, attention_masks, label):
        self.input_ids, self.attention_masks, self.labels = input_ids, attention_masks, label

    def __getitem__(self, index):
        return self.input_ids[index], self.attention_masks[index], self.labels[index]

    def __len__(self):
        # The length follows the token-id sequence.
        return len(self.input_ids)


In [None]:
def train_lyrics(train_dataloader, val_dataloader,fold, num_epochs = 10, validation = True, save_file = 'valence_lyrics_3class', learning_rate = 5e-5):
    """Fine-tune a PosModel on lyrics for one cross-validation fold and save checkpoints.

    A checkpoint consists of the wrapped base model (``save_pretrained`` directory),
    the classification head (``torch.save`` of the whole ``nn.Linear``) and the
    notebook-level tokenizer, all below ``<save_file>/fold_<fold>/``.

    Args:
        train_dataloader: yields ``(input_ids, attention_mask, labels)`` batches.
        val_dataloader: same layout; only read when ``validation`` is True.
        fold: fold number used in the checkpoint directory name.
        num_epochs: number of passes over ``train_dataloader``.
        validation: when True, evaluate after every epoch and keep the
            ``*_best_loss`` and ``*_best_acc`` checkpoints; when False, save a
            checkpoint named after the epoch number after every epoch instead.
        save_file: root directory for the checkpoints.
        learning_rate: AdamW learning rate.

    Returns:
        None. The ``*_final`` checkpoint is always written after the last epoch.
    """
    model = PosModel('SamLowe/roberta-base-go_emotions')
    accelerator = Accelerator()
    model  = accelerator.prepare(model)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    loss_function = torch.nn.CrossEntropyLoss()
    best_val_loss = np.inf
    best_val_accuracy = 0
    fold_dir = save_file + '/fold_{}'.format(fold)

    def save_checkpoint(model_name, head_file, tokenizer_name):
        # makedirs(exist_ok=True) creates save_file and the fold directory as needed.
        os.makedirs(fold_dir, exist_ok=True)
        model.base_model.save_pretrained(fold_dir + '/' + model_name)
        torch.save(model.linear, fold_dir + '/' + head_file)
        tokenizer.save_pretrained(fold_dir + '/' + tokenizer_name)

    for epoch in range(num_epochs):
        total_loss = 0
        correct = 0
        seen = 0
        print(f"Epoch: {epoch + 1}",'training')
        model.train()
        for batch_id, batch in enumerate(train_dataloader):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            loss = loss_function(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _,predict_label = torch.max(outputs,1)
            correct += (predict_label==labels).sum().item()
            seen += labels.size(0)
            print('batch:', batch_id, '/',str(len(train_dataloader)), 'loss:', loss.item(), end='\r')

        average_loss = total_loss / len(train_dataloader)
        # Accuracy is normalised by the number of samples, so it stays correct for
        # batch sizes other than one.
        print(f"Loss: {average_loss:.4f}", 'accuracy:', correct / seen)
        if validation:
            print('validation')
            model.eval()
            prediction = []
            ans = []
            val_loss = 0.0
            correct = 0
            with torch.no_grad():
                for batch_id, batch in enumerate(val_dataloader):
                    input_ids =  batch[0].to(device)
                    attention_mask =  batch[1].to(device)
                    labels = batch[2].to(device)
                    output = model(
                        input_ids=input_ids,
                        attention_mask=attention_mask,
                    )
                    loss = loss_function(output, labels)
                    val_loss += loss.item()
                    _,predict_label = torch.max(output,1)
                    correct += (predict_label==labels).sum().item()
                    # extend() instead of .item() so batches larger than one sample work.
                    prediction.extend(predict_label.cpu().tolist())
                    ans.extend(labels.cpu().tolist())
                    print('batch:', batch_id, '/',str(len(val_dataloader)), 'loss:', loss.item(), end='\r')
            val_loss /= len(val_dataloader)
            val_accuracy = correct / len(ans)
            print('loss:', val_loss)
            print('accuracy:',val_accuracy)
            print('f1 score:', f1_score(ans, prediction, average='macro'))
            # Both criteria are tracked independently; ties overwrite the older checkpoint.
            if val_loss <= best_val_loss:
                best_val_loss = val_loss
                save_checkpoint('model_best_loss', 'model_best_loss1.pt', 'tokenizer_best_loss')
            if val_accuracy >= best_val_accuracy:
                best_val_accuracy = val_accuracy
                save_checkpoint('model_best_acc', 'model_best_acc1.pt', 'tokenizer_best_acc')
        else:
            # PosModel is a plain nn.Module without save_pretrained, so the base model
            # and the head are stored separately, as for the other checkpoints.
            epoch_tag = str(epoch+1)
            save_checkpoint('model' + epoch_tag, 'model' + epoch_tag + '_head.pt', 'tokenizer' + epoch_tag)
    save_checkpoint('model_final', 'model_final1.pt', 'tokenizer_final')
def test_lyrics(test_dataloader,fold, load_best = 'loss', load_file = 'valence_lyrics_3class'):
    """Evaluate a saved lyrics classifier of one fold on a test dataloader.

    Args:
        test_dataloader: yields ``(input_ids, attention_mask, labels)`` batches.
            Any batch size is accepted.
        fold: fold number; checkpoints are read from ``<load_file>/fold_<fold>/``.
        load_best: ``'loss'`` loads ``model_best_loss`` / ``model_best_loss1.pt``,
            ``'accuracy'`` loads ``model_best_acc`` / ``model_best_acc1.pt``, any
            other value loads ``model_final`` / ``model_final1.pt``.
        load_file: root directory of the saved checkpoints.

    Returns:
        ``(accuracy, f1, prec_recall, conf_m)`` where ``accuracy`` is the fraction of
        correctly classified samples, ``f1`` the per-class F1 array, ``prec_recall``
        the tuple returned by ``precision_recall_fscore_support`` and ``conf_m`` a
        confusion matrix whose rows are predicted labels and columns true labels
        (the arguments are passed as ``(prediction, ans)``).
    """
    print('testing')
    if load_best == 'loss':
        stem = 'model_best_loss'
    elif load_best == 'accuracy':
        stem = 'model_best_acc'
    else:
        stem = 'model_final'
    prefix = load_file + "/fold_{}/".format(fold) + stem
    # The directory <prefix> holds the base model, <prefix>1.pt the pickled head.
    model = PosModel(prefix).to(device)
    # NOTE(review): on PyTorch >= 2.6 torch.load defaults to weights_only=True, which
    # rejects pickled modules - pass weights_only=False there or switch to state_dicts.
    model.linear = torch.load(prefix + '1.pt', map_location=device)
    model.eval()
    prediction = []
    ans = []
    batch_id = 0
    correct = 0
    with torch.no_grad():
        for batch in test_dataloader:
            input_ids =  batch[0].to(device)
            attention_mask =  batch[1].to(device)
            labels = batch[2].to(device).reshape(-1)
            output = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _,predict_label = torch.max(output,1)
            correct += (predict_label==labels).sum().item()
            # extend() instead of .item() so batches larger than one sample work too.
            prediction.extend(predict_label.cpu().tolist())
            ans.extend(labels.cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')
    # Normalise by the number of samples, not the number of batches.
    accuracy = correct / len(ans) if ans else 0.0
    f1 = f1_score(ans, prediction, average=None)
    prec_recall = precision_recall_fscore_support(ans, prediction)
    conf_m = confusion_matrix(prediction,ans)
    return accuracy, f1, prec_recall,  conf_m

In [None]:
# Cross-validate the lyrics-only classifier: one fine-tune / test round per KFold
# split. The per-fold results land in the lists below, which the reporting cell
# reads by name.
test_accuracy = []
conf_m = []
confusion_matrixs = []
test_macro_f1 = []
test_f1_0, test_f1_1, test_f1_2 = [], [], []
test_precision_0, test_precision_1, test_precision_2 = [], [], []
test_recall_0, test_recall_1, test_recall_2 = [], [], []
# Object arrays allow index-array selection of the variable-length token tensors.
ids_all = np.array(input_ids,dtype="object")
masks_all = np.array(attention_masks,dtype="object")
for i, (train, test) in tqdm(enumerate(kfold.split(ids_all, valence))):

    print('Fold {}:'.format(i+1))
    # NOTE(review): train_coalas / test_coalas are not read anywhere in this cell.
    train_coalas = StandardScaler().fit_transform(choi_rep[train])
    train_ids, val_ids, train_mask, val_mask, train_label, val_label = train_test_split(
        ids_all[train], masks_all[train], valence[train], test_size=0.1, random_state=42
    )

    # Every loader serves one sample per batch.
    train_dataset = TrainDataset(input_ids = train_ids, attention_masks = train_mask, label = train_label)
    train_dataloader =  DataLoader(train_dataset,batch_size=1,shuffle=False)

    val_dataset = TrainDataset(input_ids = val_ids, attention_masks = val_mask, label = val_label)
    val_dataloader =  DataLoader(val_dataset,batch_size=1,shuffle=False)
    test_coalas = StandardScaler().fit_transform(choi_rep[test])
    test_dataset = TrainDataset(input_ids = ids_all[test], attention_masks = masks_all[test], label = valence[test])
    test_dataloader =  DataLoader(test_dataset,batch_size=1,shuffle=False)

    train_lyrics(train_dataloader, val_dataloader,i+1,num_epochs=5, learning_rate=5e-7)

    accuracy, f1, prec_recall, confusion_m = test_lyrics(test_dataloader,i+1)
    print("test accuracy:",accuracy,"f1 score:",f1)
    test_accuracy.append(accuracy)
    # prec_recall[0] holds per-class precision, prec_recall[1] per-class recall.
    for cls, bucket in enumerate((test_f1_0, test_f1_1, test_f1_2)):
        bucket.append(f1[cls])
    for cls, bucket in enumerate((test_precision_0, test_precision_1, test_precision_2)):
        bucket.append(prec_recall[0][cls])
    for cls, bucket in enumerate((test_recall_0, test_recall_1, test_recall_2)):
        bucket.append(prec_recall[1][cls])
    confusion_matrixs.append(confusion_m)

In [None]:
# Print the results of every fold, then the summed confusion matrix and the
# cross-fold means of accuracy, recall, precision and F1.
total_conf = np.zeros_like(confusion_matrixs[0])
for i in range(len(test_accuracy)):
    print("Fold {}:".format(i+1))
    print("Confusion Matrix:")
    print(confusion_matrixs[i])
    total_conf += confusion_matrixs[i]
    print('accuracy:', test_accuracy[i])
    print('recall 0:',test_recall_0[i],
          ',recall 1:',test_recall_1[i],
          ',recall 2:',test_recall_2[i])
    print('precision 0:',test_precision_0[i],
          ',precision 1:',test_precision_1[i],
          ',precision 2:',test_precision_2[i])
    print('F1 score 0:', test_f1_0[i],
          'F1 score 1:', test_f1_1[i],
          ',f1 2:',test_f1_2[i])
print("Total Confusion Matrix:\n",total_conf)
print("Avg accuracy:",np.mean(test_accuracy))
print("Avg recall 0:",np.mean(test_recall_0),
      ",Avg recall 1:",np.mean(test_recall_1),
      ",Avg recall 2:",np.mean(test_recall_2))
print("Avg precision 0:",np.mean(test_precision_0),
      ",Avg precision 1:",np.mean(test_precision_1),
      ",Avg precision 2:",np.mean(test_precision_2))
print("Avg f1 score 0:",np.mean(test_f1_0),
      ",Avg f1 score 1:",np.mean(test_f1_1),
      ",Avg f1 score 2:",np.mean(test_f1_2))

In [None]:
class get_intermediate_output(nn.Module):
    """Feature extractor for the lyrics model: base-model output without the head.

    Has the same attributes as ``PosModel`` but ``forward`` returns the first
    output of the wrapped model after dropout; ``linear`` and ``sigmoid`` are
    created but not used by ``forward``.

    Args:
        pretrain_weight: name or path handed to
            ``AutoModelForSequenceClassification.from_pretrained``.
    """

    def __init__(self,pretrain_weight):
        super().__init__()

        self.base_model = AutoModelForSequenceClassification.from_pretrained(pretrain_weight)
        self.dropout = nn.Dropout(0.5)
        # 28 inputs -> 3 classes; kept so a saved head can be attached.
        self.linear = nn.Linear(28, 3)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_ids, attention_mask):
        """Return the dropout-processed first output of the base model."""
        base_out = self.base_model(input_ids, attention_mask=attention_mask)
        return self.dropout(base_out[0])


In [None]:
def intermediate_output(test_dataloader,fold, load_best = 'loss', load_file = 'valence_lyrics_3class'):
    """Collect the base-model outputs of a saved lyrics classifier.

    Args:
        test_dataloader: yields ``(input_ids, attention_mask, labels)`` batches.
            Any batch size is accepted.
        fold: fold number; the base model is read from ``<load_file>/fold_<fold>/``.
        load_best: ``'loss'`` loads ``model_best_loss``, ``'accuracy'`` loads
            ``model_best_acc``, any other value loads ``model_final``.
        load_file: root directory of the saved checkpoints.

    Returns:
        ``(layer_output, ans)``: a list with one CPU tensor of features per sample,
        and the list of the matching integer labels.
    """
    print('testing')
    if load_best == 'loss':
        stem = 'model_best_loss'
    elif load_best == 'accuracy':
        stem = 'model_best_acc'
    else:
        stem = 'model_final'
    # The extractor's forward() never applies the classification head, so the
    # separate head file is not loaded here.
    model = get_intermediate_output(load_file + "/fold_{}/".format(fold) + stem).to(device)
    model.eval()
    layer_output = []
    ans = []
    batch_id = 0
    with torch.no_grad():
        for batch in test_dataloader:
            input_ids =  batch[0].to(device)
            attention_mask =  batch[1].to(device)
            labels = batch[2]
            output = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            # Keep every sample of the batch (the earlier output[0] kept only the first).
            layer_output.extend(output.cpu().unbind(0))
            ans.extend(labels.reshape(-1).cpu().tolist())
            batch_id += 1
            print('batch:', batch_id, end='\r')

    return layer_output, ans

In [None]:
# Convert the first 1000 per-sample feature tensors to one NumPy array for t-SNE.
# NOTE(review): no cell visible here calls the lyrics `intermediate_output` before
# this one, so `layer_output` / `ans` may still hold values from an earlier run - confirm.
nplayer_output = np.asarray([feature.numpy() for feature in layer_output[:1000]])
nplayer_output.shape

In [None]:
# 2-D t-SNE embedding of the extracted features; random_state is fixed so reruns match.
lyr_conv_tsne = TSNE(
    n_components=2,
    init='random',
    random_state=5,
    verbose=1,
).fit_transform(nplayer_output)

In [None]:
# Scatter the two t-SNE components, coloured by the class label of each sample.
df = pd.DataFrame({
    "y": ans[0:1000],
    "comp-1": lyr_conv_tsne[:,0],
    "comp-2": lyr_conv_tsne[:,1],
})

class_palette = sns.color_palette("hls", 3)
sns.scatterplot(
    x="comp-1",
    y="comp-2",
    hue=df["y"].tolist(),
    palette=class_palette,
    data=df,
).set(title="BERT T-SNE projection")


Train lyrics + convNet

In [None]:
class TrainDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenised lyrics with a mel array loaded from disk.

    Item ``i`` is ``(input_ids[i], attention_masks[i], mel, label[i])`` where
    ``mel`` is read from ``choi_mel/<id[i]>.npy`` and reshaped to ``(1, 96, 1360)``.

    NOTE(review): this reuses the ``TrainDataset`` name of the lyrics-only dataset
    defined earlier and replaces it once this cell runs.
    """

    def __init__(self, input_ids, attention_masks, id, label):
        self.input_ids, self.attention_masks = input_ids, attention_masks
        self.id, self.labels = id, label

    def __getitem__(self, index):
        sample_ids = self.input_ids[index]
        sample_mask = self.attention_masks[index]
        # The array is loaded on every access; nothing is cached.
        mel_array = np.load('choi_mel/' + self.id[index] +'.npy')
        mel = torch.tensor(mel_array.reshape(1, 96, 1360))
        return sample_ids, sample_mask, mel, self.labels[index]

    def __len__(self):
        # The length follows the token-id sequence.
        return len(self.input_ids)


In [None]:
class HybridModel(nn.Module):
    """Late-fusion classifier over a lyrics model and a convolutional audio model.

    The first output of ``model1.base_model`` is concatenated with the pooled
    output of ``model2.conv_layers`` and classified by a 60 -> 32 -> 3 head.
    NOTE(review): 60 is presumably 28 lyric scores + 32 conv feature maps - confirm.

    Args:
        model1: object exposing ``base_model(input_ids, attention_mask=...)``.
        model2: object exposing ``conv_layers`` and ``pool``.
    """

    def __init__(self, model1, model2):
        super().__init__()
        self.lyric = model1
        self.convNet = model2
        self.dropout = nn.Dropout(0.5)
        fusion_layers = [nn.Linear(60,32), nn.ReLU(), nn.Linear(32,3)]
        self.linear = nn.Sequential(*fusion_layers)

    def forward(self, input_ids, attention_mask, mel):
        """Return the 3-class scores for a batch of (lyrics, mel) pairs."""
        text_feat = self.lyric.base_model(input_ids, attention_mask=attention_mask)[0]
        audio_feat = self.convNet.pool(self.convNet.conv_layers(mel))
        # Drop the two trailing singleton dimensions left by the pooling.
        audio_feat = audio_feat.squeeze(2).squeeze(2)
        fused = torch.cat((text_feat, audio_feat),1)
        return self.linear(self.dropout(fused))


In [None]:
def _save_fusion_checkpoint(model, fold_dir, tag, conv_suffix=''):
    """Save every part of a HybridModel, plus the global tokenizer, under `fold_dir`.

    Writes 'model_<tag>' and 'tokenizer_<tag>' (via save_pretrained) and
    'convNet_<tag><conv_suffix>.pt' and 'hybrid_model_<tag>.pt' (whole modules
    via torch.save). `conv_suffix` exists only to keep the established
    'convNet_best_loss1.pt' / 'convNet_best_acc1.pt' / 'convNet_final1.pt'
    file names that test_fusion_3 loads.
    """
    os.makedirs(fold_dir, exist_ok=True)
    model.lyric.base_model.save_pretrained(fold_dir + '/model_' + tag)
    tokenizer.save_pretrained(fold_dir + '/tokenizer_' + tag)
    torch.save(model.convNet.conv_layers, fold_dir + '/convNet_' + tag + conv_suffix + '.pt')
    torch.save(model.linear, fold_dir + '/hybrid_model_' + tag + '.pt')


def train_fusion_3(train_dataloader, val_dataloader,fold, num_epochs = 10, validation = True, save_file = 'valence_fusion_convbert_3class', learning_rate_lyrics = 5e-7, learning_rate_hybrid = 5e-5, learning_rate_convNet = 5e-4, convNet_start_ep = 20):
    """Train the lyrics + ConvNet fusion classifier for one fold and checkpoint it.

    Uses the notebook globals ConvNet, PosModel, HybridModel, convNet_weight,
    tokenizer and device.

    Parameters
    ----------
    train_dataloader, val_dataloader
        Iterables of (input_ids, attention_mask, mel, labels) batches.
        `val_dataloader` is only read when `validation` is true.
    fold
        Fold identifier; everything is written to '<save_file>/fold_<fold>/'.
    num_epochs
        Number of passes over `train_dataloader`.
    validation
        When true, evaluate after every epoch and refresh the 'best_loss' /
        'best_acc' checkpoints. When false, save the lyric model and the
        tokenizer as 'model<epoch>' / 'tokenizer<epoch>' instead.
    save_file
        Root directory for all checkpoints.
    learning_rate_lyrics, learning_rate_hybrid, learning_rate_convNet
        AdamW learning rates for the lyric base model, the fusion head
        (`model.linear`) and the ConvNet conv layers, respectively.
    convNet_start_ep
        Not used by the active code path; kept so existing calls keep working.
        It belonged to an alternative initialisation that loaded
        'valence_convNet/fold_<fold>/model_epoch_<convNet_start_ep>_1.pt'
        into `model.convNet.conv_layers`.

    Returns
    -------
    None. Results are the checkpoint files: one per epoch, 'best_*' when
    validating, and 'final' after the last epoch.
    """
    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,#num_conlayer
                [32, 32, 32, 32, 32],#num_feat_map
                1.0, #feat_scale_factor
                [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)], #convSizes
                poolings, #pool_sizes
                0.0, #dropout_conv
                (1,1,96,1360)]#input shape
    conv_net = ConvNet(*args, conv_until = 5)

    # Convert the Keras weights to PyTorch layout. convNet_weight[1] is read
    # with a stride of 6 entries per layer: offset 0 for the kernel, offset 1
    # for the bias. The kernel axes are permuted with (3, 2, 0, 1).
    pytorch_state_dict = conv_net.state_dict()
    count_c = 0
    count_b = 0
    for name in pytorch_state_dict:
        if 'weight' in name and 'linear' not in name :
            pytorch_state_dict[name] = torch.from_numpy(np.transpose(convNet_weight[1][0+count_c*6], (3,2,0,1)))
            count_c += 1
        elif 'bias' in name and 'linear' not in name :
            pytorch_state_dict[name] = torch.from_numpy(np.asarray(convNet_weight[1][1+count_b*6]))
            count_b += 1
    # state_dict() returns a plain mapping: re-binding its entries does not
    # touch the module's parameters, so the converted weights must be loaded
    # back explicitly or the ConvNet silently starts from random init.
    conv_net.load_state_dict(pytorch_state_dict)

    model = HybridModel(PosModel('SamLowe/roberta-base-go_emotions'), conv_net).to(device)

    # One optimizer per sub-network so each part gets its own learning rate.
    lyrics_optimizer = torch.optim.AdamW(model.lyric.base_model.parameters(), lr=learning_rate_lyrics)
    convNet_optimizer = torch.optim.AdamW(model.convNet.conv_layers.parameters(), lr=learning_rate_convNet)
    hybrid_optimizer = torch.optim.AdamW(model.linear.parameters(), lr=learning_rate_hybrid)
    optimizers = (lyrics_optimizer, convNet_optimizer, hybrid_optimizer)
    loss_function = torch.nn.CrossEntropyLoss()

    fold_dir = save_file + '/fold_{}'.format(fold)
    best_val_loss = np.inf
    best_val_accuracy = 0
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        n_correct = 0
        n_seen = 0
        print(f"Epoch: {epoch + 1}",'training')
        model.train()
        for batch_id, batch in enumerate(train_dataloader):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            mel = batch[2].to(device).to(torch.float32)
            labels = batch[3].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                mel = mel
            )
            loss = loss_function(outputs, labels)
            for optimizer in optimizers:
                optimizer.zero_grad()
            loss.backward()
            for optimizer in optimizers:
                optimizer.step()
            epoch_loss += loss.item()
            _,predict_label = torch.max(outputs,1)
            # Count samples, not batches, so accuracy is right for any batch size.
            n_correct += (predict_label==labels).sum().item()
            n_seen += labels.size(0)

            print('batch:', batch_id, '/',str(len(train_dataloader)), 'loss:', loss.item(), end='\r')

        average_loss = epoch_loss / max(len(train_dataloader), 1)
        print(f"Loss: {average_loss:.4f}", 'accuracy:', n_correct / max(n_seen, 1))

        if validation:
            print('validation')
            model.eval()
            prediction = []
            ans = []
            val_loss = 0.0
            n_correct = 0
            n_seen = 0
            with torch.no_grad():
                for batch_id, batch in enumerate(val_dataloader):
                    input_ids =  batch[0].to(device)
                    attention_mask =  batch[1].to(device)
                    mel = batch[2].to(device).to(torch.float32)
                    labels = batch[3].to(device)
                    outputs = model(
                        input_ids=input_ids,
                        attention_mask=attention_mask,
                        mel = mel
                    )
                    loss = loss_function(outputs, labels)

                    val_loss += loss.item()
                    _,predict_label = torch.max(outputs,1)
                    n_correct += (predict_label==labels).sum().item()
                    n_seen += labels.size(0)

                    # tolist() handles any batch size; .item() only works for 1.
                    prediction.extend(predict_label.cpu().tolist())
                    ans.extend(labels.cpu().tolist())
                    print('batch:', batch_id, '/',str(len(val_dataloader)), 'loss:', loss.item(), end='\r')
            val_loss /= max(len(val_dataloader), 1)
            val_accuracy = n_correct / max(n_seen, 1)
            print('loss:', val_loss)
            print('accuracy:',val_accuracy)
            print('f1 score:', f1_score(ans, prediction, average='macro'))
            # Ties refresh the checkpoint too (<= / >=), so the latest epoch wins.
            if val_loss <= best_val_loss:
                best_val_loss = val_loss
                _save_fusion_checkpoint(model, fold_dir, 'best_loss', conv_suffix='1')
            if val_accuracy >= best_val_accuracy:
                best_val_accuracy = val_accuracy
                _save_fusion_checkpoint(model, fold_dir, 'best_acc', conv_suffix='1')
        else:
            os.makedirs(fold_dir, exist_ok=True)
            # Saved through base_model, like every other checkpoint written here.
            model.lyric.base_model.save_pretrained(fold_dir + '/model' + str(epoch+1))
            tokenizer.save_pretrained(fold_dir + '/tokenizer' + str(epoch+1))

        # A full checkpoint is kept for every epoch regardless of validation.
        _save_fusion_checkpoint(model, fold_dir, 'epoch_' + str(epoch+1))

    _save_fusion_checkpoint(model, fold_dir, 'final', conv_suffix='1')
    
def test_fusion_3(test_dataloader,fold, load_best = 'loss', load_file = 'valence_fusion_convbert_3class'):
    """Evaluate a saved fusion model of one fold on `test_dataloader`.

    Uses the notebook globals ConvNet, PosModel, HybridModel and device.

    Parameters
    ----------
    test_dataloader
        Iterable of (input_ids, attention_mask, mel, labels) batches.
    fold
        Fold identifier; checkpoints are read from '<load_file>/fold_<fold>/'.
    load_best
        'loss' loads the 'best_loss' checkpoint, 'accuracy' loads 'best_acc',
        any other value loads 'final'.
    load_file
        Root directory the checkpoints were written to.

    Returns
    -------
    (accuracy, f1, prec_recall, conf_m)
        accuracy is the fraction of correctly classified samples. f1 is the
        per-class F1 array. prec_recall is the tuple returned by
        precision_recall_fscore_support. conf_m is the confusion matrix with
        predictions passed as the first argument, so rows are indexed by the
        predicted class and columns by the true class. All metrics are
        computed over the fixed label set [0, 1, 2].
    """
    print('testing')
    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,#num_conlayer
                [32, 32, 32, 32, 32],#num_feat_map
                1.0, #feat_scale_factor
                [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)], #convSizes
                poolings, #pool_sizes
                0.0, #dropout_conv
                (1,1,96,1360)]#input shape

    # Pick the checkpoint family; anything other than 'loss'/'accuracy' means 'final'.
    if load_best == 'loss':
        tag = 'best_loss'
    elif load_best == 'accuracy':
        tag = 'best_acc'
    else:
        tag = 'final'
    fold_dir = load_file + "/fold_{}".format(fold)

    model = HybridModel(PosModel(fold_dir + '/model_' + tag),ConvNet(*args, conv_until = 5)).to(device)
    # map_location keeps the restored modules on the same device as the rest of
    # the model, even when the checkpoint was written on a different one.
    # NOTE(review): these files are whole pickled modules. PyTorch versions in
    # which torch.load defaults to weights_only=True need weights_only=False
    # here; confirm against the installed version.
    model.convNet.conv_layers = torch.load(fold_dir + '/convNet_' + tag + '1.pt', map_location=device)
    model.linear = torch.load(fold_dir + '/hybrid_model_' + tag + '.pt', map_location=device)

    model.eval()
    prediction = []
    ans = []
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_id, batch in enumerate(test_dataloader, start=1):
            input_ids =  batch[0].to(device)
            attention_mask =  batch[1].to(device)
            mel = batch[2].to(device).to(torch.float32)
            labels = batch[3].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                mel = mel
            )
            _,predict_label = torch.max(outputs,1)
            # Count samples, not batches, and collect with tolist() so any
            # batch size works (.item() only accepts single-element tensors).
            n_correct += (predict_label==labels).sum().item()
            n_seen += labels.size(0)
            prediction.extend(predict_label.cpu().tolist())
            ans.extend(labels.cpu().tolist())
            print('batch:', batch_id, end='\r')

    accuracy = n_correct / max(n_seen, 1)
    # Pinning the label set guarantees three entries per metric and a 3x3
    # matrix even when a fold happens to miss a class, which the per-class
    # bookkeeping and the summed confusion matrix downstream rely on.
    class_ids = [0, 1, 2]
    f1 = f1_score(ans, prediction, labels=class_ids, average=None)
    prec_recall = precision_recall_fscore_support(ans, prediction, labels=class_ids)
    conf_m = confusion_matrix(prediction, ans, labels=class_ids)
    return accuracy, f1, prec_recall,  conf_m

In [None]:
# One entry per fold (indexed by the 0-based fold number); the cross-validation
# loop forwards entry i to train_fusion_3 as its `convNet_start_ep` argument.
convNet_start_ep = [2, 2, 2, 2, 1]

In [None]:
# Per-fold results; the summary cell reads these lists by name.
test_accuracy = []
conf_m = []
confusion_matrixs = []
test_f1_0 = []
test_f1_1 = []
test_f1_2 = []
test_f1 = []
test_precision_0 = []
test_recall_0 = []
test_precision_1 = []
test_recall_1 = []
test_precision_2 = []
test_recall_2 = []

# Build the index-able arrays once; they do not change between folds.
# np.asarray lets `valence` be either a plain list or already an array.
sp_id_arr = np.array(sp_id, dtype="object")
input_ids_arr = np.array(input_ids, dtype="object")
attention_masks_arr = np.array(attention_masks, dtype="object")
valence_arr = np.asarray(valence)

for i, (train, test) in tqdm(enumerate(kfold.split(input_ids, choi_rep, valence)), total=kfold.get_n_splits()):

    print('Fold {}:'.format(i+1))
    # Hold out 10% of the training part of this fold for validation.
    train_id, val_id, train_ids, val_ids, train_mask, val_mask, train_label, val_label = train_test_split(
        sp_id_arr[train], input_ids_arr[train], attention_masks_arr[train], valence_arr[train],
        test_size=0.1, random_state=42)

    train_dataset = TrainDataset(input_ids = train_ids, attention_masks = train_mask, id=train_id, label = train_label)
    train_dataloader =  DataLoader(train_dataset,batch_size=1,shuffle=False)

    val_dataset = TrainDataset(input_ids = val_ids, attention_masks = val_mask, id=val_id, label = val_label)
    val_dataloader =  DataLoader(val_dataset,batch_size=1,shuffle=False)

    test_dataset = TrainDataset(input_ids = input_ids_arr[test], attention_masks = attention_masks_arr[test], id=sp_id_arr[test], label = valence_arr[test])
    test_dataloader =  DataLoader(test_dataset,batch_size=1,shuffle=False)

    train_fusion_3(train_dataloader, val_dataloader,i+1,num_epochs=10, learning_rate_lyrics=5e-7, learning_rate_convNet = 5e-5, learning_rate_hybrid = 5e-4, convNet_start_ep=convNet_start_ep[i])

    accuracy, f1, prec_recall, confusion_m = test_fusion_3(test_dataloader,i+1)
    print("test accuracy:",accuracy,"f1 score:",f1)
    test_accuracy.append(accuracy)
    test_f1_0.append(f1[0])
    test_f1_1.append(f1[1])
    test_f1_2.append(f1[2])
    # prec_recall[0] holds the per-class precision, prec_recall[1] the recall.
    test_precision_0.append(prec_recall[0][0])
    test_precision_1.append(prec_recall[0][1])
    test_precision_2.append(prec_recall[0][2])
    test_recall_0.append(prec_recall[1][0])
    test_recall_1.append(prec_recall[1][1])
    test_recall_2.append(prec_recall[1][2])
    confusion_matrixs.append(confusion_m)

In [None]:
# Print each fold's metrics and sum the confusion matrices along the way.
total_conf = np.zeros_like(confusion_matrixs[0])
for fold_idx, fold_accuracy in enumerate(test_accuracy):
    fold_conf = confusion_matrixs[fold_idx]
    print("Fold {}:".format(fold_idx + 1))
    print("Confusion Matrix:")
    print(fold_conf)
    total_conf += fold_conf
    print('accuracy:', fold_accuracy)
    print('recall 0:',test_recall_0[fold_idx],',recall 1:',test_recall_1[fold_idx],',recall 2:',test_recall_2[fold_idx])
    print('precision 0:',test_precision_0[fold_idx],',precision 1:',test_precision_1[fold_idx],',precision 2:',test_precision_2[fold_idx])
    print('F1 score 0:', test_f1_0[fold_idx], 'F1 score 1:', test_f1_1[fold_idx],',f1 2:',test_f1_2[fold_idx])

# Averages over the folds, one value per metric and class.
print("Total Confusion Matrix:\n",total_conf)
print("Avg accuracy:",np.mean(test_accuracy))
print("Avg recall 0:",np.mean(test_recall_0),",Avg recall 1:",np.mean(test_recall_1),",Avg recall 2:",np.mean(test_recall_2))
print("Avg precision 0:",np.mean(test_precision_0),",Avg precision 1:",np.mean(test_precision_1),",Avg precision 2:",np.mean(test_precision_2))
print("Avg f1 score 0:",np.mean(test_f1_0),",Avg f1 score 1:",np.mean(test_f1_1),",Avg f1 score 2:",np.mean(test_f1_2))

In [None]:
class get_intermediate_output(nn.Module):
    """Variant of the fusion model that stops after the first layer of the head.

    Same sub-modules and layer sizes as HybridModel (60 -> 32 -> 3), so
    weights trained there fit here. forward() returns the 32-dimensional
    output of ``self.linear[0]`` instead of the class logits.
    """

    def __init__(self, model1, model2):
        super(get_intermediate_output, self).__init__()
        self.lyric = model1
        self.convNet = model2
        self.dropout = nn.Dropout(0.5)
        self.linear = nn.Sequential(
            nn.Linear(60,32),
            nn.ReLU(),
            nn.Linear(32,3)  # three classes, matching HybridModel's head
        )

    def forward(self, input_ids, attention_mask, mel):
        lyric_outputs = self.lyric.base_model(input_ids, attention_mask=attention_mask)
        convNet_outputs = self.convNet.conv_layers(mel)
        convNet_outputs = self.convNet.pool(convNet_outputs)
        # Drop the two trailing size-1 dims (dim 2 twice: the first squeeze shifts dim 3).
        convNet_outputs = convNet_outputs.squeeze(2).squeeze(2)
        outputs = torch.cat((lyric_outputs[0], convNet_outputs),1)
        outputs = self.dropout(outputs)
        # Only the first linear layer is applied: the result has 32 features.
        outputs = self.linear[0](outputs)

        return outputs


In [None]:
def intermediate_output_best(test_dataloader,fold = 5, load_file = 'valence_fusion_convbert_3class_best', accuracy = 588):
    """Collect the first-head-layer activations of a saved fusion model.

    Uses the notebook globals ConvNet, PosModel, get_intermediate_output and
    device.

    Parameters
    ----------
    test_dataloader
        Iterable of (input_ids, attention_mask, mel, labels) batches.
    fold
        Fold identifier; checkpoints are read from '<load_file>/fold<fold>/'.
    load_file
        Root directory of the hand-picked checkpoints.
    accuracy
        Used only as the file-name suffix of the checkpoint to load
        ('model_<accuracy>', 'convNet_<accuracy>.pt', 'hybrid_model_<accuracy>.pt').

    Returns
    -------
    (layer_output, ans)
        layer_output is a list with one CPU tensor per sample (the output of
        get_intermediate_output.forward for that sample); ans is the list of
        the matching integer labels.
    """
    print('testing')
    poolings = [(2, 4), (3, 4), (2, 5), (2, 4), (4, 4)]
    args = [5,#num_conlayer
                [32, 32, 32, 32, 32],#num_feat_map
                1.0, #feat_scale_factor
                [(3, 3), (3, 3), (3, 3), (3, 3), (3, 3)], #convSizes
                poolings, #pool_sizes
                0.0, #dropout_conv
                (1,1,96,1360)]#input shape
    checkpoint_prefix = load_file + "/fold{}/".format(fold)
    suffix = str(accuracy)
    model = get_intermediate_output(PosModel(checkpoint_prefix + "model_" + suffix),ConvNet(*args, conv_until = 5)).to(device)
    # map_location keeps the restored modules on the same device as the rest of
    # the model, even when the checkpoint was written on a different one.
    model.convNet.conv_layers = torch.load(checkpoint_prefix + "convNet_" + suffix + '.pt', map_location=device)
    model.linear = torch.load(checkpoint_prefix + "hybrid_model_" + suffix + '.pt', map_location=device)

    model.eval()
    layer_output = []
    ans = []
    with torch.no_grad():
        for batch_id, batch in enumerate(test_dataloader, start=1):
            input_ids =  batch[0].to(device)
            attention_mask =  batch[1].to(device)
            mel = batch[2].to(device).to(torch.float32)
            labels = batch[3].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                mel = mel
            )
            # Keep one row per sample so no sample is dropped when a batch
            # holds more than one item; labels are flattened the same way.
            layer_output.extend(outputs.cpu().unbind(0))
            ans.extend(labels.cpu().tolist())
            print('batch:', batch_id, end='\r')

    return layer_output, ans

In [None]:
# Walk the same K-fold splits as the training loop, but act only on the
# split with index 4 (printed as fold 5) and collect its activations.
for i, (train, test) in tqdm(enumerate(kfold.split(input_ids, choi_rep, valence))):
    if i == 4:
        print('Fold {}:'.format(i+1))
        test_dataset = TrainDataset(
            input_ids=np.array(input_ids, dtype="object")[test],
            attention_masks=np.array(attention_masks, dtype="object")[test],
            id=np.array(sp_id, dtype="object")[test],
            label=valence[test],
        )
        test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

        layer_output, ans = intermediate_output_best(test_dataloader, i+1, accuracy=588)

# Stack at most the first 1000 activation vectors into one array for t-SNE.
nplayer_output = np.asarray([t.numpy() for t in layer_output[:1000]])
nplayer_output.shape

In [None]:
# Embed `nplayer_output` into two dimensions with t-SNE; the random
# initialisation is seeded (random_state=5) so the layout is repeatable.
tsne_model = TSNE(n_components=2, init='random', random_state=5, verbose=1)
lyr_conv_tsne = tsne_model.fit_transform(nplayer_output)

In [None]:
# One row per embedded sample: its class label plus the two t-SNE coordinates.
df = pd.DataFrame({
    "y": ans[0:1000],
    "comp-1": lyr_conv_tsne[:, 0],
    "comp-2": lyr_conv_tsne[:, 1],
})

# Colour the points by label, using a three-colour "hls" palette.
tsne_axes = sns.scatterplot(
    data=df,
    x="comp-1",
    y="comp-2",
    hue=df.y.tolist(),
    palette=sns.color_palette("hls", 3),
)
tsne_axes.set(title="BERT + convNet T-SNE projection")