First, we manage all imports for the code.

In [None]:
!pip install gensim
!pip install pandas
!pip install nltk
!pip install install -Uq bertopic
!pip install torch
!pip install matplotlib

In [None]:
#imports
import pandas as pd
import numpy as np
import random
import ast
from torch import nn
import torch
import re
import nltk
from nltk.stem.snowball import SnowballStemmer
from torch.utils.data import random_split, Subset, DataLoader
import gensim
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
import matplotlib.pyplot as plt

import zipfile
import os
import shutil

import bertopic
from sentence_transformers import SentenceTransformer

Next, we extract the zipped data to CSV files. The data is stored as a .zip archive because it takes up less space in git; locally it is extracted to CSV.

In [None]:
# Unpack the archive into the data/ folder.
with zipfile.ZipFile("data/dataset.zip", "r") as zip_ref:
    zip_ref.extractall("data")

# The code below expects the extracted files under data/dataset/ and moves every
# regular file one level up into data/, where the later cells read them.
extracted_dir = "data/dataset"
source = destination = None  # pre-bound so the `del` below cannot fail on an empty folder
for file_name in os.listdir(extracted_dir):
    source = extracted_dir + "/" + file_name
    destination = "data/" + file_name
    if os.path.isfile(source):
        shutil.move(source, destination)

# Remove the now-empty folder (os.rmdir raises if anything was left behind).
os.rmdir(extracted_dir)
del source, destination

Preprocessing of the datasets. The goal is to obtain a table with the following columns: Name, Lyrics, Genre<br>
df dataset - Name, Lyrics, Genre<br>
df3 dataset - 40000 entries of randomly selected lyrics to train doc2vec<br>
df4 dataset - ~28k entries of songs

In [None]:

# Song titles with their genre; only these two columns are used.
df = pd.read_csv('data/Spotify-2000.csv')
df = df[['Title', 'Top Genre']].copy()  # explicit copy: a new column is added below
# Source of the lyrics ('song' and 'text' columns are used).
df2 = pd.read_csv('data/spotify_millsongdata.csv')
# Second data set, reduced to track name, genre and lyrics.
df4 = pd.read_csv('data/tcc_ceds_music.csv')
df4 = df4[['track_name','genre','lyrics']].copy()

# Join the lyrics onto df by case-insensitive title.
# The original nested loop wrote through df['lyrics'].iloc[x] (chained assignment,
# which pandas does not guarantee to update df) and compared every title with every
# song. A dictionary lookup gives the same result: when several songs share a title
# the last one in df2 wins, exactly as in the loop. Titles without a match get ''.
lyrics_by_title = dict(zip(df2['song'].str.lower(), df2['text']))
df['lyrics'] = df['Title'].str.lower().map(lyrics_by_title).fillna('')
found = int((df['lyrics'] != '').sum())  # number of titles for which lyrics were found

print("finished combining..")

Collect 40000 random entries of lyrics from the millsongdata dataset.

In [None]:
# Draw 40000 lyrics from df2. The fixed random_state makes every run pick the
# same rows, so results downstream are reproducible.
df3 = df2['text'].sample(n=40000, random_state=42)
print('done')

In [None]:
# Remove songs that were not found in both datasets (their lyrics are still '').
# A boolean mask is used instead of collecting enumerate() positions and passing
# them to drop(): drop() works on index labels, so positions only matched while the
# index was still 0..n-1 and a second run of the cell could drop the wrong rows.
df = df[df['lyrics'] != ''].reset_index(drop=True)

print(df)

Further preprocessing of the lyrics themselves. We remove punctuation with a regex and stopwords using the NLTK stopword list from df and df3.

In [None]:
from nltk.corpus import stopwords
nltk.download('stopwords')
stemmer = SnowballStemmer("english")
sw = stopwords.words('english')
punc_regex = r'[^\s\w]' #matches everything that is neither a word character nor whitespace
stopword_regex = r'\b{0}\b'
space_regex = r'\s\s+'
newl_regex = r'\n|\r'

# One alternation over all stopwords: a single pass per document instead of one
# re.sub call per stopword. Matching stays whole-word and case-insensitive.
stopword_pattern = re.compile(
    r'\b(?:' + '|'.join(re.escape(word) for word in sw) + r')\b',
    flags=re.IGNORECASE,
)


def preprocess_lyrics(text):
    """Strip punctuation and stopwords from `text` and stem the remaining words.

    Returns the lower-cased stems, each followed by a single space (the same
    layout the original loop produced). Line breaks act as word separators:
    the original deleted them without inserting a space, which glued the last
    word of a line onto the first word of the next one.
    """
    text = re.sub(punc_regex, '', text)
    text = stopword_pattern.sub('', text)
    # split() separates on any whitespace, including '\n' and '\r'.
    return ''.join(stemmer.stem(word.lower()) + ' ' for word in text.split())


