In [2]:
import os, sys, random, re, collections, string
import csv
import heapq
from collections import Counter

import matplotlib
import numpy as np
import pandas as pd
import sklearn.metrics
import sklearn.model_selection
import torch
import torch.nn as nn
import tqdm
import transformers
from datasets import Dataset
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from transformers import AutoModelForCausalLM
from transformers import BertForSequenceClassification, XLNetForSequenceClassification
from transformers import BertModel
from transformers import BertTokenizer
from transformers import pipeline
from transformers import Trainer
from transformers import TrainingArguments
from transformers import XLNetTokenizer


# Tokenizers for the two pretrained checkpoints that are fine-tuned further down.
tokenizer_bert = BertTokenizer.from_pretrained("bert-base-uncased")
tokenizer_xlnet = XLNetTokenizer.from_pretrained("xlnet-base-cased")

# Pick the best available accelerator instead of hard-coding Apple's MPS
# backend, so the notebook also runs on CUDA machines and CPU-only hosts.
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [3]:
# Text-classification pipeline used below to assign one emotion label per song.
# (Judging by the checkpoint name this is a GoEmotions-distilled DistilBERT — confirm on the model card.)
emotion_pipeline = pipeline("text-classification", model = "joeddav/distilbert-base-uncased-go-emotions-student",device = device)
# Lyrics corpus; later cells read its 'train' split and its 'lyrics' column.
dataset = load_dataset("chloeliu/lyrics")
import nltk
from nltk.tokenize import word_tokenize

# Make sure the 'punkt' tokenizer data is present before word_tokenize is called.
nltk.download('punkt')
def preprocess(text):
    """Split raw lyrics into lowercase, purely alphabetic word tokens.

    Args:
        text: the lyrics of one song as a single string.

    Returns:
        A list of lowercased tokens; any token that is not made up solely of
        letters (numbers, punctuation, pieces containing apostrophes, ...)
        is dropped.
    """
    lowered_pieces = (piece.lower() for piece in word_tokenize(text))
    return [piece for piece in lowered_pieces if piece.isalpha()]
# Replace every raw 'lyrics' string with its cleaned token list.
# NOTE(review): this rebinds `dataset`, so running the cell a second time would pass token lists (not strings) to preprocess().
dataset = dataset.map(lambda x: {"lyrics": preprocess(x['lyrics'])})


[nltk_data] Downloading package punkt to
[nltk_data]     /Users/dhruvcharan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
def classify_emotions_batch(batch):
    """Label every song in a batch with the emotion returned by `emotion_pipeline`.

    Args:
        batch: mapping with a 'lyrics' entry holding one token list per song.

    Returns:
        {'emotion': [...]} with one label per song; the string 'None' is used
        when the pipeline yields a falsy result for a song.
    """
    # Re-join the tokens and keep only the first 512 *characters* of each song.
    clipped_texts = [' '.join(tokens)[:512] for tokens in batch['lyrics']]
    outputs = emotion_pipeline(clipped_texts)
    return {'emotion': [output['label'] if output else 'None' for output in outputs]}

# Add an 'emotion' column to every split by running the classifier over the songs in batches of 16.
emotion_labeled_dataset = dataset.map(classify_emotions_batch, batched=True,batch_size=16)

In [5]:
# Persist the labelled dataset so the labelling pass above does not have to be repeated.
emotion_labeled_dataset.save_to_disk('emotion_labeled_dataset-fixed')

Saving the dataset (0/1 shards):   0%|          | 0/28372 [00:00<?, ? examples/s]

In [6]:
def get_label_dict(dataset):
    """Map each distinct emotion name to an integer id.

    Args:
        dataset: anything indexable by 'emotion' that yields the label names.

    Returns:
        dict from label name to id; ids follow the alphabetical order of the
        names and start at 0.
    """
    ordered_names = sorted(set(dataset['emotion']))
    return dict(zip(ordered_names, range(len(ordered_names))))

# Label-name -> id mapping built from the 'train' split; the tokenisation helpers below read it as a global.
label_dict = get_label_dict(emotion_labeled_dataset['train'])



