# **Genre Classification based on Song Lyrics**

# 1. Download the Dataset
The datasets used for training and validation were downloaded from Kaggle. The following section downloads the datasets and saves them in the current Colab working directory. To do so, you need a Kaggle account and have to [download the Kaggle API token](https://medium.com/unpackai/how-to-use-kaggle-datasets-in-google-colab-f9b2e4b5767c). We also uploaded the datasets to our GitHub repository, so you can skip this part and access them from there instead.

In [None]:
!pip install kaggle

In [None]:
# Upload the kaggle.json API token (the JSON file with the Kaggle account info).
# NOTE(review): the next cell copies `kaggle.json` from the working directory,
# so the uploaded file is presumably stored there under that name - confirm.
from google.colab import files

files.upload()

In [None]:
# Create the Kaggle config directory; -p keeps the cell re-runnable when it already exists
! mkdir -p ~/.kaggle
# Copy the kaggle.json token that you downloaded/uploaded into the config directory
! cp kaggle.json ~/.kaggle/
# The token is a credential: make it readable by the current user only
! chmod 600 ~/.kaggle/kaggle.json
# Download the dataset archive from Kaggle
! kaggle datasets download neisse/scrapped-lyrics-from-6-genres
# Extract the archive; -o overwrites existing files so a re-run does not stop at a prompt
! unzip -o scrapped-lyrics-from-6-genres.zip
# Remove the archive again (it is a plain file, so no recursive delete is needed)
! rm -f scrapped-lyrics-from-6-genres.zip

# 2. Preprocessing
The training and validation dataset consists of two CSV files. We first merge the two CSV files. Then, we take a look at the "Genres" column. Every song has multiple genres assigned. We first split the string that contains the multiple genres and save them in different columns ("Genre1", "Genre2", ...). For the classification we only use the "Genre1" column and select the 7 genres that have the largest sample sizes. After looking closer at the dataset, we notice an imbalance in sample size between the different genres. To make the dataset more balanced, we reassign some samples to different genres, e.g. "Rap" is added to "Hip Hop" and "Black Music" is added to "R&B". The 7 genres used for classification are: "Rock", "Hip Hop", "Pop", "Indie", "Heavy Metal", "R&B", "Country". At the end, the lyrics in the dataset are cleaned (e.g. special characters are removed), the labels are encoded as integers, and a train-test split is created.

In [None]:
import pandas as pd
# Load the two CSV files extracted from the Kaggle archive.
lyrics_data = pd.read_csv("/content/lyrics-data.csv")
artists_data = pd.read_csv("/content/artists-data.csv")

# Rename "ALink" to "Link" so both tables share the column used for the merge.
lyrics_data = lyrics_data.rename({"ALink":"Link"}, axis="columns")
data = lyrics_data.merge(artists_data, on='Link', how='left')
data = data.drop(["Link", "SName", "SLink", "Songs", "Popularity"], axis=1)

# Keep English lyrics only. The explicit copy makes `data` an independent frame,
# so adding columns below does not write into a slice of the merged table.
data = data.loc[data["language"] == "en"].copy()

# Split the ';'-separated "Genres" string into one column per genre.
# `n` must be passed by keyword: newer pandas versions accept only `pat` positionally.
data[['Genre1', 'Genre2', 'Genre3', 'Genre4']] = data['Genres'].str.split(';', n=-1, expand=True)

# Rows without a first genre cannot be labelled; the raw "Genres" string is no longer needed.
data = data[data['Genre1'].notna()]
data = data.drop("Genres", axis=1)
# Renumber the rows 0..n-1 and discard the old index in one step.
data = data.reset_index(drop=True)

# Collect every first genre that occurs fewer than 1200 times.
# value_counts() tallies all genres in a single pass instead of filtering the
# whole frame once per genre; unique() keeps the original first-appearance order.
genre_counts = data["Genre1"].value_counts()
other_genres = [
    genre for genre in data["Genre1"].unique() if genre_counts[genre] < 1200
]

# Relabel "Genre1" with an ordered table of (row mask, new label) rules.
#
# Every mask is computed from the values the rows had BEFORE any relabelling,
# and the rules are applied top to bottom, so when several rules match the same
# row the LAST one wins. This reproduces a row-by-row loop that tests each rule
# against the untouched row, but runs as a handful of vectorised assignments.
#
# The leading space in the Genre2/Genre3 literals (" Pop", " R&B", ...) is
# deliberate: those columns hold the pieces that followed a ';' separator.
original_genre1 = data["Genre1"].copy()
genre2 = data["Genre2"]
genre3 = data["Genre3"]
artist = data["Artist"]


def _genre_mask(first, second=None, third=None):
    """Mask of rows whose original Genre1 is `first` and whose Genre2 / Genre3
    equal `second` / `third` when those are given."""
    mask = original_genre1 == first
    if second is not None:
        mask = mask & (genre2 == second)
    if third is not None:
        mask = mask & (genre3 == third)
    return mask


genre_rules = [
    (_genre_mask("Pop/Rock", second=" Pop"), "Pop"),
    (_genre_mask("Rap"), "Hip Hop"),
    (_genre_mask("Rock Alternativo", second=" Indie"), "Indie"),
    (_genre_mask("Black Music", second=" R&B"), "R&B"),
    (_genre_mask("Black Music", second=" Soul Music"), "R&B"),
    (_genre_mask("Soul Music", second=" R&B"), "R&B"),
    (_genre_mask("Soul Music", second=" Pop"), "R&B"),
    (_genre_mask("Dance", second=" Pop"), "Pop"),
    (_genre_mask("Folk", second=" Indie"), "Indie"),
    (_genre_mask("Folk", third=" Indie"), "Indie"),
    (_genre_mask("Romântico", third=" R&B"), "R&B"),
    (_genre_mask("Pop", second=" R&B"), "R&B"),
    (_genre_mask("Rock", second=" Country"), "Country"),
    (_genre_mask("Trilha Sonora", second=" Country"), "Country"),
    (_genre_mask("Folk", second=" Country"), "Country"),
    # NOTE(review): this rule tests the ORIGINAL genre, so a rare genre that an
    # earlier rule already remapped (e.g. "Rap" -> "Hip Hop") is turned into
    # "Other" again if it is listed in other_genres - confirm this is intended.
    (original_genre1.isin(other_genres), "Other"),
    ###New changes:###
    (_genre_mask("Rock", second=" Pop"), "Pop"),
    (artist == "Wilco", "Country"),
    (_genre_mask("Rock", second=" Indie"), "Indie"),
    (artist == "Steve Earle", "Country"),
    (_genre_mask("Rock", second=" Heavy Metal"), "Heavy Metal"),
    (_genre_mask("Pop", third=" R&B"), "Pop"),
    (
        artist.isin(['Ciara', 'Jeremih', 'Kehlani','Ashanti', 'Jhené Aiko', 'Ray J', 'Omarion', 'Mindless Behavior']),
        "R&B",
    ),
]

for rule_mask, new_genre in genre_rules:
    data.loc[rule_mask, "Genre1"] = new_genre

# Keep only the seven genres used for classification. The copy detaches the
# result from the unfiltered frame so later column assignments are safe.
genre_list = ["Rock","Hip Hop","Pop","Indie","Heavy Metal","R&B", "Country"]
data = data[data["Genre1"].isin(genre_list)].copy()

In [None]:
# (number of songs, genre) for every genre left in the data, in order of first appearance.
freq_list = [
    (int((data["Genre1"] == genre).sum()), genre)
    for genre in data["Genre1"].unique()
]

# Show the genres from largest to smallest sample size.
print(sorted(freq_list, key=lambda pair: pair[0], reverse=True))

In [None]:
# Integer id -> genre name; ids follow the order in which genres first appear in the data.
decode_labels = dict(enumerate(data["Genre1"].unique()))
# Inverse lookup: genre name -> integer id.
encode_labels = {genre: label_id for label_id, genre in decode_labels.items()}

import re 

# Patterns are compiled once at import time and written as raw strings, so that
# backslash sequences such as \s reach the regex engine instead of being
# treated as (invalid) Python string escapes.
_NON_LYRIC_CHARS = re.compile(r"[^A-Za-z0-9']+")
_SPACE_NEXT_TO_APOSTROPHE = re.compile(
    r"(?<=[a-zA-Z0-9]) (?=['])|(?<=[']) (?=[a-zA-Z0-9])"
)
_WHITESPACE_RUN = re.compile(r"\s+")


def clean_lyrics(lyrics):
    """Normalise a raw lyrics string for tokenisation.

    Steps, in order:
      1. every run of characters other than ASCII letters, digits and
         apostrophes is replaced by a single space;
      2. a space directly between a letter/digit and an apostrophe is removed
         (so "don ' t" becomes "don't");
      3. remaining whitespace runs are collapsed to one space;
      4. the text is lower-cased and leading whitespace is stripped
         (a trailing space, if any, is kept).

    :param lyrics: raw lyrics text (str)
    :return: the cleaned, lower-cased text
    """
    lyrics = _NON_LYRIC_CHARS.sub(" ", lyrics)
    lyrics = _SPACE_NEXT_TO_APOSTROPHE.sub("", lyrics)
    lyrics = _WHITESPACE_RUN.sub(" ", lyrics)
    return lyrics.lower().lstrip()

# Encode the genre of every song as its integer id in a single vectorised
# lookup (no Python-level loop with one cell write per row).
# Every genre in the column is a key of encode_labels, which is built from this
# column's unique values, so the column comes out as plain integers.
data["Label"] = data["Genre1"].map(encode_labels)

from sklearn.model_selection import train_test_split

# Hold out 5% of the songs for validation/testing.
# A fixed seed makes the split - and every metric computed on it - reproducible
# across runs; stratifying on the genre keeps the class proportions of the
# small held-out set in line with the training set.
train_data, test_data = train_test_split(
    data, test_size=0.05, random_state=42, stratify=data["Genre1"]
)

# 3. Build Vocab and Dataloader


In [None]:
from torch.utils.data import Dataset
import torchaudio
from torch.nn.utils.rnn import pad_sequence
import torch
from torch.utils.data import DataLoader

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


def text_pipeline(x):
    """Tokenize `x` with the module-level `tokenizer` and pass the tokens to the module-level `vocab`."""
    return vocab(tokenizer(x))


def label_pipeline(x):
    """Convert a label value to a plain Python int."""
    return int(x)


class LyricsDataset(Dataset):
    """Map-style dataset over a DataFrame with a "Lyric" and a "Label" column.

    Indexing returns ``(label, cleaned_lyrics)``; the lyrics are cleaned with
    the module-level ``clean_lyrics`` on every access.
    """

    def __init__(self, dataset: pd.DataFrame):
        """Copy the "Lyric" and "Label" columns of `dataset` into plain lists."""
        self.lyrics = dataset['Lyric'].tolist()
        self.labels = dataset["Label"].tolist()

    def __len__(self):
        """Return the number of songs."""
        return len(self.lyrics)

    def __getitem__(self, index: int):
        """Return ``(int label, cleaned lyrics string)`` for position `index`."""
        feat = clean_lyrics(self.lyrics[index])
        label = int(self.labels[index])

        return (label, feat)

    @staticmethod
    def collate_fn(batch):
        """Turn a list of ``(label, text)`` samples into padded tensors.

        Declared as a static method because it takes no instance; it is still
        passed to the DataLoader as ``LyricsDataset.collate_fn``.

        :param batch: list of ``(label, text)`` tuples as returned by ``__getitem__``
        :return: ``(feats, labels)`` moved to the module-level ``device``;
                 ``feats`` holds the token ids padded to the longest sample of
                 the batch (batch first), ``labels`` is an int64 vector
        """
        labels = torch.tensor(
            [label_pipeline(sample[0]) for sample in batch], dtype=torch.int64
        )
        # An explicit integer dtype matters for lyrics that produce no tokens:
        # an empty list would otherwise become a float tensor, and padding a
        # batch that starts with it would no longer yield integer token ids.
        feat_list = [
            torch.tensor(text_pipeline(sample[1]), dtype=torch.int64)
            for sample in batch
        ]

        feats = torch.nn.utils.rnn.pad_sequence(feat_list, batch_first=True)

        return feats.to(device), labels.to(device)

    
# Wrap the training and the held-out split in a LyricsDataset each.
train_iter, test_iter = (
    LyricsDataset(dataset=split) for split in (train_data, test_data)
)

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# torchtext's built-in "basic_english" tokenizer.
tokenizer = get_tokenizer('basic_english')


def yield_tokens(data_iter):
    """Yield the token list of every ``(label, text)`` sample in `data_iter`."""
    yield from (tokenizer(text) for _, text in data_iter)


# Build the vocabulary from the training split only, with two special tokens
# and a minimum token frequency of 500.
vocab = build_vocab_from_iterator(
    yield_tokens(train_iter),
    min_freq=500,
    specials=["<pad>", "<unk>"],
)
# Tokens that are not in the vocabulary are looked up as "<unk>".
vocab.set_default_index(vocab["<unk>"])

# Training batches are reshuffled every epoch; both loaders use batches of 8
# and the dataset's own collate function.
dataloader = DataLoader(
    train_iter,
    collate_fn=LyricsDataset.collate_fn,
    batch_size=8,
    shuffle=True,
)
# The held-out split keeps its order.
test_loader = DataLoader(
    test_iter,
    collate_fn=LyricsDataset.collate_fn,
    batch_size=8,
    shuffle=False,
)

# 4. Train Model
An LSTM model with an embedding and a linear layer is trained. Since the LSTM model tends to overfit, we added a dropout layer to the model architecture. For training, we use the Adam optimizer. To further reduce overfitting, [we also added weight decay](https://medium.com/analytics-vidhya/deep-learning-basics-weight-decay-3c68eb4344e9), which keeps the weights small and prevents them from growing out of control, and thus helps to avoid exploding gradients. Furthermore, we chose a dynamic learning rate that is multiplied by 0.5 whenever the validation loss plateaus. We also added early stopping for when the validation loss stops improving, and we save the best model checkpoint to avoid ending up with an overfitted model. After the training, the state_dict and the vocabulary of the model are saved.

In [None]:
from torch import nn
from torch.nn import functional as F

embed_len = 50    # size of each token embedding vector
hidden_dim = 75   # size of the LSTM hidden state
n_layers = 1      # number of stacked LSTM layers
# Number of distinct labels in the training split, read straight from the
# dataset's stored label list. Iterating the dataset itself would go through
# __getitem__ and regex-clean every lyric just to look at the labels.
num_class = len({int(label) for label in train_iter.labels})

class LSTMClassifier(nn.Module):
    """Embedding -> LSTM -> dropout -> linear classifier over token-id batches.

    Layer sizes come from the module-level ``vocab``, ``embed_len``,
    ``hidden_dim``, ``n_layers`` and ``num_class``.
    """

    def __init__(self):
        super().__init__()
        self.embedding_layer = nn.Embedding(num_embeddings=len(vocab), embedding_dim=embed_len)
        self.lstm = nn.LSTM(input_size=embed_len, hidden_size=hidden_dim, num_layers=n_layers, batch_first=True)
        self.linear = nn.Linear(hidden_dim, num_class)
        self.dropout = nn.Dropout(0.3)

    def forward(self, X_batch):
        """Return one row of class scores per sequence in `X_batch`.

        :param X_batch: batch-first tensor of token ids
        :return: tensor with one score per class for every sequence
        """
        embeddings = self.embedding_layer(X_batch)
        if self.training:
            # Training keeps the random initial hidden/cell state. It is created
            # directly on the device (and with the dtype) of the embeddings
            # instead of on the CPU followed by a copy.
            state_shape = (n_layers, len(X_batch), hidden_dim)
            hidden = torch.randn(state_shape, device=embeddings.device, dtype=embeddings.dtype)
            carry = torch.randn(state_shape, device=embeddings.device, dtype=embeddings.dtype)
            output, _ = self.lstm(embeddings, (hidden, carry))
        else:
            # In eval mode the LSTM starts from its default all-zero state, so
            # the same input always produces the same prediction.
            output, _ = self.lstm(embeddings)
        output = self.dropout(output)
        # NOTE(review): the last time step is used for every sequence; if the
        # batch was padded to a common length, shorter sequences are read at a
        # padding position - consider using the last real token instead.
        return self.linear(output[:, -1])

In [None]:
# Instantiate the classifier and move its parameters to the selected device.
lstm_classifier = LSTMClassifier().to(device)

# Bare expression: in a notebook this displays the model's repr as the cell output.
lstm_classifier

In [None]:
# Print every direct sub-module of the classifier followed by the shapes of
# its parameters, with a blank line between modules.
for layer in lstm_classifier.children():
    print(f"Layer : {layer}")
    print("Parameters : ")
    for param in layer.parameters():
        print(param.shape)
    print()

In [None]:
class SaveBestModel:
    """
    Save a checkpoint whenever the validation loss reaches a new minimum.

    Call the instance once per epoch with the current validation loss. If that
    loss is lower than the lowest loss seen so far, the model and optimizer
    state are written to ``save_path``, replacing the previous checkpoint.
    """
    def __init__(
        self, best_valid_loss=float('inf'), save_path='/content/best_model.pth'
    ):
        """
        :param best_valid_loss: loss a model has to beat to be saved; the
               default of infinity means the first call always saves
        :param save_path: file the checkpoint is written to
        """
        self.best_valid_loss = best_valid_loss
        self.save_path = save_path

    def __call__(
        self, current_valid_loss,
        epoch, model, optimizer, criterion
    ):
        """
        :param current_valid_loss: validation loss of the current epoch
        :param epoch: epoch number stored in the checkpoint
        :param model: model whose ``state_dict()`` is saved
        :param optimizer: optimizer whose ``state_dict()`` is saved
        :param criterion: loss function object, stored under the 'loss' key
        """
        if current_valid_loss < self.best_valid_loss:
            self.best_valid_loss = current_valid_loss
            print(f"\nBest validation loss: {self.best_valid_loss}")
            print(f"\nSaving best model for epoch: {epoch}\n")
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                # The 'loss' entry holds the criterion object, not a loss value.
                'loss': criterion,
                }, self.save_path)