#preprocessing of df
# Assign the whole column at once; df['lyrics'].iloc[x] = ... is chained
# assignment and is not guaranteed to write back into df.
df['lyrics'] = df['lyrics'].map(preprocess_lyrics)

print(df)
print('finished preprocessing of df')

In [None]:
#preprocessing of df3 -> dataset for doc2vec training

# Set to True to recompute the training data from the sampled lyrics;
# False loads the previously saved result from disk.
preprocess = False
training_data_path = 'data/doc2vec_training_data.csv'

if(preprocess):
    # Single whole-word, case-insensitive pattern covering every stopword.
    stopword_pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(word) for word in sw) + r')\b',
        flags=re.IGNORECASE,
    )
    cleaned_lyrics = []
    for lyrics in df3:
        txt = re.sub(punc_regex, '', lyrics) #remove punctuation
        txt = stopword_pattern.sub('', txt) #remove every stopword
        # split() treats line breaks as separators, so words on adjacent lines
        # are no longer merged as they were when newlines were simply deleted.
        cleaned_lyrics.append(''.join(stemmer.stem(word.lower()) + ' ' for word in txt.split()))
    df3 = pd.Series(cleaned_lyrics, index=df3.index, name=df3.name)
    # to_csv overwrites an existing file, so no os.remove is needed (it raised
    # FileNotFoundError when the file did not exist yet).
    df3.to_csv(training_data_path, index=False)
else:
    # read_csv returns a DataFrame; take its single column so df3 is a Series of
    # lyrics in both branches. Iterating a DataFrame yields column names, not rows.
    # Empty lyrics come back as NaN from the CSV and are restored to ''.
    df3 = pd.read_csv(training_data_path).iloc[:, 0].fillna('').astype(str)

print(df3)
print('finished preprocessing of df3')


In [None]:
#preprocessing of df4
# Single whole-word, case-insensitive pattern covering every stopword, so each
# document needs one substitution pass instead of one per stopword.
stopword_pattern = re.compile(
    r'\b(?:' + '|'.join(re.escape(word) for word in sw) + r')\b',
    flags=re.IGNORECASE,
)

cleaned_lyrics = []
for lyrics in df4['lyrics']:
    txt = re.sub(punc_regex, '', lyrics) #remove punctuation
    txt = stopword_pattern.sub('', txt) #remove every stopword
    # split() treats line breaks as separators, so words on adjacent lines are
    # not merged (deleting the newlines without a space used to glue them).
    cleaned_lyrics.append(''.join(stemmer.stem(word.lower()) + ' ' for word in txt.split()))

# Assign the whole column at once; df4['lyrics'].iloc[x] = ... is chained
# assignment and is not guaranteed to write back into df4.
df4['lyrics'] = cleaned_lyrics

print(df4)
print('finished preprocessing of df4')

In [None]:
# Sentence embedding model used by BERTopic to embed the lyrics.
sentence_model = SentenceTransformer("all-MiniLM-L6-v2")

topic_model = bertopic.BERTopic(language='english', embedding_model=sentence_model, verbose=True)

# Hand over a plain list of strings: df's index may have gaps after rows were
# dropped, and a list keeps the pandas index out of BERTopic entirely.
lyrics_documents = df["lyrics"].tolist()
topics, probs = topic_model.fit_transform(lyrics_documents)

print(topics)

In [None]:
# Overview table of the fitted topics.
topic_overview = topic_model.get_topic_info()
print(topic_overview)

# Contents stored for topic 1.
all_topics = topic_model.get_topics()
print(all_topics[1])

Tokenize the lyrics and create tagged Documents

In [None]:
tagged_data = []
nltk.download('punkt')

# df3 is a Series when it was sampled/preprocessed in this session, but a
# one-column DataFrame when it was loaded with pd.read_csv. Iterating a DataFrame
# yields its column names, so always pick the lyrics column explicitly.
lyrics_corpus = df3.iloc[:, 0] if isinstance(df3, pd.DataFrame) else df3
# Empty lyrics are read back from CSV as NaN; word_tokenize needs a string.
lyrics_corpus = lyrics_corpus.fillna('')

for i, d in enumerate(lyrics_corpus):
    tokenized_words = nltk.tokenize.word_tokenize(d)
    # tags must be a list: a bare string such as "12" is treated as the
    # sequence of tags '1' and '2', collapsing all documents onto ten digit tags.
    tagged_data.append(TaggedDocument(words=tokenized_words, tags=[str(i)]))