In [7]:
def preprocess_function(examples, tokenizer):
    """Tokenise a batch of songs and attach integer class labels.

    Args:
        examples: batch mapping with 'lyrics' (one token list per song) and
            'emotion' (one label name per song).
        tokenizer: callable applied to the re-joined lyric strings.

    Returns:
        Whatever the tokenizer returned, with an extra 'labels' entry holding
        the ids looked up in the module-level `label_dict`.
    """
    texts = list(map(' '.join, examples['lyrics']))
    # Every sequence is truncated and padded to exactly 512 positions.
    encoded = tokenizer(texts, truncation=True, max_length=512, padding="max_length")
    encoded['labels'] = [label_dict[name] for name in examples['emotion']]
    return encoded

def tokenize_function(examples):
    """Run `preprocess_function` on a batch using the BERT tokenizer."""
    return preprocess_function(examples, tokenizer=tokenizer_bert)
def tokenize_function_xlnet(examples):
    """Run `preprocess_function` on a batch using the XLNet tokenizer."""
    return preprocess_function(examples, tokenizer=tokenizer_xlnet)

In [8]:
# Hold out 20% of the labelled songs for evaluation. The fixed seed keeps the
# split (and every metric computed on it) identical across kernel restarts;
# without it each run trains and evaluates on different songs.
# NOTE: the variable name is kept for compatibility, but it shadows the
# sklearn `train_test_split` function imported at the top of the notebook.
train_test_split = emotion_labeled_dataset["train"].train_test_split(test_size=0.2, seed=42)

# Tokenise the same two splits once per model family so BERT and XLNet are
# trained and evaluated on identical songs.
train_dataset = train_test_split['train'].map(tokenize_function, batched=True)
test_dataset = train_test_split['test'].map(tokenize_function, batched=True)
train_dataset_xlnet = train_test_split['train'].map(tokenize_function_xlnet, batched=True)
test_dataset_xlnet = train_test_split['test'].map(tokenize_function_xlnet, batched=True)

# Size of the classification head shared by all models below.
num_labels = len(label_dict)

Map:   0%|          | 0/22697 [00:00<?, ? examples/s]

Map:   0%|          | 0/5675 [00:00<?, ? examples/s]

Map:   0%|          | 0/22697 [00:00<?, ? examples/s]

Map:   0%|          | 0/5675 [00:00<?, ? examples/s]

In [9]:
import matplotlib.pyplot as plt
from transformers import TrainerCallback

class MetricsLoggingCallback(TrainerCallback):
    """Collect loss/accuracy values from the Trainer log history and plot them after each epoch.

    The metric lists are rebuilt from `state.log_history` on every epoch end.
    The history is cumulative, so appending to the lists (as was done before)
    duplicated all earlier points each epoch and also carried one trainer's
    values into the next when the same instance was reused.

    Attributes:
        train_losses: values logged under 'loss'.
        eval_losses: values logged under 'eval_loss'.
        eval_accuracy: values logged under 'eval_accuracy'.
        train_steps, eval_loss_steps, eval_accuracy_steps: the 'step' stored
            with each value above (the record's position in the history is
            used when a record carries no 'step').
    """

    def __init__(self):
        super().__init__()
        self.train_losses = []
        self.eval_losses = []
        self.eval_accuracy = []
        self.train_steps = []
        self.eval_loss_steps = []
        self.eval_accuracy_steps = []

    def on_epoch_end(self, args, state, control, **kwargs):
        """Refresh the metric lists from `state.log_history` and redraw the plots."""
        if not state.log_history:
            return
        self._collect(state.log_history)
        self.plot_metrics()

    def _collect(self, log_history):
        """Rebuild every metric series from scratch out of the full log history."""
        series = {'loss': ([], []), 'eval_loss': ([], []), 'eval_accuracy': ([], [])}
        for position, log in enumerate(log_history):
            step = log.get('step', position)
            for key, (steps, values) in series.items():
                if key in log:
                    steps.append(step)
                    values.append(log[key])
        self.train_steps, self.train_losses = series['loss']
        self.eval_loss_steps, self.eval_losses = series['eval_loss']
        self.eval_accuracy_steps, self.eval_accuracy = series['eval_accuracy']

    @staticmethod
    def _x_values(steps, values):
        """Use the recorded steps when they line up with the values, else plain indices."""
        return steps if len(steps) == len(values) else range(len(values))

    def plot_metrics(self):
        """Draw training/validation loss and validation accuracy side by side."""
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(self._x_values(self.train_steps, self.train_losses),
                 self.train_losses, marker='o', label='Training Loss')
        plt.plot(self._x_values(self.eval_loss_steps, self.eval_losses),
                 self.eval_losses, marker='o', label='Validation Loss')
        plt.title('Training & Validation Loss')
        # Points are logging/evaluation events, not epochs, so label the axis accordingly.
        plt.xlabel('Training step')
        plt.ylabel('Loss')
        plt.legend()

        plt.subplot(1, 2, 2)
        plt.plot(self._x_values(self.eval_accuracy_steps, self.eval_accuracy),
                 self.eval_accuracy, marker='o', label='Validation Accuracy')
        plt.title('Validation Accuracy')
        plt.xlabel('Training step')
        plt.ylabel('Accuracy')
        plt.legend()

        plt.show()