save_best_model = SaveBestModel()

class EarlyStopping():
    """
    Early stopping to stop the training when the loss does not improve after
    certain epochs.

    Call the instance once per epoch with the validation loss and check the
    ``early_stop`` flag afterwards.
    """
    def __init__(self, patience=8, min_delta=0):
        """
        :param patience: how many epochs to wait before stopping when loss is
               not improving
        :param min_delta: minimum difference between new loss and old loss for
               new loss to be considered as an improvement
        """
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        """
        Record `val_loss` and update ``counter`` / ``early_stop``.

        :param val_loss: validation loss of the epoch that just finished
        """
        if self.best_loss is None:
            # first call: nothing to compare against yet
            self.best_loss = val_loss
        elif self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            # reset counter if validation loss improves
            self.counter = 0
        else:
            # Everything that is not an improvement of more than min_delta
            # counts against the patience budget - including a loss that is
            # exactly min_delta better (e.g. a perfectly flat loss with
            # min_delta=0), which would otherwise never trigger a stop.
            self.counter += 1
            print(f"INFO: Early stopping counter {self.counter} of {self.patience}")
            if self.counter >= self.patience:
                print('INFO: Early stopping')
                self.early_stop = True

early_stopping = EarlyStopping()

In [None]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score
import gc

def CalcValLossAndAccuracy(model, loss_fn, val_loader, epoch):
    """Evaluate `model` on `val_loader` and run the per-epoch bookkeeping.

    Prints the mean validation loss and the accuracy, then updates the
    module-level ``early_stopping``, ``save_best_model`` and ``scheduler``
    (``save_best_model`` also receives the module-level ``optimizer``).

    :param model: model to evaluate
    :param loss_fn: loss function called as ``loss_fn(preds, Y)``
    :param val_loader: iterable of ``(X, Y)`` batches
    :param epoch: current epoch number, passed on to ``save_best_model``
    """
    # Evaluate in eval mode so that layers such as dropout are switched off;
    # the previous mode is restored afterwards, even if evaluation fails.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            Y_shuffled, Y_preds, losses = [],[],[]
            for X, Y in val_loader:
                preds = model(X)
                loss = loss_fn(preds, Y)
                losses.append(loss.item())

                Y_shuffled.append(Y)
                Y_preds.append(preds.argmax(dim=-1))
    finally:
        model.train(was_training)

    Y_shuffled = torch.cat(Y_shuffled)
    Y_preds = torch.cat(Y_preds)

    # Mean of the per-batch losses, computed once and reused below.
    val_loss = torch.tensor(losses).mean()

    early_stopping(val_loss)

    print("Valid Loss : {:.3f}".format(val_loss))
    print("Valid Acc  : {:.3f}".format(accuracy_score(Y_shuffled.cpu().detach().numpy(), Y_preds.cpu().detach().numpy())))
    save_best_model(val_loss, epoch, model, optimizer, loss_fn) # save best checkpoint
    print('-'*50)
    scheduler.step(val_loss) # lets the scheduler react to the validation loss