print(len(tagged_data))

Setting up the Doc2Vec model

In [None]:
# Doc2Vec with 100-dimensional vectors; min_count=1 keeps every word of the corpus.
doc2vec_model = Doc2Vec(vector_size=100, min_count=1, epochs=30)

# Build the vocabulary from the tagged documents, then train on them for the
# number of epochs configured on the model.
doc2vec_model.build_vocab(tagged_data)
doc2vec_model.train(
    tagged_data,
    total_examples=doc2vec_model.corpus_count,
    epochs=doc2vec_model.epochs,
)

print("Doc2Vec model finished training")

Building the DataLoader for the machine learning model

In [None]:
columns = ['Token', 'Target']
df_for_dataloader = pd.DataFrame(columns=columns)
df_for_dataloader.set_index(columns)

# Map every genre name to a class id, numbered in order of first appearance.
lookup_dict = {}
for genre_name in df4["genre"]:
    lookup_dict.setdefault(genre_name, len(lookup_dict))

# For every song: infer a Doc2Vec vector from its tokenized lyrics and build the
# one-hot genre vector. Both are wrapped in a one-element list; the training and
# evaluation code unwraps them again with [0].
token_list = []
target_list = []
for _, song in df4.iterrows():
    words = nltk.tokenize.word_tokenize(song["lyrics"])
    token_list.append([doc2vec_model.infer_vector(words)])

    one_hot = [1 if song["genre"] == genre_name else 0 for genre_name in lookup_dict]
    target_list.append([np.array(one_hot)])



Splitting the dataset into train and test (80,20)

In [None]:
# Seeded generator: the 80/20 split is identical on every run, so a model that is
# saved and reloaded later is evaluated on the same held-out songs it never saw.
split_generator = torch.Generator().manual_seed(42)
data_train, data_test = torch.utils.data.random_split(
    list(zip(token_list, target_list)),
    [0.8, 0.2],
    generator=split_generator,
)
print(len(data_train))
print(len(data_test))

dataloader_train = torch.utils.data.DataLoader(data_train, batch_size=32)
dataloader_test = torch.utils.data.DataLoader(data_test, batch_size=32)

In [None]:
# Number of test samples per genre id (the id is the argmax of the one-hot target).
genre_test_count_dict = {}
for _, one_hot_target in data_test:
    genre_id = np.argmax(one_hot_target)
    genre_test_count_dict[genre_id] = genre_test_count_dict.get(genre_id, 0) + 1

# Same count for the training split.
genre_train_count_dict = {}
for _, one_hot_target in data_train:
    genre_id = np.argmax(one_hot_target)
    genre_train_count_dict[genre_id] = genre_train_count_dict.get(genre_id, 0) + 1

for genre_id, sample_count in genre_test_count_dict.items():
    print(genre_id, sample_count)

# Inverse mapping: genre id -> genre name.
lookup_dict_reverse = {genre_id: genre_name for genre_name, genre_id in lookup_dict.items()}

We define our neural network model.

