# Lyrics to genres multi-label classification

### Data loading and preparing

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import keras.backend as tfb
from keras.utils.generic_utils import get_custom_objects
from sklearn.metrics import multilabel_confusion_matrix

In [None]:
# Alternative input kept for reference:
# data = pd.read_csv(r"data_cleaned/final.csv")
# Load the dataset used by the rest of the notebook; later cells read its
# "genre" (comma-separated labels) and "lyrics" columns.
data = pd.read_csv(r"data_cleaned/data_final2_equalized.csv")

In [None]:
# Preview the first rows of the freshly loaded frame.
data.head()

### Unique genre labels

In [None]:
# Gather every distinct genre tag; each row stores its genres as a single
# comma-separated string.
unique_genres = list({tag for entry in data.genre for tag in entry.split(",")})

print(len(unique_genres))
print(unique_genres)

In [None]:
def drop_genres(genres_list, genre_counts=None, min_count=930):
    """Keep only the sufficiently frequent genres of one comma-separated entry.

    Args:
        genres_list: Genres of a single song as a comma-separated string.
        genre_counts: Mapping ``genre -> number of occurrences``. When omitted,
            the module-level ``genres`` dict (built with ``get_new_genres``)
            is used, which is what the notebook relies on.
        min_count: Minimum number of occurrences a genre needs to be kept.

    Returns:
        The surviving genres joined by commas, in their original order; an
        empty string when none survive.
    """
    if genre_counts is None:
        # Fall back to the notebook-level counts for existing one-argument calls.
        genre_counts = genres

    # A genre missing from the counts is treated as having zero occurrences,
    # so it is dropped instead of aborting the whole ``apply`` with a KeyError.
    return ",".join(
        genre
        for genre in genres_list.split(",")
        if genre_counts.get(genre, 0) >= min_count
    )


def get_new_genres(df):
    """Count how many entries of ``df.genre`` mention each genre.

    Every entry is a comma-separated string; each piece is counted once per
    appearance. Returns a plain dict ``genre -> count`` whose keys appear in
    first-seen order.
    """
    counts = {}

    for genre_field in df.genre:
        for name in genre_field.split(","):
            counts[name] = counts.get(name, 0) + 1

    return counts

In [None]:
# Occurrence count per genre; drop_genres reads this module-level dict.
genres = get_new_genres(data)

# Strip rare genres from every row, then discard rows left without any genre.
data["genre"] = data["genre"].apply(drop_genres)
rows_without_genre = data[data["genre"] == ""].index
data = data.drop(rows_without_genre)

### Dropping songs whose lyrics are shorter than 100 words

In [None]:
# Remove songs whose lyrics have fewer than 100 whitespace-separated words.
word_counts = data["lyrics"].map(lambda x: len(x.split()))
data = data.drop(data[word_counts < 100].index)

### Number of genre occurrences

In [None]:
# Recount the genres on the filtered frame and show them as a bar chart.
genres = get_new_genres(data)
genre_names = list(genres.keys())

plt.figure(dpi=250)
plt.bar(genre_names, list(genres.values()))
plt.title("Number of genre occurances")
plt.xlabel('Genre')
plt.ylabel('Number of occurances')
# Small rotated tick labels so that all genre names fit on the axis.
plt.xticks(genre_names, rotation=75, fontsize=3)
plt.show()

### Creating vectors with encoded genre labels

In [None]:
def set_labels(x):
    """Multi-hot encode a comma-separated genre string.

    The result has one slot per entry of the module-level ``unique_genres``
    list; a slot is 1 when the genre at that position occurs in *x*, else 0.
    Raises ValueError (from ``list.index``) for a genre that is not listed.
    """
    encoded = np.array([0] * len(unique_genres))
    positions = [unique_genres.index(name) for name in x.split(",")]
    encoded[positions] = 1
    return encoded

In [None]:
# Replace line breaks inside the genre strings with spaces, then attach the
# multi-hot label vector for every song.
data["genre"] = data["genre"].apply(lambda genre_text: genre_text.replace("\n", " "))
data["labels"] = data["genre"].apply(set_labels)

In [None]:
# Preview the frame now that the "labels" column has been added.
data.head()

### Cleaning lyrics