def TrainModel(model, loss_fn, optimizer, train_loader, val_loader, epochs=10):
    """Train `model` for up to `epochs` epochs, validating after each one.

    Training ends early as soon as the module-level ``early_stopping`` object
    reports ``early_stop`` after a validation pass.

    :param model: model to train (updated in place)
    :param loss_fn: loss function called as ``loss_fn(preds, Y)``
    :param optimizer: optimizer that updates the parameters of `model`
    :param train_loader: iterable of ``(X, Y)`` training batches
    :param val_loader: iterable of ``(X, Y)`` validation batches
    :param epochs: maximum number of epochs
    """
    for i in range(1, epochs+1):
        # Make sure training-only layers (dropout) are active in every epoch,
        # whatever mode the model was left in before.
        model.train()
        losses = []
        for X, Y in tqdm(train_loader):
            Y_preds = model(X) ## Make Predictions

            loss = loss_fn(Y_preds, Y) ## Calculate Loss
            losses.append(loss.item())

            optimizer.zero_grad() ## Clear previously calculated gradients
            loss.backward() ## Calculates Gradients
            optimizer.step() ## Update network weights.

        print("Train Loss : {:.3f}".format(torch.tensor(losses).mean()))
        CalcValLossAndAccuracy(model, loss_fn, val_loader, i)
        if early_stopping.early_stop:
            break