# Single callback instance; every Trainer created below receives this same object.
metrics_logging_callback = MetricsLoggingCallback()

In [12]:

from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    """Compute classification accuracy for the Trainer.

    Args:
        eval_pred: pair of (logits, labels) as handed over by the Trainer.

    Returns:
        {"accuracy": fraction of examples whose highest-scoring class equals the label}.
    """
    scores, gold = eval_pred
    predicted_ids = np.argmax(scores, axis=-1)
    return dict(accuracy=accuracy_score(gold, predicted_ids))

# Pretrained BERT encoder with a new classification head sized to the emotion label set.
model_bert = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=num_labels,
)
model_bert.to(device)

training_args = TrainingArguments(
    # where checkpoints and logs are written
    output_dir='./results',
    logging_dir='./logs',
    # optimisation schedule
    num_train_epochs=3,
    warmup_steps=500,
    weight_decay=0.01,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    # evaluate and log every 500 steps; checkpoint once per epoch
    evaluation_strategy="steps",
    eval_steps=500,
    logging_strategy="steps",
    logging_steps=500,
    save_strategy='epoch',
)

# Evaluation runs on the held-out split and reports accuracy via compute_metrics.
trainer_bert = Trainer(
    model=model_bert,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
    callbacks=[metrics_logging_callback],
)

trainer_bert.train()


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


In [None]:
# Evaluate the fine-tuned BERT model on the eval_dataset given to the Trainer (the held-out split).
trainer_bert.evaluate()

In [None]:
# Raw BERT prediction output for the held-out split; decoded into label names in a later cell.
predictions = trainer_bert.predict(test_dataset)


In [None]:
# Show the label-name -> id mapping used for the 'labels' column.
print(label_dict)

In [16]:
def get_reverse_label_dict(label_dict):
    """Invert a label-name -> id mapping.

    Args:
        label_dict: dict from label name to integer id.

    Returns:
        dict from id back to label name.
    """
    return dict(zip(label_dict.values(), label_dict.keys()))

# id -> label-name lookup used to turn predicted class ids back into emotion names.
reverse_label_dict = get_reverse_label_dict(label_dict)



In [None]:

# Run BERT over the training split as well; the held-out split was already
# predicted in an earlier cell (`predictions`).
predictions_train = trainer_bert.predict(train_dataset)

# Highest-scoring class per example for each split.
predicted_label_ids = np.argmax(predictions.predictions, axis=-1)
predicted_label_ids_train = np.argmax(predictions_train.predictions, axis=-1)

# Translate class ids back into emotion names.
predicted_labels = [reverse_label_dict[label_id] for label_id in predicted_label_ids]
predicted_labels_train = [reverse_label_dict[label_id] for label_id in predicted_label_ids_train]



In [None]:
# Convert the BERT training split to a pandas DataFrame and attach the decoded predictions.
# NOTE(review): this rebinds `train_dataset` to a DataFrame, so the cell cannot be re-run
# and any later Trainer call using `train_dataset` would receive a DataFrame — confirm this is intended.
train_dataset = train_dataset.to_pandas()
train_dataset['predicted_label'] = predicted_labels_train


In [None]:
# Export the training split together with its BERT predictions.
train_dataset.to_csv('train_dataset_bert.csv')

In [None]:
# Convert the held-out split to a pandas DataFrame and attach the decoded predictions.
# NOTE(review): rebinding `test_dataset` to a DataFrame makes this cell non-rerunnable — confirm this is intended.
test_dataset = test_dataset.to_pandas()
test_dataset['predicted_label'] = predicted_labels

