In [None]:
# !pip install transformers
# !pip install transformers[torch]
# !pip install tensorboardX
# !pip install pytorch_lightning

In [None]:
import pickle
import numpy as np
# import matplotlib.pyplot as plt
import pandas as pd
import os
from transformers import BertTokenizer,  BertForSequenceClassification, AdamW, BertConfig
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils import data
from tensorboardX import SummaryWriter
# from torchvggish import vggish, vggish_input
import sys
import random
import csv
from sklearn.metrics import confusion_matrix,classification_report
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

from datetime import datetime

In [None]:
def file_search(dirname, ret, list_avoid_dir=None):
    """Recursively collect the paths of all non-directory entries under ``dirname``.

    Args:
        dirname: Directory to walk.
        ret: List that the found paths are appended to (mutated in place).
        list_avoid_dir: Optional collection of directory *names* (not paths)
            whose subtrees are skipped. ``None`` (the default) skips nothing.

    Returns:
        None. Results are accumulated in ``ret``.
    """
    avoid = () if list_avoid_dir is None else list_avoid_dir
    # os.listdir returns entries in arbitrary order; sorting makes the
    # collected list reproducible across runs and platforms.
    for filename in sorted(os.listdir(dirname)):
        full_filename = os.path.join(dirname, filename)
        if os.path.isdir(full_filename):
            # Compare the entry name itself instead of splitting the joined
            # path on "/", which is wrong where the separator is not "/".
            if filename in avoid:
                continue
            file_search(full_filename, ret, list_avoid_dir)
        else:
            ret.append(full_filename)
            
def find_encoding(filename):
    """Detect the character encoding of ``filename`` with ``chardet``.

    NOTE(review): ``chardet`` is referenced here but no import of it is
    visible in the notebook's import cell; it has to be imported before this
    function is called.

    Args:
        filename: Path of the file to inspect.

    Returns:
        The value stored under ``'encoding'`` in the result of
        ``chardet.detect`` for the file's raw bytes.
    """
    # Context manager so the handle is closed as soon as the bytes are read.
    with open(filename, 'rb') as raw_file:
        rawdata = raw_file.read()
    result = chardet.detect(rawdata)
    # Hand the detected encoding back to the caller instead of discarding it.
    return result['encoding']
    
def create_folder(dir_name):
    """Create ``dir_name`` (including missing parents) unless the path already exists."""
    if os.path.exists(dir_name):
        return
    os.makedirs(dir_name)
        
def extract_trans(list_in_file, out_file):
    """Append ``[utterance name, transcript]`` rows from transcript files to a CSV.

    Each line is split at its first ``':'``. The name is the first
    space-separated token before the colon and the transcript is everything
    after it. Lines are kept only when the name starts with ``'Ses'`` and the
    two characters before its last character are not ``'XX'``. Lines of each
    input file are processed in sorted order.

    Args:
        list_in_file: Iterable of input file paths.
        out_file: CSV path that rows are appended to (no header is written).

    Returns:
        None.
    """
    # latin-1 maps every byte to a code point, so the first attempt decodes
    # any file; the remaining entries only matter if it raises anyway.
    encodings_to_try = ['latin-1', 'ISO-8859-1', 'utf-16']

    # newline='' lets the csv module control line endings itself, which avoids
    # blank rows on platforms that translate "\n" on write.
    with open(out_file, 'a', encoding='latin-1', newline='') as f2:  # 'a' = append mode
        csv_writer = csv.writer(f2)

        for in_file in list_in_file:
            lines = None
            for encoding in encodings_to_try:
                try:
                    with open(in_file, 'r', encoding=encoding) as f1:
                        lines = f1.readlines()
                    break  # decoded successfully
                except UnicodeDecodeError:
                    print(f"Failed to decode {in_file} with {encoding} encoding. Trying another encoding.")

            if lines is None:
                # Nothing could decode this file; skip it rather than crash in sorted().
                print(f"Skipping {in_file}: none of the candidate encodings could decode it.")
                continue

            for line in sorted(lines):
                # Split on the FIRST colon only so transcripts that contain
                # colons themselves are kept whole.
                head, colon, trans = line.partition(':')
                name = head.split(' ')[0].strip()
                if name[:3] != 'Ses' or name[-3:-1] == 'XX':
                    continue
                if not colon:
                    # Looks like an utterance name but carries no transcript.
                    continue
                csv_writer.writerow([name, trans.strip()])
                    
        