In [None]:
from torch.optim import Adam
from torch.optim.lr_scheduler import ExponentialLR

# Upper bound on the number of epochs; TrainModel stops earlier once the
# early-stopping object reports that the validation loss stopped improving.
epochs = 100
learning_rate = 0.001  # initial learning rate (1e-3)

loss_fn = nn.CrossEntropyLoss()
# A fresh, untrained model: this replaces any instance created in earlier cells.
lstm_classifier = LSTMClassifier().to(device)
# Adam with weight decay of 1e-5.
optimizer = Adam(lstm_classifier.parameters(), lr=learning_rate, weight_decay=1e-5)
# Multiply the learning rate by `factor` when the monitored value (the
# validation loss passed to scheduler.step) has not improved for `patience`
# epochs, but never go below `min_lr`.
# NOTE(review): recent PyTorch releases deprecate the `verbose` argument of
# learning-rate schedulers - confirm against the installed version.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( 
                optimizer,
                mode='min',
                patience=2,
                factor=0.5,
                min_lr=1e-6,
                verbose=True
            ) # scheduler added

# The held-out loader doubles as the validation set during training.
TrainModel(lstm_classifier, loss_fn, optimizer, dataloader, test_loader, epochs)

In [None]:
# Save the model as it is after the last training epoch, together with the
# optimizer state, using the same keys as the best-model checkpoint.
# NOTE(review): 'epoch' stores the configured maximum (`epochs`), not the epoch
# at which training actually ended when early stopping kicked in.
torch.save({'epoch': epochs,
            'model_state_dict': lstm_classifier.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss_fn,}, 'final_model.pth')
