In [None]:
# Install the third-party packages this notebook imports further down.
!pip install datasets scikit-multilearn transformers
from google.colab import drive

# Mount Google Drive and change into the notebook folder, so the relative CSV
# paths used later ('emotion.csv', 'go_emotions.csv') resolve inside Drive.
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Colab\ Notebooks/

# Names of the emotion columns used throughout the notebook.
target_labels = ["happiness", "surprise", "disgust", "anger", "neutral"]


# Reformat and Save Datasets

## Process and save the emotion dataset as CSV for manual labelling

In [None]:
from datasets import load_dataset
# One 0/1 column per name in this list is added to the emotion dataset below.
target_labels = ["happiness", "surprise", "disgust", "anger", "neutral"]

def map_data_to_column(example):
  """Switch on the target column that matches the row's integer ``label``.

  A label of 1 sets ``happiness`` to 1, 3 sets ``anger`` to 1 and 5 sets
  ``surprise`` to 1. Any other label leaves the example unchanged.

  Args:
    example: mutable mapping holding a ``label`` entry.

  Returns:
    The same ``example`` object, updated in place.
  """
  # Compare by value (==), not identity (is): identity only happens to work
  # for CPython's cached small ints and fails for int subclasses or NumPy
  # integer scalars that are equal to, but not the same object as, the literal.
  label = example['label']
  if label == 1:
    example['happiness'] = 1
  elif label == 3:
    example['anger'] = 1
  elif label == 5:
    example['surprise'] = 1
  return example

# Load the "emotion" corpus and keep the first 500 rows of its train split.
datasets_emotion = load_dataset("emotion")
emotion_dataset = datasets_emotion.get("train").select(range(500))

# Every target column starts at 0; map_data_to_column then sets the matching
# column to 1 for the labels it recognises.
dummy_values = [0] * len(emotion_dataset)
for label in target_labels:
  emotion_dataset = emotion_dataset.add_column(label, dummy_values)

# Fill the target columns from the integer label, then drop that label column.
emotion_dataset = emotion_dataset.map(map_data_to_column).remove_columns(['label'])

# Write the result to CSV.
emotion_dataset_csv_path = 'emotion.csv'
emotion_dataset.to_csv(emotion_dataset_csv_path, index=False)

## Process and save the GoEmotions dataset

In [5]:
from datasets import load_dataset, concatenate_datasets
def reformat_go_emotions():
  """Build a GoEmotions dataset limited to the rows and columns for ``target_labels``.

  Loads the "raw" configuration of ``go_emotions``, renames its ``joy`` column
  to ``happiness``, keeps only the rows whose ``target_labels`` columns sum to
  at least 1, concatenates every split, and drops all columns other than the
  target labels and ``text``.

  Returns:
    The filtered, concatenated dataset.
  """
  datasets_go_emotions = load_dataset("go_emotions", "raw")
  dataset_list = []

  for go_dataset in datasets_go_emotions.values():
    go_dataset = go_dataset.rename_column("joy", "happiness")
    # Read only the label columns instead of decoding every full row: the
    # row filter needs nothing but these values.
    label_columns = [go_dataset[label] for label in target_labels]
    qualifying_data_index = [
      go_index
      for go_index, row_labels in enumerate(zip(*label_columns))
      if sum(row_labels) >= 1
    ]
    dataset_list.append(go_dataset.select(qualifying_data_index))
  filtered_dataset = concatenate_datasets(dataset_list)

  features_to_keep = target_labels + ['text']
  features_to_drop = [
    feature for feature in filtered_dataset.features
    if feature not in features_to_keep
  ]
  # Drop all unwanted columns in one call rather than rebuilding the dataset
  # once per column.
  if features_to_drop:
    filtered_dataset = filtered_dataset.remove_columns(features_to_drop)
  return filtered_dataset

