In [None]:
from transformers import GPT2Tokenizer, GPT2Model, GPT2ForSequenceClassification, GPT2Config
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from torch.optim import Adam
from torch.optim import lr_scheduler
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.cuda.amp import autocast, GradScaler


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that the
# checkpoint path under '/content/drive/My Drive/...' is reachable.
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime — confirm before running this notebook elsewhere.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# File on the mounted Drive that the training loop passes to torch.save when
# it writes its periodic checkpoint.
checkpoint_path = '/content/drive/My Drive/ece1786_project/model_checkpoint_ForSC.pth'
# Disabled: restoring model weights from a previously saved checkpoint.
# load_path = '/content/drive/My Drive/ece1786_project/model_checkpoint_stemmed1.pth'
# state = torch.load(load_path)['model_state_dict']


In [None]:
# Identifier of the pretrained checkpoint; reused later for the config and the
# classification model so all three stay in sync.
model_name = "gpt2"
# Load the tokenizer that belongs to that checkpoint.
tokenizer = GPT2Tokenizer.from_pretrained(model_name)

In [None]:
# Ask PyTorch to release cached, currently unused CUDA memory (useful when this
# notebook is re-run in a kernel that already held a model on the GPU).
torch.cuda.empty_cache()

In [None]:
import pandas as pd

# Tunable constants for the sub-sampling step.
SAMPLES_PER_CLASS = 120  # rows kept from every mood class
SEED = 42                # single seed so sampling AND shuffling are repeatable

df = pd.read_csv("/content/lyrics_preprocessed_unique.csv")


# sampling fewer data: draw the same number of rows from each mood class so
# the working set is balanced. Raises ValueError if a class has fewer rows
# than SAMPLES_PER_CLASS.
df = (
    df.groupby('Mood_encod')
      .apply(lambda x: x.sample(n=SAMPLES_PER_CLASS, random_state=SEED))
      .reset_index(drop=True)
)
# Shuffle the rows so the classes are interleaved. The shuffle must be seeded:
# the later train/validation split uses a fixed random_state, but it selects
# rows by position, so an unseeded shuffle here would still yield a different
# split (and different results) on every run.
df = df.sample(frac=1, random_state=SEED).reset_index(drop=True)

# Raw numpy views of the text and the integer-encoded mood labels.
lyrics = df['lyrics'].values
mood = df['Mood_encod'].values


In [None]:
# Sanity-check the sampled frame.
# NOTE(review): a notebook cell renders only its last expression, so the shape
# computed on the next line is evaluated but never displayed.
df['lyrics'].shape
# Number of rows per mood class (this is the value the cell displays).
df['Mood_encod'].value_counts()


In [None]:
# Convert the numpy arrays into plain Python lists, the form handed to
# train_test_split and then to the tokenizer further down.
lyrics1 = lyrics.tolist()
# NOTE(review): not the last expression of the cell, so this length is
# computed but never displayed.
len(lyrics1)
mood1 = mood.tolist()
# Displayed as the cell output.
len(mood1)


In [None]:
# splitting of data and tokenization
# 80/20 train/validation split. Stratify on the labels so that the class
# balance produced by the per-class sampling is preserved in both splits
# instead of being left to chance.
train_inputs, val_inputs, train_labels, val_labels = train_test_split(
    lyrics1, mood1, test_size = 0.2, random_state = 42, stratify = mood1)

# Upper bound on tokens per lyric; longer lyrics are truncated to this length.
max_token_number = 512
# Reuse the EOS token as the padding token so that `padding=True` can pad
# shorter sequences in the batch.
tokenizer.pad_token = tokenizer.eos_token
train_inputs1 = tokenizer(train_inputs, padding=True, truncation=True, max_length = max_token_number, return_tensors="pt")
val_inputs1 = tokenizer(val_inputs, padding=True, truncation=True, max_length = max_token_number, return_tensors="pt")

# Integer class labels as tensors, as expected by nn.CrossEntropyLoss.
train_labels = torch.tensor(train_labels)
val_labels = torch.tensor(val_labels)

# Each dataset item is the tuple (input_ids, attention_mask, label); the
# training loop relies on exactly this ordering when it indexes a batch.
train_data = TensorDataset(train_inputs1.input_ids, train_inputs1.attention_mask, train_labels)
val_data = TensorDataset(val_inputs1.input_ids, val_inputs1.attention_mask, val_labels)


In [None]:
# GPT2Model + Custom Classification Head Model