In [None]:
# Collect the transcript files of every session. Each session is searched in
# its own directory so that no file is listed more than once (searching the
# release root on every iteration would add every file five times).
# NOTE(review): assumes the standard IEMOCAP layout
# SessionN/dialog/transcriptions/ — confirm against the local data directory.
list_files = []
for x in range(5):
    sess_name = 'Session' + str(x + 1)
    path = 'data/IEMOCAP_full_release/' + sess_name + '/dialog/transcriptions/'
    file_search(path, list_files)
    print(sess_name + ', #sum files: ' + str(len(list_files)))

# extract_trans appends to its output, so remove a file left over from an
# earlier run; otherwise re-running this cell duplicates every row.
if os.path.exists('processed_trans.csv'):
    os.remove('processed_trans.csv')
extract_trans(list_files, 'processed_trans.csv')
    

In [None]:
# processed_trans.csv is written row by row without a header line, so tell
# pandas not to consume the first record as column names and name the
# columns explicitly instead.
headerList = ['sessionID', 'text']
file = pd.read_csv("processed_trans.csv", header=None, names=headerList)

# converting data frame to csv
file.to_csv("processed_trans_head.csv", header=headerList, index=False)

In [None]:
def find_category(lines):
    """Parse ``[utterance id, label]`` pairs from emotion-evaluation lines.

    The input is treated as blank-line-separated blocks. The first non-blank
    line of each block is the summary line: its second and third tab-separated
    fields are taken as the utterance id and the categorical label. All other
    lines of a block are ignored.

    Args:
        lines: Lines of one evaluation file, header already removed.

    Returns:
        List of ``[id, label]`` lists in input order.

    Raises:
        ValueError: If a summary line has fewer than three tab-separated
            fields, or its label is not one of the known categories.
    """
    known_labels = {'ang', 'hap', 'sad', 'neu', 'fru', 'exc', 'fea', 'sur', 'dis', 'oth', 'xxx'}
    list_ret = []
    is_target = True  # True while the next non-blank line starts a new block
    for line in lines:
        if not line.strip():
            # A blank line ends the current block; repeated blank lines are harmless.
            is_target = True
            continue
        if not is_target:
            continue
        fields = line.split('\t')
        if len(fields) < 3:
            raise ValueError('ERROR: malformed evaluation line: ' + repr(line))
        utt_id = fields[1].strip()
        label = fields[2].strip()
        if label not in known_labels:
            raise ValueError('ERROR: we can\'t find ' + repr(label))
        list_ret.append([utt_id, label])
        is_target = False
    return list_ret

    

In [None]:
def extract_labels(list_in_file, out_file):
    """Append the ``[id, label]`` rows of every evaluation file to a CSV.

    For each input file the first two lines are dropped as header, the rest is
    parsed with ``find_category``, and the resulting rows are written in
    sorted order. No header row is written to ``out_file``.

    Args:
        list_in_file: Iterable of evaluation file paths.
        out_file: CSV path that rows are appended to.

    Returns:
        None.
    """
    # newline='' lets the csv module control line endings itself, which avoids
    # blank rows on platforms that translate "\n" on write.
    with open(out_file, 'a', newline='') as f2:
        csv_writer = csv.writer(f2)
        for in_file in list_in_file:
            with open(in_file, 'r') as f1:
                lines = f1.readlines()
            list_ret = find_category(lines[2:])   # [2:] removes the head
            csv_writer.writerows(sorted(list_ret))  # sort based on first element

In [None]:
# Gather the emotion-evaluation files of all five sessions, skipping the
# sub-directories named in list_avoid_dir, then extract their labels.
list_files = []
list_avoid_dir = ['Attribute', 'Categorical', 'Self-evaluation']
for x in range(5):
    sess_name = "Session" + str(x + 1)
    path = 'data/IEMOCAP_full_release/' + sess_name + '/dialog/EmoEvaluation/'
    file_search(path, list_files, list_avoid_dir)
    list_files = sorted(list_files)
    print(sess_name + ", #sum files: " + str(len(list_files)))

# extract_labels appends to its output, so remove a file left over from an
# earlier run; otherwise re-running this cell duplicates every row.
if os.path.exists("processed_labels.csv"):
    os.remove("processed_labels.csv")
extract_labels(list_files, "processed_labels.csv")


In [None]:
# processed_labels.csv has no header line; header=None stops pandas from
# swallowing the first record as column names, and names= gives the frame
# the columns that later cells index by.
file = pd.read_csv("processed_labels.csv", header=None, names=['sessionID', 'label'])
file.to_csv("processed_labels_head.csv", index=False)


In [None]:
# NOTE(review): `plt` and `sns` are used here, but no matplotlib / seaborn
# import is visible in the notebook's import cell (the matplotlib line there
# is commented out) — both must be imported before this cell runs.

# Read the CSV that carries the header row so the 'label' column is
# guaranteed to exist, independent of how earlier cells built `file`.
labels_df = pd.read_csv("processed_labels_head.csv")