In [None]:
def clean_text(text):
    """Lower-case *text*, expand common English contractions and drop symbols.

    After the contraction rules run, every non-word character is replaced by
    a space, runs of whitespace are collapsed to one space and surrounding
    spaces are stripped.

    Known limitation: irregular forms other than "can't" are only handled by
    the generic rules (e.g. "won't" becomes "wo not").
    """
    text = text.lower()

    # Ordered (pattern, replacement) pairs. Order matters: "'scuse" has to be
    # handled before the generic "'s" rule, otherwise it turns into " iscuse",
    # and "n't" has to be handled before the bare "'t" rule.
    expansions = (
        (r"can't", "can not"),
        (r"'scuse", " excuse "),
        (r"n't", " not"),
        (r"'re", " are"),
        (r"'s", " is"),
        (r"'d", " would"),
        (r"'ll", " will"),
        (r"'t", " not"),
        (r"'ve", " have"),
        (r"'m", " am"),
    )
    for pattern, replacement in expansions:
        text = re.sub(pattern, replacement, text)

    # Raw strings: "\W" / "\s" in a normal literal are invalid escape sequences.
    text = re.sub(r"\W", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip(" ")

def remove_punctuation(text):
    """Delete quote-like marks and turn separator punctuation into spaces.

    ``? | ! ' " #`` are removed outright; ``. , ) ( /`` become a single space
    each. The result is stripped and remaining newlines become spaces.
    """
    without_marks = re.sub(r'[?|!|\'|"|#]', r'', text)
    spaced_out = re.sub(r'[.|,|)|(|\|/]', r' ', without_marks)
    return spaced_out.strip().replace("\n", " ")

def remove_numbers(text):
    """Strip every non-letter character from each whitespace-separated word.

    Words are re-joined with single spaces. A word that consisted only of
    non-letters leaves an empty slot behind, so consecutive spaces can appear
    inside the result; leading and trailing whitespace is stripped.
    """
    # str.join instead of repeated ``+=`` keeps the concatenation linear.
    letters_only = (re.sub('[^a-z A-Z]+', '', word) for word in text.split())
    return " ".join(letters_only).strip()

def ultimate_text_cleaning(text):
    """Run the full lyric-cleaning pipeline on one string.

    Applies ``clean_text``, ``remove_punctuation`` and ``remove_numbers`` in
    that order, then collapses any whitespace runs they left behind and
    strips surrounding spaces.
    """
    text = clean_text(text)
    text = remove_punctuation(text)
    text = remove_numbers(text)
    # Raw string: "\s" in a normal literal is an invalid escape sequence.
    text = re.sub(r'\s+', ' ', text)
    return text.strip(' ')

In [None]:
# Run the complete cleaning pipeline over every lyric.
data["lyrics"] = data["lyrics"].apply(ultimate_text_cleaning)

In [None]:
# Preview the frame after the lyrics have been cleaned.
data.head()

In [None]:
# Print the first three cleaned lyrics as a quick sanity check.
sample_lyrics = list(data["lyrics"][:3])
print(sample_lyrics)

### Checkpoint

In [None]:
# data.to_csv(r"data_cleaned/final_cleaned_labeled2.csv", index=False)

In [None]:
# Resume from the cleaned checkpoint file.
# NOTE: the cell that would write this file is commented out, so the CSV has
# to exist on disk already. The following cells rebuild "labels" from "genre".
data = pd.read_csv(r"data_cleaned/final_cleaned_labeled2.csv")

In [None]:
def set_labels(x):
    """Multi-hot encode a comma-separated genre string.

    The result has one slot per entry of the module-level ``unique_genres``
    list; a slot is 1 when the genre at that position occurs in *x*, else 0.
    Raises ValueError (from ``list.index``) for a genre that is not listed.
    """
    encoded = np.array([0] * len(unique_genres))
    positions = [unique_genres.index(name) for name in x.split(",")]
    encoded[positions] = 1
    return encoded

In [None]:
# Rebuild the genre vocabulary from the checkpointed frame; set_labels uses
# the position of each genre in this list as its label column.
unique_genres = list({tag for entry in data.genre for tag in entry.split(",")})

data["genre"] = data["genre"].apply(lambda genre_text: genre_text.replace("\n", " "))
data["labels"] = data["genre"].apply(set_labels)

### Splitting the dataset into train and test sets

In [None]:
# Hold out 20% of the songs for testing. A fixed random_state makes the split
# reproducible, so a model reloaded in a later session is not evaluated on
# songs it was trained on.
x_train, x_test, y_train, y_test = train_test_split(
    data["lyrics"],
    data["labels"],
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

In [None]:
# Sanity-check the sizes of the four splits.
for split_part in (x_train, x_test, y_train, y_test):
    print(split_part.shape)

In [None]:
# Each label Series holds one vector per row; stack them into a 2-D float32
# matrix that Keras can consume.
y_train, y_test = (
    np.stack(label_series.values).astype('float32')
    for label_series in (y_train, y_test)
)

### Tokenization

Check lyric-length statistics to choose `maxlen`

In [None]:
def calculate_words_num(x):
    """Return how many pieces *x* splits into on single space characters."""
    return len(x.split(" "))

# Word count per lyric, computed once and summarised four ways.
words_per_lyric = data["lyrics"].apply(calculate_words_num)

print(words_per_lyric.mean())
print(words_per_lyric.median())
print(words_per_lyric.max())
print(words_per_lyric.min())

In [None]:
# Vocabulary cap: passed to Tokenizer(num_words=...) and used as input_dim of
# the trainable Embedding layers.
max_words = 100_000
# Sequence length handed to pad_sequences and to Embedding(input_length=...).
maxlen = 200
# Vector size of the trainable (non-pretrained) Embedding layers.
output_dim = 64

In [None]:
tokenizer = Tokenizer(lower=True, num_words=max_words)
# Fit the vocabulary on the training split only. Including x_test here would
# leak test-set word statistics into the preprocessing step.
tokenizer.fit_on_texts(list(x_train))

# Words the tokenizer has not seen (or that fall outside num_words) are
# skipped by texts_to_sequences; pad_sequences then fixes the length to maxlen.
x_train = pad_sequences(tokenizer.texts_to_sequences(x_train), maxlen=maxlen)
x_test = pad_sequences(tokenizer.texts_to_sequences(x_test), maxlen=maxlen)

Check shapes

In [None]:
# Shapes after tokenization and padding.
for split_part in (x_train, x_test, y_train, y_test):
    print(split_part.shape)

### CNN with embedding model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, GlobalMaxPool1D, MaxPooling1D, Activation, Dropout, Conv1D, Flatten

In [None]:
# Count the distinct genres without rebinding ``unique_genres``: set_labels
# calls ``unique_genres.index(...)`` (sets have no ``index``), and the check
# cells map label positions back to names through that same list, so it must
# keep the order that was used to encode the labels.
num_classes = len({item for row in data.genre for item in row.split(",")})
print(num_classes)

In [None]:
# 1-D CNN on top of a trainable embedding. The final sigmoid gives one
# independent score per genre (multi-label output).
model = Sequential([
    Embedding(input_dim=max_words, output_dim=output_dim, input_length=maxlen),
    Conv1D(filters=64, kernel_size=3, activation='relu'),
    MaxPooling1D(),
    Dropout(0.3),
    Conv1D(filters=64, kernel_size=3, activation='relu'),
    GlobalMaxPool1D(),
    # Dropout is applied to the pre-activation output of this dense layer.
    Dense(100, kernel_regularizer='l1_l2'),
    Dropout(0.5),
    Activation('relu'),
    Dense(num_classes, name="output"),
    Activation('sigmoid'),
])

In [None]:
# Print the layer-by-layer summary of the CNN built above.
model.summary()

### Custom loss function that gives more weight to 1's (because of multi-labeling)

In [None]:
POS_WEIGHT = 10  # multiplier for positive targets, needs to be tuned


def weighted_binary_crossentropy(target, output):
    """Binary cross-entropy that weights positive targets by POS_WEIGHT.

    Args:
        target: Ground-truth 0/1 labels.
        output: Predicted probabilities (post-sigmoid), same shape as target.

    Returns:
        The weighted loss averaged over the last axis.

    Combination of the following functions:
    * keras.losses.binary_crossentropy
    * keras.backend.tensorflow_backend.binary_crossentropy
    * tf.nn.weighted_cross_entropy_with_logits
    """
    # Clip probabilities away from 0 and 1 so that the logit transform below
    # stays finite. Built with public ops only (no private backend helper).
    epsilon = tf.cast(tfb.epsilon(), output.dtype.base_dtype)
    output = tf.clip_by_value(output, epsilon, 1 - epsilon)
    # transform back to logits
    logits = tf.math.log(output / (1 - output))
    # Labels must have the same dtype as the logits.
    target = tf.cast(target, logits.dtype)
    # compute weighted loss
    loss = tf.nn.weighted_cross_entropy_with_logits(labels=target,
                                                    logits=logits,
                                                    pos_weight=POS_WEIGHT)
    return tf.reduce_mean(loss, axis=-1)


# Register the loss under its name so that it can be referenced as a string.
get_custom_objects().update({"weighted_binary_crossentropy": weighted_binary_crossentropy})

In [None]:
def f1_score(y_true, y_logit):
    """
    Calculate the F1 score over all elements of the given tensors.

    y_true: true 0/1 labels
    y_logit: predicted values; they are clipped to [0, 1] and rounded before
             counting, i.e. they are used as probabilities despite the name
    """
    eps = tfb.epsilon()

    hits = tfb.sum(tfb.round(tfb.clip(y_true * y_logit, 0, 1)))
    actual_positives = tfb.sum(tfb.round(tfb.clip(y_true, 0, 1)))
    flagged_positives = tfb.sum(tfb.round(tfb.clip(y_logit, 0, 1)))

    # eps keeps every ratio finite when a denominator would otherwise be zero.
    recall = hits / (actual_positives + eps)
    precision = hits / (flagged_positives + eps)
    return (2 * precision * recall) / (precision + recall + eps)

In [None]:
# Alternative kept for reference.
# NOTE(review): this loss expects raw logits, while the network built above
# ends in a sigmoid activation; confirm before re-enabling it.
# model.compile(optimizer="adam",
#               loss=tf.nn.sigmoid_cross_entropy_with_logits,
#               metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

# Active configuration: plain (unweighted) binary cross-entropy.
model.compile(optimizer="adam", 
              loss="binary_crossentropy", 
              metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

# Alternative kept for reference: the positively weighted custom loss.
# model.compile(optimizer="adam",
#               loss="weighted_binary_crossentropy",
#               metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

### Train

In [None]:
# Training hyperparameters passed to model.fit for the CNN.
batch_size = 64
epochs = 50

In [None]:
# Train the CNN. The test split doubles as validation data, so the val_*
# metrics in ``hist`` are computed on x_test / y_test.
hist = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))

### Check metrics history

In [None]:
# List the metric names recorded during training (used by the plots below).
hist.history.keys()

Loss

In [None]:
# Training vs. validation loss per epoch.
plt.title('Loss')
for series_name in ('loss', 'val_loss'):
    plt.plot(hist.history[series_name], label=series_name)
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

Accuracy

In [None]:
# Training vs. validation accuracy per epoch.
plt.title("Accuracy")
for series_name in ("accuracy", "val_accuracy"):
    plt.plot(hist.history[series_name], label=series_name)
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# Training vs. validation F1 score per epoch.
plt.title("F1 score")
for series_name in ("f1_score", "val_f1_score"):
    plt.plot(hist.history[series_name], label=series_name)
plt.xlabel("Epochs")
plt.ylabel("F1 score")
plt.legend()
plt.show()

### Evaluate

In [None]:
# Loss and metrics can only be computed when the targets are supplied too.
model.evaluate(x_test, y_test)

### Test

In [None]:
k = 4  # index of the test sample to inspect

# No earlier cell computes predictions for this model, so do it here;
# otherwise ``y_pred`` is undefined on a fresh top-to-bottom run.
y_pred = model.predict(x_test)

# NOTE(review): positions are assumed to line up with the label columns
# produced by set_labels; confirm unique_genres still has that order.
genre_names = list(unique_genres)

# argwhere yields one [index] row per hit; flatten to plain label indexes.
indexes_pred = np.argwhere(y_pred[k] >= 0.5).reshape(-1)
indexes_true = np.argwhere(y_test[k] == 1).reshape(-1)

# Predicted genres (score >= 0.5), followed by the raw scores.
for i in indexes_pred:
    print(genre_names[i])

print(y_pred[k])
print("\n")

# Ground-truth genres, followed by the raw label vector.
for i in indexes_true:
    print(genre_names[i])

print(y_test[k])

### Save/Load model

In [None]:
# Persist the trained CNN to an HDF5 (.h5) file.
model.save(r'models/model_cnn.h5')


In [None]:
# The saved model was compiled with a custom metric (and possibly the custom
# loss); they have to be supplied for the compile configuration to be restored.
new_model = tf.keras.models.load_model(
    r'models/model_cnn.h5',
    custom_objects={
        "f1_score": f1_score,
        "weighted_binary_crossentropy": weighted_binary_crossentropy,
    },
)


### LSTM model

In [None]:
from keras.layers import LSTM, Bidirectional, SpatialDropout1D

In [None]:
# Stacked bidirectional LSTM on top of a trainable embedding; the sigmoid
# output gives one independent score per genre.
model = Sequential()
model.add(Embedding(input_dim=max_words, output_dim=output_dim, input_length=maxlen))
model.add(SpatialDropout1D(0.3))

# Two recurrent layers with identical settings. Both return full sequences so
# that the global max-pooling below can reduce over the time axis.
for layer_no in range(2):
    model.add(Bidirectional(
        LSTM(units=32, return_sequences=True, activation='tanh',
             recurrent_activation='sigmoid', recurrent_dropout=0.0,
             dropout=0.5, kernel_initializer='glorot_uniform'),
        merge_mode='concat'))

model.add(Dropout(0.3))
model.add(GlobalMaxPool1D())
model.add(Dense(num_classes))
model.add(Activation('sigmoid'))

In [None]:
# Print the layer-by-layer summary of the LSTM model built above.
model.summary()

In [None]:
# Pass the loss callable itself rather than its registered name: the name is
# registered through the standalone ``keras`` package while the model comes
# from ``tensorflow.keras``, and the two do not always share one registry.
model.compile(optimizer="adam",
              loss=weighted_binary_crossentropy,
              metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

In [None]:
# Training hyperparameters passed to model.fit for the LSTM.
batch_size = 64
epochs = 20

In [None]:
# Train the LSTM. The test split doubles as validation data, so the val_*
# metrics in ``hist`` are computed on x_test / y_test.
hist = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))

