In [112]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
        
from tqdm import tqdm
from sklearn.utils import resample

from nltk.corpus import wordnet
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
import string
import re

import nltk
nltk.download('wordnet')

import subprocess

# Download and unzip wordnet
try:
    nltk.data.find('wordnet.zip')
except:
    nltk.download('wordnet', download_dir='/kaggle/working/')
    command = "unzip /kaggle/working/corpora/wordnet.zip -d /kaggle/working/corpora"
    subprocess.run(command.split())
    nltk.data.path.append('/kaggle/working/')
    
from nltk.corpus import wordnet

/kaggle/input/genius-song-lyrics-with-language-information/song_lyrics.csv
/kaggle/input/lyrics/lyrics_data.csv
[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package wordnet to /kaggle/working/...
[nltk_data]   Package wordnet is already up-to-date!
Archive:  /kaggle/working/corpora/wordnet.zip


replace /kaggle/working/corpora/wordnet/lexnames? [y]es, [n]o, [A]ll, [N]one, [r]ename:  NULL
(EOF or read error, treating as "[N]one" ...)


In [None]:
data_path = "/kaggle/input/genius-song-lyrics-with-language-information/song_lyrics.csv"
chunks = pd.read_csv(data_path, iterator=True, chunksize=1000)
desired_size = 3500

new_data = pd.DataFrame()
for data in tqdm(chunks):
    data_filtered = data[data.language == "en"][["title", "artist", "tag", "lyrics"]]
    
    new_data = pd.concat([new_data, data_filtered])
    
    # limit the maximum examples because of the computational power limit
    if min(new_data.tag.value_counts()) >= desired_size:
        break


undersampled_data = pd.DataFrame()

for tag in set(new_data.tag):
    class_df = new_data[new_data.tag == tag]
    
    undersampled = resample(class_df, replace=False, n_samples=desired_size, random_state=42)
    undersampled_data = pd.concat([undersampled_data, undersampled])

In [None]:
undersampled_data = undersampled_data.sample(frac=1).reset_index(drop=True)
print(undersampled_data.tag.value_counts())

undersampled_data.head()

#  Data Processing

Convert lowercase, remove extra information provided by data source, lemmatize and remove punctuations

In [None]:
undersampled_data.lyrics = undersampled_data.lyrics.str.lower()
def handleNewLine(text):
    new_line_idxs = [match.start() for match in re.finditer(r'\n', text)]
    
    lines = []
    for idx in range(0, len(new_line_idxs)-1):
        startIndex = new_line_idxs[idx]
        endIndex = new_line_idxs[idx+1]
        line = text[startIndex:endIndex]
        line = line.split("\n")[1]
        
        if len(line)>0:
            lines.append(line)
    
    return ' \n '.join(lines)

undersampled_data['lyrics'] = undersampled_data['lyrics'].apply(handleNewLine)

In [None]:
# Remove extra notes
undersampled_data['lyrics'] = undersampled_data['lyrics'].str.replace(r'\[.*?\]', '', regex=True)
undersampled_data['lyrics'] = undersampled_data['lyrics'].str.replace(r'\([^)]*\)', '', regex=True)

# Remove punctuations from the lyrics column except new line (\n)
punctuation = re.compile(r'[^\w\s\n]+')
undersampled_data['lyrics'] = undersampled_data['lyrics'].apply(lambda x: punctuation.sub('', x).strip())
undersampled_data['lyrics'] = undersampled_data['lyrics'].str.replace(" \n  \n ", " \n ")

random_song = np.random.randint(0, len(undersampled_data))
undersampled_data.lyrics.iloc[random_song], undersampled_data.tag.iloc[random_song]

In [None]:
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()

for index, row in tqdm(undersampled_data.iterrows()):
    text = row['lyrics']
    lemmatized_text = []
    
    # Lemmatize the text
    for word in text.split(" "):
        if word not in stop_words or word == "\n":
            if word == "\n":
                lemmatized_text.append(word)
            else:
                lemmatized_text.append(lemmatizer.lemmatize(word))
    
    txt = ' '.join(lemmatized_text)
    undersampled_data.loc[index, 'lyrics'] = " \n " + txt

In [None]:
undersampled_data.reset_index(inplace=True, drop=True)
undersampled_data.lyrics.iloc[random_song], undersampled_data.tag.iloc[random_song]

## Split each line

In [None]:
def split_lyrics(lyric):
  """Splits a lyric into a list of lines."""
  lines = []
  try:
      splt = lyric.split("\n")
      for line in splt:
        line = line.strip()

        if len(line) > 1:
          lines.append(line)

      return lines
  except:
        return None

undersampled_data["lines"] = undersampled_data.lyrics.apply(split_lyrics)
undersampled_data.reset_index(inplace=True, drop=True)

undersampled_data.sample(5)

## Generate Tokens

In [None]:
def split_tokens(lines):
    tokens = []
    
    try:
        for line in lines:
            tokens.append(word_tokenize(line))

        return tokens
    except:
        return None

undersampled_data["tokens"] = undersampled_data.lines.apply(split_tokens)
undersampled_data.reset_index(inplace=True, drop=True)

### Save the file

In [None]:
undersampled_data.to_csv("lyrics_data.csv")

# HAN Model

In [113]:
!pip install torchtext



In [114]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn.functional as F

from torchtext.vocab import GloVe
import torchtext
from torchtext.data import get_tokenizer
from torchtext import data
from torchtext import vocab
from nltk.stem.porter import PorterStemmer
from torch.nn.utils.rnn import pad_sequence


import spacy
spacy_en = spacy.load('en_core_web_sm')

nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /usr/share/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [115]:
dataset = pd.read_csv("/kaggle/input/lyrics/lyrics_data.csv")

In [None]:
def tokenize_document(doc):
    txt = doc.replace('"', '').replace("'", "").replace("\n", "") 
    txt = re.sub(r'\s+', ' ', txt).strip()
    
    return txt.split()

In [116]:
class WordAttention(nn.Module):
  def __init__(self, hidden_size, embedding_dim):
    super().__init__()

    self.lin1 = nn.Linear(hidden_size, hidden_size)
    self.lin2 = nn.Linear(hidden_size, 1, bias=False)

  def forward(self, x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(x), dim=1)

    output = torch.sum(
        attention * x, dim=1
    )

    return attention, output

class SentenceAttention(nn.Module):
  def __init__(self, hidden_size, embedding_dim):
    super().__init__()

    self.lin1 = nn.Linear(hidden_size, hidden_size)
    self.lin2 = nn.Linear(hidden_size, 1, bias=False)

  def forward(self, x):
    u = torch.tanh(self.lin1(x))
    attention = F.softmax(self.lin2(x), dim=1)

    output = torch.sum(
        attention * x, dim=1
    )

    return attention, output

class WordEncoder(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, load_embed=False, weights_matrix=None, trainable_embedding=False):
    super().__init__()

    self.embedding = nn.Embedding(corpus_size, embedding_dim)

    if load_embed and weights_matrix is not None:
      self.embedding.load_state_dict({'weight': torch.tensor(weights_matrix)})

    self.embedding.weight.requires_grad = trainable_embedding

    self.gru = nn.GRU(embedding_dim, hidden_size, 2, dropout=0.3, bidirectional=True, batch_first=True)
    self.attention = WordAttention(hidden_size*2, embedding_dim)

  def forward(self, x):
    embeddings = self.embedding(x)
    out, hidden = self.gru(embeddings)
    attention, out = self.attention(out)

    return out

class HAN(nn.Module):
  def __init__(self, corpus_size, embedding_dim, hidden_size, class_count, load_embed=False, weights_matrix=None, trainable_embedding=False):
    super().__init__()
    self.class_count = class_count
    
    self.wordEncoder = WordEncoder(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=50, load_embed=True, weights_matrix=weights_matrix, trainable_embedding=True)

    self.sentGRU = nn.GRU(hidden_size*2, hidden_size, bidirectional=True, batch_first=True)
    self.sentence_attention = SentenceAttention(hidden_size * 2, hidden_size)

    self.softmax = nn.Softmax()
    self.classifier = nn.Linear(hidden_size*2, self.class_count)

  def forward(self, x):
    word_output = self.wordEncoder(x) # 16, 100

    sent_out, _ = self.sentGRU(word_output.unsqueeze(1)) # 16, 100
    _, sent_output = self.sentence_attention(sent_out)

    return self.softmax(self.classifier(sent_output))

In [117]:
embedding_dim = 100
global_vectors = GloVe(name='6B', dim=embedding_dim) # 42B, 840B

In [118]:
unique_words = list()
stemmer = PorterStemmer()
X = []
for lyric in tqdm(list(dataset.lyrics)):
    lyric = lyric.replace(" \n ", " ").strip()
    token_list = lyric.split()
    
    filtered_tokens = []
    for token in token_list:
        token = stemmer.stem(token)
        filtered_tokens.append(token)
        
        if not token in unique_words:
            unique_words.append(token)
    X.append(' '.join(filtered_tokens))
    
unique_words = list(unique_words)
print("Corpus size:", len(unique_words))

100%|██████████| 21258/21258 [17:29<00:00, 20.25it/s] 

Corpus size: 105705





In [119]:
unique_words[:3]

['shall', 'absurd', 'heart']

In [120]:
corpus_size = len(unique_words)
weights_matrix = np.zeros((corpus_size, embedding_dim))

found_word = 0
for i, word in enumerate(unique_words):
  word_vector = global_vectors.get_vecs_by_tokens(word)

  if word_vector.sum().item() == '0':
    weights_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim, ))
  else:
    weights_matrix[i] = word_vector
    found_word += 1
    