In [None]:
# Build the filtered GoEmotions dataset and write it to a CSV file
# (relative path, so it lands in the current working directory).
go_emotions_dataset = reformat_go_emotions()
go_emotions_dataset_csv_path = "go_emotions.csv"
go_emotions_dataset.to_csv(go_emotions_dataset_csv_path, index=False)  

# Preprocess Dataset

## Import datasets from the file system

In [None]:
from datasets import load_dataset

# CSV files written by the export cells above.
# NOTE(review): emotion.csv is presumably the manually relabelled version by this point; confirm.
go_emotions_dataset_csv_path = "go_emotions.csv"
labelled_emotion_dataset_csv_path = "emotion.csv"

# The csv loader exposes the file as a 'train' split; only the first 2000
# GoEmotions rows are kept, while the emotion file is used in full.
go_emotions_dataset = load_dataset("csv", data_files=go_emotions_dataset_csv_path)['train'].select(range(2000))
emotion_dataset = load_dataset("csv", data_files=labelled_emotion_dataset_csv_path)['train']

In [None]:
from datasets import concatenate_datasets
from skmultilearn.model_selection import iterative_train_test_split
import numpy as np
from datasets import Dataset

def organize_labels_into_single_feature(example):
  """Collect the per-emotion columns into one ``label`` list.

  The list holds the example's value for each name in ``target_labels``, in
  that order, and is stored under the ``label`` key.

  Returns:
    The same ``example`` object, with ``label`` added.
  """
  example['label'] = [example[name] for name in target_labels]
  return example

# Merge both corpora, fold the per-emotion columns into a single 'label'
# feature, drop the now-redundant columns, and shuffle with a fixed seed.
dataset = (
  concatenate_datasets([go_emotions_dataset, emotion_dataset])
  .map(organize_labels_into_single_feature)
  .remove_columns(target_labels)
  .shuffle(seed=42)
)

# Wrap each text in its own one-element row so the feature array is 2-D.
dataset_text = [[text] for text in dataset['text']]

# Hold out 30% of the examples for validation.
x_train, y_train, x_val, y_val = iterative_train_test_split(
  np.array(dataset_text),
  np.array(dataset['label']),
  test_size=0.3,
)

print(x_train[0])


In [None]:
# Install the packages used by the model cells below (transformers was already requested by the first cell).
!pip3 install transformers torch

# BERT Model

## Tokenizer

In [None]:
from transformers import BertTokenizer
# Checkpoint name (a string, not a model object); reused when the classifier is loaded.
model = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(model)
# Smoke test: tokenize one sentence and print the resulting encoding.
sample = "hello how are you"
bert_input = tokenizer(sample, return_tensors="pt")
print(bert_input)

## Model

In [None]:
from torch import nn 
from transformers import BertModel, BertForSequenceClassification,TrainingArguments, Trainer
# Label names in output-index order.
labels = ["happiness", "surprise", "disgust", "anger", "neutral"]
# Index <-> name lookups handed to the model configuration.
id2label = dict(enumerate(labels))
label2id = {label: idx for idx, label in id2label.items()}
# num_labels is derived from the list so the classification head cannot drift
# out of sync with the label names if the list is edited.
classifier = BertForSequenceClassification.from_pretrained(
  model,
  num_labels=len(labels),
  problem_type="multi_label_classification",
  id2label=id2label,
  label2id=label2id,
)

## Preprocess Data

In [None]:
from datasets import load_dataset, Dataset, ClassLabel, DatasetDict
def _rows_to_records(text_rows, label_rows):
  """Turn parallel text/label arrays into one dict per example.

  Each record stores the first element of the text row under "text", plus one
  entry per name in ``labels`` taken positionally from the label row.
  """
  records = []
  for text_row, label_row in zip(text_rows, label_rows):
    record = {"text": text_row[0]}
    for position, name in enumerate(labels):
      record[name] = label_row[position]
    records.append(record)
  return records