plt.title('Count of Emotions', size=16)
# Name the variable explicitly: a bare positional argument is interpreted
# differently across seaborn versions.
sns.countplot(x='label', data=labels_df)
plt.ylabel('Count', size=12)
plt.xlabel('Emotions', size=12)
sns.despine(top=True, right=True, left=False, bottom=False)
plt.show()

In [None]:
# Map the categorical labels to the four class ids the classifier is built
# for (num_labels = 4). The id order follows the tick labels used by the
# confusion-matrix cell: Anger, Happiness, Sadness, Neutral. Sadness gets its
# own id instead of sharing one with happiness; every other emotion is marked
# -1 so that it is dropped when the training documents are assembled.
LABEL_TO_ID = {
    'ang': 0,
    'hap': 1,
    'exc': 1,   # excitement is merged into happiness
    'sad': 2,
    'neu': 3,
    'fru': -1,
    'fea': -1,
    'sur': -1,
    'dis': -1,
    'oth': -1,
    'xxx': -1,
}

df1 = pd.read_csv('processed_labels_head.csv')
# Labels missing from the table become NaN in .map(); treat them as excluded.
df1['label'] = df1['label'].map(LABEL_TO_ID).fillna(-1).astype(int)


In [None]:
# Persist the numerically encoded labels, without the frame's index column.
df1.to_csv(path_or_buf="processed_digital_labels_head.csv", index=False)

In [None]:
# Join transcripts and numeric labels on the utterance id; the inner join
# keeps only utterances present in both files.
data1 = pd.read_csv("processed_trans_head.csv")
data2 = pd.read_csv("processed_digital_labels_head.csv")
translabels = pd.merge(data1, data2, on='sessionID', how='inner')
# index=False keeps the row index out of the file; otherwise it comes back as
# a spurious "Unnamed: 0" column when the CSV is read again.
translabels.to_csv("processed_trans_labels_head.csv", index=False)

In [None]:
# Build the sorted list of every file below each session's sentences/wav directory.
list_files = []
for x in range(5):
    sess_name = f"Session{x + 1}"
    path = f"data/IEMOCAP_full_release/{sess_name}/sentences/wav/"
    file_search(path, list_files)
    list_files = sorted(list_files)
    print(f"{sess_name}, #sum_file: {len(list_files)}")
    

In [None]:
# Load the merged transcript/label table and preview its first five rows.
df = pd.read_csv(filepath_or_buffer='processed_trans_labels_head.csv')
df.head(5)

In [None]:
# Number of rows in the merged table.
len(df.index)

In [None]:
# Turn every row whose label is not -1 into a {'text', 'label'} record.
docs = []
for text, label in zip(df['text'], df['label']):
    if label == -1:
        continue
    docs.append(dict(text=text, label=label))
        

In [None]:
# no_rows = len(list_files)
# index = 0
# sprectrogram_shape = []
# docs = []
# bookmark = 0
# extraLabel = 0 
# for file in list_files: 
#     if file.split('/')[-1].endswith('.wav'):
#         filename = file.split('/')[-1].strip('.wav')
#         label = df.loc[df['sessionID'] == filename]['label'].values[0]
#         text = df.loc[df['sessionID'] == filename]['text'].values[0]
#         if label != -1: 
#             input_batch = 

In [None]:
# Shuffle a copy of the records with a dedicated, seeded generator so the
# train/test split is identical on every run and re-running this cell does
# not reshuffle an already shuffled list. One shuffle already produces a
# uniformly random order; repeating it adds nothing.
SPLIT_SEED = 42
TRAIN_FRACTION = 0.8

shuffled_docs = list(docs)
random.Random(SPLIT_SEED).shuffle(shuffled_docs)

total_len = len(shuffled_docs)
train_len = int(TRAIN_FRACTION * total_len)
train_list = shuffled_docs[:train_len]
test_list = shuffled_docs[train_len:]
print('no of items for train', len(train_list))
print('no of items for test', len(test_list))

In [None]:
# Convert the label of every training record to a built-in int.
for i, sample in enumerate(train_list):
    sample['label'] = int(sample['label'])

In [None]:
# Load the TensorBoard notebook extension, then start TensorBoard with the
# current directory as its log directory.
%load_ext tensorboard
%tensorboard --logdir ./

In [None]:
from transformers import BertForSequenceClassification, AdamW, BertConfig
import torch.optim as optim

# Number of training epochs. The BERT authors recommend between 2 and 4.
NUM_EPOCHS = 4

# Pretrained "bert-base-uncased" with a sequence-classification head sized
# for four labels; attentions and hidden states are not returned.
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=4,
    output_attentions=False,
    output_hidden_states=False,
)
print(model)
params = list(model.named_parameters())