### Save/Load

In [None]:
# Persist the trained LSTM to an HDF5 (.h5) file.
model.save(r'models/model_lstm.h5')

In [None]:
# Reload the LSTM that was just saved (this used to point at the CNN file).
# The custom metric and loss must be supplied for the compile configuration
# to be restored.
new_model = tf.keras.models.load_model(
    r'models/model_lstm.h5',
    custom_objects={
        "f1_score": f1_score,
        "weighted_binary_crossentropy": weighted_binary_crossentropy,
    },
)

### Check

In [None]:
# Loss and metrics can only be computed when the targets are supplied too.
model.evaluate(x_test, y_test)

In [None]:
# Per-genre scores for every test sample (the model ends in a sigmoid).
y_pred = model.predict(x_test)

In [None]:
k = 4  # index of the test sample to inspect

# NOTE(review): positions are assumed to line up with the label columns
# produced by set_labels; confirm unique_genres still has that order.
genre_names = list(unique_genres)

# argwhere yields one [index] row per hit; flatten to plain label indexes
# (the earlier reshape calls discarded their result).
indexes_pred = np.argwhere(y_pred[k] >= 0.9).reshape(-1)
indexes_true = np.argwhere(y_test[k] == 1).reshape(-1)

# Predicted genres (score >= 0.9), followed by the raw scores.
for i in indexes_pred:
    print(genre_names[i])