# The vocabulary is saved as well: it is needed to turn new lyrics into token ids.
torch.save(vocab, 'vocab.pth')

# 5. Evaluation

In [None]:
# Collect unreachable Python objects first, so that tensors kept alive only by
# reference cycles are freed, and then let PyTorch release the GPU memory that
# it now holds in its cache without using it.
gc.collect()
torch.cuda.empty_cache()

In [None]:
def MakePredictions(model, loader):
    """Run `model` over every batch of `loader` and collect the predictions.

    :param model: model called as ``model(X)``, returning one score per class
    :param loader: iterable of ``(X, Y)`` batches
    :return: ``(actual, predicted)`` - two NumPy arrays holding the labels from
             the loader and the index of the highest-scoring class per sample
    """
    Y_shuffled, Y_preds = [], []
    # Predict in eval mode so that dropout is disabled; the previous mode is
    # restored afterwards, even if inference fails.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for X, Y in loader:
                Y_preds.append(model(X))
                Y_shuffled.append(Y)
    finally:
        model.train(was_training)
    gc.collect()
    Y_preds, Y_shuffled = torch.cat(Y_preds), torch.cat(Y_shuffled)

    # Softmax does not change which class scores highest, so the argmax is
    # taken on the raw scores directly.
    return Y_shuffled.cpu().detach().numpy(), Y_preds.argmax(dim=-1).cpu().detach().numpy()

# Labels and predicted class indices for the held-out loader, as NumPy arrays.
Y_actual, Y_preds = MakePredictions(lstm_classifier, test_loader)


In [None]:
# Genre names in the insertion order of decode_labels.
target_classes = list(decode_labels.values())

print(target_classes)

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Overall accuracy, per-genre precision/recall/F1 and the raw confusion matrix
# for the held-out predictions.
print(f"Test Accuracy : {accuracy_score(Y_actual, Y_preds)}")
print("\nClassification Report : ")
print(classification_report(Y_actual, Y_preds, target_names=target_classes))
print("\nConfusion Matrix : ")
print(confusion_matrix(Y_actual, Y_preds))

In [None]:
!pip install scikit-plot

In [None]:
from sklearn.metrics import confusion_matrix
import scikitplot as skplt
import matplotlib.pyplot as plt
import numpy as np

skplt.metrics.plot_confusion_matrix([target_classes[i] for i in Y_actual], [target_classes[i] for i in Y_preds],
                                    normalize=True,
                                    title="Confusion Matrix",
                                    cmap="Purples",
                                    hide_zeros=True,
                                    figsize=(5,5)
                                    );
plt.xticks(rotation=90);

In [None]:
plt.savefig("confusion_matrix")