# class GPT2MoodClassifier(nn.Module):
#     def __init__(self, gpt2_model, num_classes = 4):
#         super(GPT2MoodClassifier, self).__init__()
#         self.gpt2_model = gpt2_model
#         self.classification_head = nn.Linear(gpt2_model.config.hidden_size, num_classes)

#     def forward(self, input_ids, attention_mask):
#         outputs = self.gpt2_model(input_ids, attention_mask = attention_mask)
#         last_hidden_state = outputs[0]
#         cls_hidden_state = last_hidden_state[:,0,:]
#         logits = self.classification_head(cls_hidden_state)
#         return logits

# gpt2_model = GPT2Model.from_pretrained(model_name)
# num_classes = len(set(mood))
# model = GPT2MoodClassifier(gpt2_model, num_classes = num_classes)
# model.load_state_dict(state)

In [None]:
# GPT2ForSequenceClassification Model
class GPT2MoodClassifier(nn.Module):
    """Thin wrapper around a sequence-classification model that returns raw logits.

    The wrapped model is called with ``input_ids`` and ``attention_mask`` and
    only the ``.logits`` attribute of its output is passed on to the caller.

    Args:
        gpt2_model: Callable model whose output exposes a ``.logits`` attribute
            (here a ``GPT2ForSequenceClassification`` instance).
        num_classes: Number of target classes; stored for reference only and
            not used in the forward pass.
    """

    def __init__(self, gpt2_model, num_classes=4):
        super().__init__()
        self.num_classes = num_classes
        self.gpt2_for_classification = gpt2_model

    def forward(self, input_ids, attention_mask):
        """Run the wrapped model and return its classification logits."""
        return self.gpt2_for_classification(
            input_ids, attention_mask=attention_mask
        ).logits


# Number of distinct mood labels present in the sampled data.
num_classes = len(set(mood))
# The inputs are padded with the tokenizer's EOS token, so the model must be
# told the same id as its pad token (GPT2ForSequenceClassification uses it to
# locate the last non-padding position of each sequence). Taking the id from
# the tokenizer instead of hard-coding 50256 keeps the two in sync if
# `model_name` is ever changed.
config = GPT2Config.from_pretrained(
    model_name, num_labels=num_classes, pad_token_id=tokenizer.eos_token_id)
# Pretrained GPT-2 body with a freshly initialised classification head.
gpt2_model = GPT2ForSequenceClassification.from_pretrained(model_name, config = config)
model = GPT2MoodClassifier(gpt2_model, num_classes = num_classes)

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at gpt2 and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# GPT2ForSequenceClassification Model with Regularization

# class GPT2MoodClassifier(nn.Module):
#     def __init__(self, gpt2_model, num_classes=4):
#         super(GPT2MoodClassifier, self).__init__()
#         self.gpt2_for_classification = gpt2_model
#         self.num_classes = num_classes

#         total_hidden_size = gpt2_model.config.hidden_size * gpt2_model.config.num_attention_heads
#         # Add batch normalization after the hidden layer
#         self.batch_norm = nn.BatchNorm1d(total_hidden_size)

#         # Add dropout layer
#         self.dropout = nn.Dropout(0.1)

#         # Linear layer for classification
#         self.classification_head = nn.Linear(gpt2_model.config.hidden_size, num_classes)

#     def forward(self, input_ids, attention_mask):
#         outputs = self.gpt2_for_classification(input_ids, attention_mask=attention_mask)
#         if hasattr(outputs, "past_key_values"):
#             last_hidden_state = outputs.past_key_values[0][-1]
#         else:
#             last_hidden_state = outputs.last_hidden_states

#         batch_size, num_heads, sequence_length, hidden_size_per_head = last_hidden_state.size()
#         last_hidden_state_reshaped = last_hidden_state.reshape(batch_size * num_heads * sequence_length, hidden_size_per_head)
#
          # Apply batch normalization
#         normalized_hidden_state = self.batch_norm(last_hidden_state_reshaped.permute(1, 0).contiguous().view(1, -1))

#         # Apply dropout
#         normalized_hidden_state = self.dropout(normalized_hidden_state)

#         # Get logits for classification
#         logits = self.classification_head(normalized_hidden_state[:, 0, :])

#         return logits

# num_classes = len(set(mood))
# config = GPT2Config.from_pretrained(model_name, num_labels=num_classes, pad_token_id=50256)
# gpt2_model = GPT2ForSequenceClassification.from_pretrained(model_name, config=config)
# model = GPT2MoodClassifier(gpt2_model, num_classes=num_classes)



In [None]:
# Loss Function, Optimizer and Dataloader