print(y_pred[k])
print("\n")

# Ground-truth genres, followed by the raw label vector.
for i in indexes_true:
    print(genre_names[i])

print(y_test[k])

### Fasttext embedding

In [None]:
from gensim.models import KeyedVectors
import numpy as np
import tensorflow as tf

In [None]:

# Load the pretrained vectors from a text-format (binary=False) word2vec file.
fasttext = KeyedVectors.load_word2vec_format(r'pretrained_embeddings/crawl-300d-2M-subword.vec', binary=False, encoding='utf8')


In [None]:
# Split the raw lyrics again for the pretrained-embedding experiments. A fixed
# random_state keeps the split reproducible across runs and sessions.
x_train, x_test, y_train, y_test = train_test_split(
    data["lyrics"],
    data["labels"],
    test_size=0.2,
    shuffle=True,
    random_state=42,
)
# One label vector per row -> 2-D float32 matrices.
y_train = np.stack(y_train.values).astype('float32')
y_test = np.stack(y_test.values).astype('float32')

In [None]:
tokenizer = Tokenizer(lower=True, num_words=max_words)
# Fit the vocabulary on the training split only. Including x_test here would
# leak test-set word statistics into the preprocessing step.
tokenizer.fit_on_texts(list(x_train))

# Words the tokenizer has not seen (or that fall outside num_words) are
# skipped by texts_to_sequences; pad_sequences then fixes the length to maxlen.
x_train = pad_sequences(tokenizer.texts_to_sequences(x_train), maxlen=maxlen)
x_test = pad_sequences(tokenizer.texts_to_sequences(x_test), maxlen=maxlen)