print(found_word)

105705


In [121]:
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore')
y = enc.fit_transform(np.array(dataset.tag).reshape(-1, 1)).toarray()

In [122]:
class CustomDataset(Dataset):
    def __init__(self, X, y, unique_words, weights_matrix):
        self.X = X
        self.y = y
        self.unique_words = unique_words
        self.weights_matrix = weights_matrix
        
        self.stemmer = PorterStemmer()
        
    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sentence = self.X[idx]
        label = self.y[idx]

        indices = [self.unique_words.index(word)+1 for word in sentence.split()]

        return {
            'input': torch.tensor(indices, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.float)
        }

def collate_fn(batch):
    inputs = [item['input'] for item in batch]
    labels = [item['label'] for item in batch]

    inputs_padded = pad_sequence(inputs, batch_first=True, padding_value=0)

    return {
        'input': inputs_padded,
        'label': torch.stack(labels)
    }

In [132]:
batch_size = 128

custom_dataset = CustomDataset(X=X, y=y, unique_words=unique_words, weights_matrix=weights_matrix)
train_size = int(0.75 * len(custom_dataset))
test_size = len(custom_dataset) - train_size

train_dataset, test_dataset = random_split(custom_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False, collate_fn=collate_fn)

In [133]:
def train(model, optim, loss_fn, epochs=50, print_loss=False):
  for epoch in range(50):
    epoch_loss = 0

    model.train()
    for i in tqdm(train_loader):
      optim.zero_grad()

      output = model(i["input"])
      target = i["label"]
      # target = target.unsqueeze(1)

      loss = loss_fn(target, output)
      epoch_loss += loss.item()

      loss.backward()
      optim.step()

    model.eval()
    for i in test_loader:
      output = model(i["input"])
      target = i["label"]
      # target = target.unsqueeze(1)

      l = loss_fn(target, output)

    if print_loss:
      if epoch % 5 == 0:
        print("Epoch loss:", round(epoch_loss/len(train_loader), 4))
        print("Eval Loss:", round(l.item(), 4))

  print("Eval Loss:", round(l.item(), 4))
  return model

In [None]:
corpus_size, embedding_dim = weights_matrix.shape

han_model = HAN(corpus_size=corpus_size, embedding_dim=embedding_dim, hidden_size=10, class_count=len(set(dataset.tag)),
                load_embed=True, weights_matrix=weights_matrix, trainable_embedding=False)
optim =  torch.optim.Adam(han_model.parameters(), 0.001)
loss_fn = torch.nn.CrossEntropyLoss()
han_model = train(han_model, optim, loss_fn, epochs=10, print_loss=True)