# Multi-class cross entropy over the raw logits returned by the model.
loss = nn.CrossEntropyLoss()
weight_decay = 1e-5
optimizer = torch.optim.Adam(model.parameters(), lr = 5e-6, weight_decay = weight_decay)
# Multiply the learning rate by 0.9 after every epoch. `step_size` counts
# epochs and is documented as an int, so pass 1 rather than the float 1.0.
scheduler = lr_scheduler.StepLR(optimizer, step_size = 1, gamma = 0.9)

# Small per-step batches; the training loop accumulates gradients over several
# of them before each optimizer step.
train_dataloader = DataLoader(train_data, batch_size = 2, shuffle = True)
# Validation metrics are aggregated over the whole set, so shuffling adds
# nothing; keep the order fixed.
val_dataloader = DataLoader(val_data, batch_size = 2, shuffle = False)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the model to that device. As the last expression of the cell, the
# returned module's repr is what the notebook prints below.
model.to(device)

GPT2MoodClassifier(
  (gpt2_for_classification): GPT2ForSequenceClassification(
    (transformer): GPT2Model(
      (wte): Embedding(50257, 768)
      (wpe): Embedding(1024, 768)
      (drop): Dropout(p=0.1, inplace=False)
      (h): ModuleList(
        (0-11): 12 x GPT2Block(
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (attn): GPT2Attention(
            (c_attn): Conv1D()
            (c_proj): Conv1D()
            (attn_dropout): Dropout(p=0.1, inplace=False)
            (resid_dropout): Dropout(p=0.1, inplace=False)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): GPT2MLP(
            (c_fc): Conv1D()
            (c_proj): Conv1D()
            (act): NewGELUActivation()
            (dropout): Dropout(p=0.1, inplace=False)
          )
        )
      )
      (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    )
    (score): Linear(in_features=768, out_features=4, bias=False)
  )
)

# **Original training and validation loop**

In [None]:
# T_loss = []
# V_loss = []
# T_acc = []
# V_acc = []
# T_f1 = []
# V_f1 = []

# for epoch in range(5):

#     model.train()
#     train_loss = 0
#     train_pred = []
#     train_true = []

#     for batch in tqdm(train_dataloader):
#         input_ids = batch[0].to(device)
#         labels = batch[1].to(device)

#         optimizer.zero_grad()

#         logits = model(input_ids)
#         loss_value = loss(logits, labels)

#         prob = nn.functional.softmax(logits, dim = -1)
#         pred = torch.argmax(prob, dim = -1)

#         loss_value.backward()

#         train_loss += loss_value.item()
#         train_pred.extend(pred.cpu().numpy())
#         train_true.extend(labels.cpu().numpy())

#         optimizer.step()

#     scheduler.step()

#     T_acc.append(accuracy_score(train_true, train_pred))
#     T_f1.append(f1_score(train_true, train_pred, average = 'weighted'))
#     T_loss.append(train_loss/len(train_dataloader))

#     print("Epoch: {}, Training Loss: {:.4f}, Accuracy: {:.4f}, F1 Score: {:.4f}"\
#           .format(epoch, train_loss/len(train_dataloader), \
#             accuracy_score(train_true, train_pred), f1_score(train_true, train_pred, average = 'weighted')))


#     model.eval()
#     val_loss = 0
#     val_pred = []
#     val_true = []

#     for batch in val_dataloader:
#         input_ids = batch[0].to(device)
#         labels = batch[1].to(device)

#         with torch.no_grad():
#             logits = model(input_ids)
#             prob = nn.functional.softmax(logits, dim = -1)
#             pred = torch.argmax(prob, dim = -1)

#             loss_value = loss(logits, labels)
#             val_loss += loss_value.item()
#             val_pred.extend(pred.cpu().numpy())
#             val_true.extend(labels.cpu().numpy())

#     V_acc.append(accuracy_score(val_true, val_pred))
#     V_f1.append(f1_score(val_true, val_pred, average = 'weighted'))
#     V_loss.append(val_loss/len(val_dataloader))

#     print("Epoch: {}, Validation Loss: {:.4f}, Accuracy: {:.4f}, F1 Score: {:.4f}"\
#           .format(epoch, val_loss/len(val_dataloader), accuracy_score(val_true, val_pred), \
#             f1_score(val_true, val_pred, average = 'weighted')))


# **Optimization using gradient accumulation and mixed precision**

In [None]:
# Per-epoch metric histories; the plotting cell reads these lists.
T_loss = []
V_loss = []
T_acc = []
V_acc = []
T_f1 = []
V_f1 = []

accumulation_steps = 2  # mini-batches whose gradients are summed per optimizer step
checkpoint = 5          # write a checkpoint every `checkpoint` epochs
# Mixed precision via torch.cuda.amp only applies on a CUDA device; on CPU both
# autocast and the scaler are switched off and behave as pass-throughs.
use_amp = device.type == "cuda"
scaler = GradScaler(enabled=use_amp)

for epoch in range(30):

    # ----------------------------- training -----------------------------
    model.train()
    train_loss = 0
    train_pred = []
    train_true = []

    num_train_batches = len(train_dataloader)
    # Clear gradients once, before the first accumulation group. They must NOT
    # be cleared at the start of every batch: that would throw away the
    # gradients of every batch except the last one in each group.
    optimizer.zero_grad()

    for step, batch in enumerate(tqdm(train_dataloader)):
        # Batch layout follows the TensorDataset: (input_ids, attention_mask, labels).
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)

        with autocast(enabled=use_amp):
            logits = model(input_ids, attention_mask)
            loss_value = loss(logits, labels)

        # Divide by the group size so the accumulated gradient is the mean over
        # the group rather than the sum (keeps the effective learning rate
        # independent of accumulation_steps).
        scaler.scale(loss_value / accumulation_steps).backward()

        # Step at the end of every full group, and also on the final batch so a
        # trailing partial group is applied instead of being discarded.
        is_group_end = (step + 1) % accumulation_steps == 0
        is_last_batch = (step + 1) == num_train_batches
        if is_group_end or is_last_batch:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        # Softmax is monotonic, so the predicted class is the argmax of the logits.
        pred = torch.argmax(logits.detach(), dim = -1)

        # Track the unscaled loss so the reported value is the true batch loss.
        train_loss += loss_value.item()
        train_pred.extend(pred.cpu().numpy())
        train_true.extend(labels.cpu().numpy())

    scheduler.step()

    if epoch % checkpoint == 0:
        torch.save({
            'epoch' : epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            # Extra state needed to resume training exactly where it stopped.
            'scheduler_state_dict': scheduler.state_dict(),
            'scaler_state_dict': scaler.state_dict(),
            # Loss of the last training batch; detached and moved to CPU so the
            # checkpoint carries no autograd graph and loads without a GPU.
            'loss' : loss_value.detach().cpu(),
        }, checkpoint_path)

    # Compute each training metric once and reuse it for history and logging.
    epoch_train_loss = train_loss / num_train_batches
    epoch_train_acc = accuracy_score(train_true, train_pred)
    epoch_train_f1 = f1_score(train_true, train_pred, average = 'weighted')

    T_acc.append(epoch_train_acc)
    T_f1.append(epoch_train_f1)
    T_loss.append(epoch_train_loss)

    print("Epoch: {}, Training Loss: {:.4f}, Accuracy: {:.4f}, F1 Score: {:.4f}"\
          .format(epoch, epoch_train_loss, epoch_train_acc, epoch_train_f1))

    # ---------------------------- validation ----------------------------
    model.eval()
    val_loss = 0
    val_pred = []
    val_true = []

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in val_dataloader:
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device)

            logits = model(input_ids, attention_mask)
            pred = torch.argmax(logits, dim = -1)

            loss_value = loss(logits, labels)
            val_loss += loss_value.item()
            val_pred.extend(pred.cpu().numpy())
            val_true.extend(labels.cpu().numpy())

    epoch_val_loss = val_loss / len(val_dataloader)
    epoch_val_acc = accuracy_score(val_true, val_pred)
    epoch_val_f1 = f1_score(val_true, val_pred, average = 'weighted')

    V_acc.append(epoch_val_acc)
    V_f1.append(epoch_val_f1)
    V_loss.append(epoch_val_loss)

    print("Epoch: {}, Validation Loss: {:.4f}, Accuracy: {:.4f}, F1 Score: {:.4f}"\
          .format(epoch, epoch_val_loss, epoch_val_acc, epoch_val_f1))

In [None]:
#plotting all the metrics

# One row of three panels (loss, accuracy, F1 score); every panel overlays the
# training curve and the validation curve recorded per epoch.
fig, axes = plt.subplots(1, 3, figsize=(10, 5))

metric_panels = [
    ('Loss', T_loss, V_loss),
    ('Accuracy', T_acc, V_acc),
    ('F1 Score', T_f1, V_f1),
]

for ax, (metric_name, train_history, val_history) in zip(axes, metric_panels):
    ax.plot(train_history, label=f'Training {metric_name}')
    ax.plot(val_history, label=f'Validation {metric_name}')
    ax.set_title(f'{metric_name} Over Epochs')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_name)
    ax.legend()

plt.tight_layout()
plt.show()