In [None]:
embedding_dim = 300
# +1 because the tokenizer's word indexes start at 1; row 0 stays all-zero.
vocab_size = len(tokenizer.word_index) + 1
nb_of_unknown_words = 0

# Row i holds the vector of the word with tokenizer index i.
weight_matrix = np.zeros((vocab_size, embedding_dim))

for word, row_index in tokenizer.word_index.items():
    try:
        vector = fasttext[word]
    except KeyError:
        # Word missing from the pretrained vocabulary: use a random vector.
        vector = np.random.uniform(-5, 5, embedding_dim)
        nb_of_unknown_words += 1
    weight_matrix[row_index] = vector

print(f"Number of unknown words inizialized randomly: {nb_of_unknown_words}")

In [None]:
def loss_fn(y_true, y_pred):
    """Element-wise sigmoid cross-entropy, treating *y_pred* as raw logits."""
    per_element_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred)
    return per_element_loss

### Dense

In [None]:
# Fully connected network on top of the frozen pretrained embedding.
model = Sequential([
    Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=maxlen, weights=[weight_matrix], trainable=False),
    Flatten(),
    Dense(200, kernel_regularizer='l1_l2', activation="relu"),
    Dropout(0.3),
    Dense(50, kernel_regularizer='l1_l2', activation="relu"),
    Dropout(0.2),
    Dense(num_classes, activation="sigmoid"),
])