# TensorBoard event writer; scalars are logged under ./results.
writer = SummaryWriter(log_dir='results')
total_steps = len(train_list) * NUM_EPOCHS

# Optimiser, loss object and step-wise learning-rate schedule
# (the rate is multiplied by 0.1 every 30 scheduler steps).
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)


In [None]:
from tqdm import tqdm

# Fine-tune the classifier one utterance at a time and log loss/accuracy to
# TensorBoard every 10 optimisation steps.
start_epoch = 0
total_steps = 1
NUM_EPOCHS = 4
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model.train()
# Put the model on the same device the inputs are sent to; a hard-coded
# 'cuda' fails on machines where `device` resolved to the CPU.
model.to(device)
for epoch in tqdm(range(start_epoch, NUM_EPOCHS)):
    print("*"*80)
    print("Epochs:", epoch)
    print("*"*80)
    random.shuffle(train_list)
    for every_trainlist in train_list:
        text = every_trainlist['text']
        label1 = torch.tensor([every_trainlist['label']]).to(device)
        # Truncate to 512 tokens, the longest input bert-base-uncased accepts.
        token_ids = tokenizer.encode(
            text, add_special_tokens=True, truncation=True, max_length=512
        )
        input_ids = torch.tensor(token_ids).unsqueeze(0).to(device)

        optimizer.zero_grad()
        output = model(input_ids, labels=label1)
        loss, logits = output[0], output[1]
        loss.backward()
        optimizer.step()

        if total_steps % 10 == 0:
            with torch.no_grad():
                _, preds = torch.max(logits, 1)
                accuracy = torch.sum(preds == label1)
                writer.add_scalar('loss', loss.item(), total_steps)
                writer.add_scalar('accuracy', accuracy.item(), total_steps)
        total_steps += 1
    # Advance the schedule only after this epoch's optimizer steps, the order
    # PyTorch expects (scheduler.step() must follow optimizer.step()).
    lr_scheduler.step()

In [None]:
# Run the fine-tuned model over the held-out records on the CPU and collect
# ground-truth ids (y) and predicted ids (y_pred).
y = []
y_pred = []
model.to('cpu')
model.eval()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
for every_test_list in test_list:
    label1 = torch.tensor([int(every_test_list['label'])])
    text = every_test_list['text']
    # Truncate to 512 tokens, the longest input bert-base-uncased accepts.
    token_ids = tokenizer.encode(
        text, add_special_tokens=True, truncation=True, max_length=512
    )
    input_ids = torch.tensor(token_ids).unsqueeze(0)
    with torch.no_grad():
        # Index the output the same way the training loop does. In
        # transformers releases where the model returns a dict-like output
        # object, tuple-unpacking it would yield its keys, not the tensors.
        output = model(input_ids, labels=label1)
        logits = output[1]
        _, preds = torch.max(logits, 1)
    # Record this record's own label tensor (label1), not a leftover variable
    # from an earlier cell.
    y.append(label1.numpy()[0])
    y_pred.append(preds.numpy()[0])
    

In [None]:
# `y` is the list of ground-truth ids filled by the evaluation loop.
# labels= pins the matrix to the four class ids, so it is always 4x4 even if
# a class never occurs in the test split.
cm = confusion_matrix(y, y_pred, labels=[0, 1, 2, 3])
print(cm)

In [None]:
# NOTE(review): `plt` and `sns` are used here, but no matplotlib / seaborn
# import is visible in the notebook's import cell.

# Express every row of the confusion matrix as percentages of that row's total.
cmn = (cm.astype('float') / cm.sum(axis=1)[:, np.newaxis])*100

# Draw the percentages as an annotated heatmap with the class names on both axes.
ax = plt.subplots(figsize=(8, 5.5))[1]
sns.heatmap(
    cmn,
    ax=ax,
    cmap='flare',
    annot=True,
    fmt='.2f',
    annot_kws={'size': 16},
    square=True,
    linecolor='black',
    linewidths=0.75,
)

ax.set_xlabel('Predicted', fontsize=18, fontweight='bold')
ax.xaxis.set_label_position('bottom')
ax.xaxis.set_ticklabels(["Anger", "Happiness", "Sadness", "Neutral"], fontsize=16)

ax.set_ylabel('Ground Truth', fontsize=18, fontweight='bold')
ax.yaxis.set_ticklabels(["Anger", "Happiness", "Sadness", "Neutral"], fontsize=16)

plt.tight_layout()
plt.show()

In [None]:
# classification_report returns one multi-line string; print it so the table
# is rendered line by line instead of as an escaped repr.
print(classification_report(y, y_pred))

In [None]:
# torch.save(model, '/kaggle/working/model_text.pt')