In [None]:
# Export the held-out split together with its BERT predictions.
test_dataset.to_csv('test_dataset_bert.csv')

In [14]:
# Pretrained XLNet encoder with a new classification head sized to the emotion label set.
model_xlnet = XLNetForSequenceClassification.from_pretrained("xlnet-base-cased", num_labels=num_labels)
model_xlnet.to(device)
training_args_xlnet = TrainingArguments(
    output_dir='./results_xlnet',
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    logging_strategy="steps",
    logging_steps=500,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs_xlnet',
    # Evaluation happens once per epoch; the former eval_steps=500 had no
    # effect under this strategy and was dropped.
    evaluation_strategy="epoch"
)

# Give this run its own callback instance: the shared `metrics_logging_callback`
# already holds the BERT run's values, which would otherwise be drawn into the
# XLNet curves.
metrics_logging_callback_xlnet = MetricsLoggingCallback()

trainer_xlnet = Trainer(
    model=model_xlnet,
    args=training_args_xlnet,
    train_dataset=train_dataset_xlnet,
    eval_dataset=test_dataset_xlnet,
    compute_metrics = compute_metrics,
    callbacks=[metrics_logging_callback_xlnet]
)

trainer_xlnet.train()


Some weights of XLNetForSequenceClassification were not initialized from the model checkpoint at xlnet-base-cased and are newly initialized: ['logits_proj.bias', 'logits_proj.weight', 'sequence_summary.summary.bias', 'sequence_summary.summary.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


In [17]:
# --- held-out split: predict, take the best class per row, decode to names ---
predictions_xlnet = trainer_xlnet.predict(test_dataset_xlnet)
predicted_label_ids_xlnet = np.argmax(predictions_xlnet.predictions, axis=-1)
predicted_labels_xlnet = [reverse_label_dict[label_id] for label_id in predicted_label_ids_xlnet]

# --- training split: same three steps ---
predictions_train_xlnet = trainer_xlnet.predict(train_dataset_xlnet)
predicted_label_ids_train_xlnet = np.argmax(predictions_train_xlnet.predictions, axis=-1)
predicted_labels_train_xlnet = [reverse_label_dict[label_id] for label_id in predicted_label_ids_train_xlnet]

# Export both splits with their predicted emotion as an extra column
# (training split first, then the held-out split).
train_dataset_xlnet_df = train_dataset_xlnet.to_pandas().assign(
    predicted_label=predicted_labels_train_xlnet
)
train_dataset_xlnet_df.to_csv('train_dataset_xlnet.csv')
test_dataset_xlnet_df = test_dataset_xlnet.to_pandas().assign(
    predicted_label=predicted_labels_xlnet
)
test_dataset_xlnet_df.to_csv('test_dataset_xlnet.csv')

  0%|          | 0/355 [00:00<?, ?it/s]

  0%|          | 0/1419 [00:00<?, ?it/s]

In [None]:
class BertWithAdditionalFeatures(nn.Module):
    """BERT classifier that also consumes a vector of side features per example.

    The pooled BERT output is concatenated with a 50-dimensional linear
    projection of the side features, passed through dropout and fed to a
    linear classification layer.

    Args:
        num_labels: number of output classes.
        num_additional_features: width of the side-feature vector.
    """

    def __init__(self, num_labels, num_additional_features):
        super().__init__()
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.dropout = nn.Dropout(0.1)
        self.feature_layer = nn.Linear(num_additional_features, 50)
        self.classifier = nn.Linear(self.bert.config.hidden_size + 50, num_labels)

    def forward(self, input_ids, attention_mask, additional_features, labels=None):
        """Compute class logits and, when labels are given, the training loss.

        Args:
            input_ids: token ids passed to BERT.
            attention_mask: attention mask passed to BERT.
            additional_features: one side-feature row per example; any numeric
                or boolean dtype is accepted.
            labels: optional class ids, one per example.

        Returns:
            The logits tensor when `labels` is None (unchanged behaviour);
            otherwise a dict {"loss": cross-entropy loss, "logits": logits},
            the form the Hugging Face Trainer needs to obtain a loss.
        """
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        # One-hot features often arrive as int/bool tensors, which nn.Linear
        # rejects; cast them to the projection layer's dtype.
        additional_features = additional_features.to(self.feature_layer.weight.dtype)
        feature_output = self.feature_layer(additional_features)
        concatenated_output = torch.cat((pooled_output, feature_output), dim=1)
        concatenated_output = self.dropout(concatenated_output)
        logits = self.classifier(concatenated_output)
        if labels is None:
            return logits
        loss = nn.functional.cross_entropy(logits, labels.view(-1))
        return {"loss": loss, "logits": logits}
    
def preprocess_function_FA(examples, tokenizer):
    """Tokenise a batch of songs and carry labels plus side features along.

    Args:
        examples: batch mapping with 'lyrics' (one token list per song),
            'emotion' (one label name per song) and 'additional_features'.
        tokenizer: callable applied to the re-joined lyric strings.

    Returns:
        The tokenizer output extended with 'labels' (ids looked up in the
        module-level `label_dict`) and the untouched 'additional_features'.
    """
    texts = list(map(' '.join, examples['lyrics']))
    encoded = tokenizer(texts, truncation=True, max_length=512, padding="max_length")
    encoded['labels'] = [label_dict[name] for name in examples['emotion']]
    encoded['additional_features'] = examples['additional_features']
    return encoded
def tokenize_function_FA(examples):
    """Run `preprocess_function_FA` on a batch using the BERT tokenizer."""
    return preprocess_function_FA(examples, tokenizer=tokenizer_bert)

In [None]:
# Build the feature-augmented dataset: one-hot encode the categorical metadata
# and store it as a single 'additional_features' vector per song, which is the
# column preprocess_function_FA reads.

# `emotion_labeled_dataset` is indexed by split name elsewhere in the notebook,
# so convert the 'train' split (the only one used) rather than the container.
emotion_labeled_dataset_df = emotion_labeled_dataset['train'].to_pandas()

one_hot_encoded_features = ['artist_name','genre','topic']
# Encode all categorical columns in one call; the source column name is used
# as prefix so equal values in different columns cannot collide.
# NOTE(review): 'artist_name' may have very many distinct values, making the
# vectors wide and memory-hungry — confirm it should be encoded this way.
one_hot_encoded = pd.get_dummies(
    emotion_labeled_dataset_df[one_hot_encoded_features],
    columns=one_hot_encoded_features,
    dtype='float32',
)

# Names of the encoded columns, in the order they appear inside each vector.
# Only the one-hot columns are used: the other dataset columns are not known
# to be numeric — TODO confirm whether any of them should be added.
additional_features = list(one_hot_encoded.columns)

emotion_labeled_dataset_df = emotion_labeled_dataset_df.drop(columns=one_hot_encoded_features)
emotion_labeled_dataset_df['additional_features'] = one_hot_encoded.to_numpy().tolist()

# from_pandas returns a single Dataset (no split names), so split it directly.
emotion_labeled_dataset_fa = Dataset.from_pandas(emotion_labeled_dataset_df, preserve_index=False)
# Fixed seed keeps the held-out set reproducible across runs.
train_test_split_fa = emotion_labeled_dataset_fa.train_test_split(test_size=0.2, seed=42)
train_dataset_fa = train_test_split_fa['train'].map(tokenize_function_FA, batched=True)
test_dataset_fa = train_test_split_fa['test'].map(tokenize_function_FA, batched=True)





In [None]:
# `model_fa` was never created before being handed to the Trainer. Build it
# with one input per additional feature and the same label set as the other models.
model_fa = BertWithAdditionalFeatures(
    num_labels=num_labels,
    num_additional_features=len(additional_features),
)
model_fa.to(device)

training_args_fa = TrainingArguments(
    output_dir='./results_fanet',
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    logging_strategy="steps",
    logging_steps=500,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs_fa',
    # Evaluation happens once per epoch; the former eval_steps=500 had no
    # effect under this strategy and was dropped.
    evaluation_strategy="epoch"
)

# Separate callback instance so this run's curves are not mixed with the
# values the shared `metrics_logging_callback` collected from earlier runs.
metrics_logging_callback_fa = MetricsLoggingCallback()

trainer_fa = Trainer(
    model=model_fa,
    args=training_args_fa,
    train_dataset=train_dataset_fa,
    eval_dataset=test_dataset_fa,
    compute_metrics=compute_metrics,
    callbacks=[metrics_logging_callback_fa]
)

trainer_fa.train()