model.summary()

In [None]:
# Pass the loss callable itself rather than its registered name: the name is
# registered through the standalone ``keras`` package while the model comes
# from ``tensorflow.keras``, and the two do not always share one registry.
model.compile(optimizer="adam",
              loss=weighted_binary_crossentropy,
              metrics=[f1_score, tf.keras.metrics.AUC()])

In [None]:
# Training hyperparameters for the dense model.
batch_size = 256
epochs = 20

# The test split doubles as validation data, so the val_* metrics in ``hist``
# are computed on x_test / y_test.
hist = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))

### CNN

In [None]:
# 1-D CNN on top of the frozen pretrained embedding; sigmoid multi-label output.
model = Sequential([
    Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=maxlen, weights=[weight_matrix], trainable=False),
    Conv1D(filters=256, kernel_size=3, activation='relu', kernel_regularizer='l1_l2'),
    MaxPooling1D(),
    Dropout(0.3),
    Conv1D(filters=256, kernel_size=3, activation='relu', kernel_regularizer='l1_l2'),
    GlobalMaxPool1D(),
    Dense(num_classes, name="output"),
    Activation('sigmoid'),
])

In [None]:
# Print the layer-by-layer summary of the CNN built above.
model.summary()

In [None]:
# Pass the loss callable itself rather than its registered name: the name is
# registered through the standalone ``keras`` package while the model comes
# from ``tensorflow.keras``, and the two do not always share one registry.
model.compile(optimizer="adam",
              loss=weighted_binary_crossentropy,
              metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

In [None]:
# Training hyperparameters passed to model.fit for the pretrained-embedding CNN.
batch_size = 64
epochs = 20

In [None]:
# Train the CNN. The test split doubles as validation data, so the val_*
# metrics in ``hist`` are computed on x_test / y_test.
hist = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))

### Save/Load

In [None]:
# Persist the trained pretrained-embedding CNN to an HDF5 (.h5) file.
model.save(r'models/model_cnn_fasttext.h5')

In [None]:
# The saved model was compiled with a custom loss and metric; they have to be
# supplied for the compile configuration to be restored.
new_model = tf.keras.models.load_model(
    r'models/model_cnn_fasttext.h5',
    custom_objects={
        "f1_score": f1_score,
        "weighted_binary_crossentropy": weighted_binary_crossentropy,
    },
)

### LSTM