train = _rows_to_records(x_train, y_train)
# Named eval_records rather than eval so the builtin eval() is not shadowed
# for the rest of the notebook.
eval_records = _rows_to_records(x_val, y_val)
dataset_ = DatasetDict({"train": Dataset.from_list(train),
                        "eval": Dataset.from_list(eval_records)})

print(dataset_["train"][0])
def preprocess_data(examples):
  """Tokenize a batch of texts and attach a float multi-label target per text.

  Args:
    examples: batch mapping with a ``text`` column and one column per name
      in ``labels``.

  Returns:
    The tokenizer output (truncated and padded to 128 tokens) with an extra
    ``labels`` entry: one list of floats per text, ordered like ``labels``.
  """
  texts = examples['text']

  encoded = tokenizer(texts, truncation=True, return_tensors="pt", padding="max_length", max_length=128)
  # Keep only the label columns of the batch, keyed by label name.
  label_columns = {name: examples[name] for name in examples.keys() if name in labels}
  # One row per text, one column per label; np.zeros yields floats.
  targets = np.zeros((len(texts), len(labels)))
  for column, name in enumerate(labels):
    targets[:, column] = label_columns[name]
  encoded["labels"] = targets.tolist()
  return encoded

# Tokenize both splits in batches; the original columns are dropped so only
# what preprocess_data returns remains.
encode_set = dataset_.map(preprocess_data, batched=True, remove_columns=dataset_['train'].column_names)
# Have indexing return torch tensors.
encode_set.set_format("torch")
train_set = encode_set["train"]
eval_set = encode_set["eval"]

In [None]:
# Show the encoded DatasetDict summary.
print(encode_set)

## Fine-tune model

In [13]:
# Sanity check: print both label mappings, then decode which labels are
# switched on (value 1.0) for the first training example.
print(id2label)
print(label2id)
print([id2label[idx] for idx, label in enumerate(train_set[0]["labels"]) if label == 1.0])

{0: 'happiness', 1: 'surprise', 2: 'disgust', 3: 'anger', 4: 'neutral'}
{'happiness': 0, 'surprise': 1, 'disgust': 2, 'anger': 3, 'neutral': 4}
['anger']


In [None]:
import torch
# Training hyper-parameters, passed to TrainingArguments below.
batch = 16  # per-device batch size for both training and evaluation
wd = 1e-5  # weight decay
lr =1e-5  # learning rate
def compute_metrics(eval_pred):
  """Element-wise accuracy over every (example, label) cell.

  Args:
    eval_pred: pair ``(logits, labels)``; both are converted with
      ``torch.tensor``.

  Returns:
    ``{"accuracy": value}``, where ``value`` is the fraction of cells whose
    thresholded prediction (sigmoid > 0.5) equals the reference label.
  """
  logits, references = eval_pred
  total_cells = logits.shape[0] * logits.shape[1]
  probabilities = torch.sigmoid(torch.tensor(logits))
  predicted = probabilities > 0.5
  hits = (predicted == torch.tensor(references)).sum().item()
  return {"accuracy": hits / total_cells}
# Training configuration: evaluate, log and checkpoint once per epoch for 5
# epochs, writing into the "ece1786" output directory.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` and deprecate Trainer's `tokenizer` argument; confirm against
# the installed version, since the pip installs above are unpinned.
arg =  TrainingArguments(output_dir="ece1786", 
                         evaluation_strategy="epoch",
                          num_train_epochs=5,
                          learning_rate=lr,
                          weight_decay=wd,
                          per_device_train_batch_size=batch,
                          per_device_eval_batch_size=batch,
                          logging_strategy="epoch",
                          save_strategy="epoch"
                          )

# Fine-tune the classifier on the encoded train split, scoring the eval split
# with compute_metrics.
trainer = Trainer(model=classifier,
                  args=arg,
                  train_dataset=train_set,
                  eval_dataset=eval_set,
                  compute_metrics=compute_metrics,
                  tokenizer=tokenizer)
result = trainer.train()