In [None]:
# Device the model is moved to. Fixed to the CPU; the commented-out expression on
# the right would pick CUDA when available (noted by the author as an option for
# faster training).
device = torch.device('cpu') # alternative: torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class SongModel(nn.Module):
    """Feed-forward genre classifier: Linear -> Sigmoid -> Linear.

    Args:
        input_size: length of one input vector; also used as the hidden size.
        output_size: number of classes (length of the one-hot target).

    `forward` returns raw, unnormalized class scores (logits). The notebook
    trains with `torch.nn.CrossEntropyLoss`, which applies log-softmax itself;
    feeding it already-softmaxed outputs (as the previous `forward` did)
    applies softmax twice and flattens the gradients. Use `predict_proba`
    when normalized probabilities are needed. The arg-max class is the same
    for logits and probabilities.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = input_size
        self.output_size = output_size
        self.layer_1 =   nn.Linear(self.input_size, self.hidden_size)
        # Not used in forward(); kept so state dicts saved earlier (which contain
        # layer_2 weights) still load.
        self.layer_2 =   nn.Linear(self.hidden_size, self.hidden_size)
        self.layer_out = nn.Linear(self.hidden_size, self.output_size)
        # Explicit dim: normalize over the class axis (the implicit-dim form is deprecated).
        self.softmax =   nn.Softmax(dim=-1)
        self.sigmoid =   nn.Sigmoid()
        self.tanh =      nn.Tanh()

        nn.init.xavier_uniform_(self.layer_1.weight)
        nn.init.zeros_(self.layer_1.bias)
        nn.init.xavier_uniform_(self.layer_out.weight)
        nn.init.zeros_(self.layer_out.bias)

    def forward(self, d):
        """Return the class logits for the input batch `d`."""
        hidden = self.sigmoid(self.layer_1(d))
        return self.layer_out(hidden)

    def predict_proba(self, d):
        """Return softmax-normalized class probabilities for the input batch `d`."""
        return self.softmax(self.forward(d))

# Read the layer sizes from the first training sample: the length of its token
# vector is the input size, the length of its one-hot target the output size.
first_token, first_target = dataloader_train.dataset[0]
model = SongModel(first_token[0].size, first_target[0].size)
model = model.to(device)

optimizer = torch.optim.Adagrad(model.parameters(), lr=0.01)
loss_fn = torch.nn.CrossEntropyLoss()

Define lists to save the accuracy and loss in each iteration to plot them later.

In [None]:
# Per-epoch history of the global accuracy and loss.
acc_train_normal_model, acc_test_normal_model = [], []
loss_train_normal_model, loss_test_normal_model = [], []

# Per-epoch accuracy history for each genre: one (initially empty) list per genre id.
genre_acc_train_normal_model = [[] for _ in lookup_dict]
genre_acc_test_normal_model = [[] for _ in lookup_dict]

Here is the training and test function for the model.

In [None]:
def train(model, dataloader, optimizer,loss_fn):
    """Train `model` for one pass over `dataloader` and record the metrics.

    Args:
        model: network called on each input batch.
        dataloader: yields (inputs, targets) where each side is a one-element
            list holding the batch tensor; targets are one-hot encoded.
        optimizer: optimizer stepped once per batch.
        loss_fn: called as loss_fn(prediction, float one-hot targets).

    Side effects: appends one value to the module-level lists
    loss_train_normal_model (sum of the batch losses divided by the number of
    samples), acc_train_normal_model (accuracy in percent) and to every list in
    genre_acc_train_normal_model (per-genre accuracy in percent, NaN for a genre
    that has no sample in genre_train_count_dict).
    """
    model.train()
    total_loss = 0.0
    acc = 0
    genre_accuracy = {genre_id: 0 for genre_id in range(len(lookup_dict))}

    for sample in dataloader:
        model_input = sample[0][0]
        should = sample[1][0]
        predict = model(model_input)

        # Compare predicted and true class for the whole batch at once
        # (argmax returns the first maximal index, like the former np.where lookup).
        predicted_ids = predict.detach().argmax(dim=1)
        target_ids = should.argmax(dim=1)
        hits = predicted_ids == target_ids
        acc += int(hits.sum())
        for genre_id in target_ids[hits].tolist():
            genre_accuracy[genre_id] += 1

        loss = loss_fn(predict, should.float())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # .item() stores a plain float; adding the tensor itself would keep every
        # batch's autograd graph alive until the end of the epoch.
        total_loss += loss.item()

    loss_train_normal_model.append(total_loss / len(dataloader.dataset))
    acc_train_normal_model.append(acc * 100 / len(dataloader.dataset))
    print(f"\tTrain - \tLoss: {loss_train_normal_model[-1]:3.10}, Acc: {acc_train_normal_model[-1]:3.5}%", end="\n")
    for id_genre in lookup_dict.values():
        # A genre can be absent from this split; report NaN instead of raising KeyError.
        genre_total = genre_train_count_dict.get(id_genre, 0)
        if genre_total:
            genre_rate = round(genre_accuracy[id_genre] * 100 / genre_total, 3)
        else:
            genre_rate = float("nan")
        genre_acc_train_normal_model[id_genre].append(genre_rate)

In [None]:
def test(model, dataloader, loss_fn):
    """Evaluate `model` on `dataloader` without updating it and record the metrics.

    Args:
        model: network called on each input batch.
        dataloader: yields (inputs, targets) where each side is a one-element
            list holding the batch tensor; targets are one-hot encoded.
        loss_fn: called as loss_fn(prediction, float one-hot targets).

    Side effects: appends one value to the module-level lists
    loss_test_normal_model (sum of the batch losses divided by the number of
    samples), acc_test_normal_model (accuracy in percent) and to every list in
    genre_acc_test_normal_model (per-genre accuracy in percent, NaN for a genre
    that has no sample in genre_test_count_dict).
    """
    total_loss = 0.0
    acc = 0
    genre_accuracy = {genre_id: 0 for genre_id in range(len(lookup_dict))}

    # Evaluate in eval mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    try:
        # no_grad: evaluation needs no autograd graph, which the original built
        # (and kept alive through the accumulated loss tensor) for every batch.
        with torch.no_grad():
            for sample in dataloader:
                model_input = sample[0][0]
                should = sample[1][0]
                predict = model(model_input)

                # argmax returns the first maximal index, like the former np.where lookup.
                predicted_ids = predict.argmax(dim=1)
                target_ids = should.argmax(dim=1)
                hits = predicted_ids == target_ids
                acc += int(hits.sum())
                for genre_id in target_ids[hits].tolist():
                    genre_accuracy[genre_id] += 1

                loss = loss_fn(predict, should.float())
                total_loss += loss.item()
    finally:
        model.train(was_training)

    loss_test_normal_model.append(total_loss / len(dataloader.dataset))
    acc_test_normal_model.append(acc * 100 / len(dataloader.dataset))
    print(f"\tEvaluation - \tLoss: {loss_test_normal_model[-1]:3.10}, Acc: {acc_test_normal_model[-1]:3.5}%", end="\n")
    for id_genre in lookup_dict.values():
        # A genre can be absent from this split; report NaN instead of raising KeyError.
        genre_total = genre_test_count_dict.get(id_genre, 0)
        if genre_total:
            genre_rate = round(genre_accuracy[id_genre] * 100 / genre_total, 3)
        else:
            genre_rate = float("nan")
        genre_acc_test_normal_model[id_genre].append(genre_rate)

In [None]:
# Number of passes over the training data; also used for the x-axis of the plots.
epochs = 200

for epoch in range(epochs):
    print(f"Epoch {epoch}:")
    train(model, dataloader_train, optimizer, loss_fn)
    test(model, dataloader_test, loss_fn)
    # Checkpoint the weights after every epoch.
    epoch_state = model.state_dict()
    torch.save(epoch_state, "model_test")

Here it is possible to save or load models

In [None]:
# Write the current model weights to the file "model_40p_train_test".
checkpoint_state = model.state_dict()
torch.save(checkpoint_state, "model_40p_train_test")

In [None]:
# Read the saved weights back into the model and switch it to evaluation mode.
saved_state = torch.load("model_40p_train_test")
model.load_state_dict(saved_state)
model.eval()

Now we plot our results.

In [None]:
def _plot_history(ax, history, label):
    """Draw one metric history on a log-scaled x axis, numbering epochs from 1.

    The x values are derived from the history itself: epoch 0 cannot be shown on
    a logarithmic axis (range(epochs) silently lost the first point), and a
    history whose length differs from `epochs` (e.g. after running the training
    cell more than once) no longer raises a shape mismatch.
    """
    ax.semilogx(range(1, len(history) + 1), history, label=label)


ax_array = []

# Global accuracy of train and test split.
fig1, ax1 = plt.subplots(1,1)
_plot_history(ax1, acc_train_normal_model, "Train")
_plot_history(ax1, acc_test_normal_model, "Test")
ax1.set_title("Global accuracy")
ax1.set_ylabel("Accuracy (%)")
ax_array.append(ax1)

# Global loss of train and test split.
fig2, ax2 = plt.subplots(1,1)
_plot_history(ax2, loss_train_normal_model, "Train")
_plot_history(ax2, loss_test_normal_model, "Test")
ax2.set_title("Global loss")
ax2.set_ylabel("Loss")
ax_array.append(ax2)

# One curve per genre for the training split.
fig3, ax3 = plt.subplots(1,1)
for i in range(len(genre_acc_train_normal_model)):
    _plot_history(ax3, genre_acc_train_normal_model[i], lookup_dict_reverse[i])
ax3.set_title("Train Genre Accuracy")
ax3.set_ylabel("Accuracy (%)")
ax_array.append(ax3)

# One curve per genre for the test split.
fig4, ax4 = plt.subplots(1,1)
for i in range(len(genre_acc_test_normal_model)):
    _plot_history(ax4, genre_acc_test_normal_model[i], lookup_dict_reverse[i])
ax4.set_title("Test Genre Accuracy")
ax4.set_ylabel("Accuracy (%)")
ax_array.append(ax4)

# Shrink every axes by 10% from the bottom to make room for a legend below it.
for ax_p in ax_array:
    box = ax_p.get_position()
    ax_p.set_position([box.x0, box.y0 + box.height * 0.1, box.width, box.height * 0.9])
    ax_p.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08), fancybox=True, shadow=True, ncol=5)

# exist_ok avoids the separate isdir check and the race between check and creation.
os.makedirs("plots", exist_ok=True)
fig1.savefig("plots/global_accuracy.png")
fig2.savefig("plots/global_loss.png")
fig3.savefig("plots/train_genre_accuracy.png")
fig4.savefig("plots/test_genre_accuracy.png")