In [None]:
# Stacked bidirectional LSTM on top of the frozen pretrained embedding.
model = Sequential([
    Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=maxlen, weights=[weight_matrix], trainable=False),
    # Two recurrent layers with identical settings; both return full sequences
    # so that the global max-pooling below can reduce over the time axis.
    *[
        Bidirectional(
            LSTM(units=64, return_sequences=True, activation='tanh',
                 recurrent_activation='sigmoid', recurrent_dropout=0.0,
                 dropout=0.5, kernel_initializer='glorot_uniform'),
            merge_mode='concat')
        for _ in range(2)
    ],
    Dropout(0.3),
    GlobalMaxPool1D(),
    Dense(num_classes),
    Activation('sigmoid'),
])

In [None]:
# Print the layer-by-layer summary of the LSTM model built above.
model.summary()

In [None]:
# Pass the loss callable itself rather than its registered name: the name is
# registered through the standalone ``keras`` package while the model comes
# from ``tensorflow.keras``, and the two do not always share one registry.
model.compile(optimizer="adam",
              loss=weighted_binary_crossentropy,
              metrics=['accuracy', f1_score, tf.keras.metrics.AUC()])

In [None]:
# Training hyperparameters passed to model.fit for the pretrained-embedding LSTM.
batch_size = 32
epochs = 50

In [None]:
# Train the LSTM. The test split doubles as validation data, so the val_*
# metrics in ``hist`` are computed on x_test / y_test.
hist = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))

### Save/Load

In [None]:
# model.save(r'models/model_lstm_fasttext2.h5')

In [None]:
# The saved model was compiled with a custom loss and metric; they have to be
# supplied for the compile configuration to be restored.
# NOTE(review): the commented-out save in the previous cell targets
# 'model_lstm_fasttext2.h5'; confirm which file is meant to be loaded here.
new_model = tf.keras.models.load_model(
    r'models/model_lstm_fasttext.h5',
    custom_objects={
        "f1_score": f1_score,
        "weighted_binary_crossentropy": weighted_binary_crossentropy,
    },
)

### Check

In [None]:
# Per-genre scores for every test sample (the model ends in a sigmoid).
y_pred = model.predict(x_test)

In [None]:
k = 4  # index of the test sample to inspect

# NOTE(review): positions are assumed to line up with the label columns
# produced by set_labels; confirm unique_genres still has that order.
genre_names = list(unique_genres)

# argwhere yields one [index] row per hit; flatten to plain label indexes
# (the earlier reshape calls discarded their result).
indexes_pred = np.argwhere(y_pred[k] >= 0.7).reshape(-1)
indexes_true = np.argwhere(y_test[k] == 1).reshape(-1)

# Predicted genres (score >= 0.7), followed by the raw scores.
for i in indexes_pred:
    print(genre_names[i])

print(y_pred[k])
print("\n")

# Ground-truth genres, followed by the raw label vector.
for i in indexes_true:
    print(genre_names[i])

print(y_test[k])

In [None]:
def create_confusion_matrix(model, threshold=0.75):
    """Return one 2x2 confusion matrix per label for *model* on the test split.

    Args:
        model: Object whose ``predict`` is called on the module-level
            ``x_test`` and returns one score per label.
        threshold: Scores strictly greater than this count as a positive
            prediction; everything else counts as negative.

    Returns:
        The result of ``multilabel_confusion_matrix(y_test, y_pred)`` for the
        binarized predictions.
    """
    scores = model.predict(x_test)

    # Binarize with a single comparison against the raw scores. Writing the
    # 1s and the 0s in two passes would make the second pass test values that
    # were already overwritten.
    y_pred = (scores > threshold).astype(int)

    return multilabel_confusion_matrix(y_test, y_pred)


def calculate_confusion_ratio(matrix):
    """Print the percentage of correct cells across all per-label matrices.

    *matrix* is a stack of 2x2 confusion matrices; the diagonal entries count
    as correct ("ok"), the off-diagonal entries as wrong ("bad").
    """
    ok = 0
    bad = 0

    for per_label in matrix:
        ok += per_label[0, 0] + per_label[1, 1]
        bad += per_label[1, 0] + per_label[0, 1]

    print(f"percent of ok: {ok/(ok+bad)*100}%")

In [None]:
# Per-label confusion matrices at a 0.5 threshold (overrides the function's
# default of 0.75).
matrix = create_confusion_matrix(model, threshold=0.5)

In [None]:
# Print the share of correctly classified (sample, label) cells.
calculate_confusion_ratio(matrix)