The core of this project is based around a simple task -- performing genre analysis on the
“Multi-Lingual Lyrics for Genre Classification” dataset on Kaggle.
https://www.kaggle.com/datasets/mateibejan/multilingual-lyrics-for-genre-classification
This is an extensive dataset that is split into training and testing subsets. The testing dataset
should be used for final testing only. The training dataset should be used for all training and
validation tasks, as appropriate.
This first part of the task is to perform a number of analyses, training models from scratch to
predict genre based initially on song lyrics alone and then on song lyrics and artist.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import nltk
import tensorflow as tf
import joblib
import matplotlib.pyplot as plt
import os

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Embedding, LSTM, Dense, SimpleRNN, Dropout, Input, Reshape, Layer
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard, ModelCheckpoint
from nltk.corpus import stopwords
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Concatenate, Dropout, Flatten
from IPython.display import Image
import tensorflow_hub as hub


# Fetch the NLTK resources used by this notebook, in a fixed order.
for resource in ('stopwords', 'punkt', 'wordnet', 'punkt_tab'):
    nltk.download(resource)


# Report how many GPUs TensorFlow can see.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [33]:
# Locations of the pre-processed train and test CSV files
train_file_path = 'part_1_data/train_encoded_lemmatize.csv'
test_file_path = 'part_1_data/test_encoded_lemmatize.csv'

# Read each CSV and cast every column to string in one step, so the
# tokenizers downstream always receive text.
train_df = pd.read_csv(train_file_path).astype(str)
test_df = pd.read_csv(test_file_path).astype(str)

In [34]:
# Learn the genre -> integer mapping from the training labels only, then apply
# that same mapping to both frames.
label_encoder = LabelEncoder().fit(train_df['genre'])
train_df['genre'] = label_encoder.transform(train_df['genre'])
test_df['genre'] = label_encoder.transform(test_df['genre'])

In [35]:
max_length = 1000
vocab_size = 10000


def preprocess_text_data(train_df, test_df, max_artist_length=3, use_padding=True):
    """Tokenize lyrics and artist names and carve out a validation split.

    The training frame is split 80/20 with ``random_state=42``. Lyrics, artist
    and genre are split in a single call so the three stay row-aligned.
    Both tokenizers are fitted on the training split only, so no validation or
    test text influences the vocabularies. The lyrics tokenizer is capped by
    the module-level ``vocab_size``, and lyric sequences are padded to the
    module-level ``max_length``.

    Args:
        train_df: DataFrame with ``lyrics``, ``artist`` and ``genre`` columns.
        test_df: DataFrame with ``lyrics`` and ``artist`` columns.
        max_artist_length: Padded length of each artist token sequence.
        use_padding: When False, the un-padded token-id lists are returned
            in place of the padded arrays.

    Returns:
        Tuple ``(X_train_pad, X_val_pad, X_test_pad, X_train_artist_pad,
        X_val_artist_pad, X_test_artist_pad, y_train, y_val)``.
    """
    # Split lyrics, artist and label together; the row selection depends only
    # on the number of rows and the seed, so lyrics/genre get the same split
    # as before and the artist column now follows it.
    (X_train, X_val,
     artist_train, artist_val,
     y_train, y_val) = train_test_split(
        train_df['lyrics'], train_df['artist'], train_df['genre'],
        test_size=0.2, random_state=42,
    )

    # Tokenize the lyrics
    tokenizer = Tokenizer(num_words=vocab_size)
    tokenizer.fit_on_texts(X_train)

    X_train_seq = tokenizer.texts_to_sequences(X_train)
    X_val_seq = tokenizer.texts_to_sequences(X_val)
    X_test_seq = tokenizer.texts_to_sequences(test_df['lyrics'])

    # Tokenize the artist names. Previously the *lyrics* split was fed through
    # the artist tokenizer here, so the train/val artist features were wrong.
    artist_tokenizer = Tokenizer()
    artist_tokenizer.fit_on_texts(artist_train)
    X_train_artist_seq = artist_tokenizer.texts_to_sequences(artist_train)
    X_val_artist_seq = artist_tokenizer.texts_to_sequences(artist_val)
    X_test_artist_seq = artist_tokenizer.texts_to_sequences(test_df['artist'])

    if not use_padding:
        return (X_train_seq, X_val_seq, X_test_seq,
                X_train_artist_seq, X_val_artist_seq, X_test_artist_seq,
                y_train, y_val)

    # Pad (or truncate) every sequence to a fixed length, zeros at the end.
    X_train_pad = pad_sequences(X_train_seq, maxlen=max_length, padding='post')
    X_val_pad = pad_sequences(X_val_seq, maxlen=max_length, padding='post')
    X_test_pad = pad_sequences(X_test_seq, maxlen=max_length, padding='post')

    X_train_artist_pad = pad_sequences(X_train_artist_seq, maxlen=max_artist_length, padding='post')
    X_val_artist_pad = pad_sequences(X_val_artist_seq, maxlen=max_artist_length, padding='post')
    X_test_artist_pad = pad_sequences(X_test_artist_seq, maxlen=max_artist_length, padding='post')

    return X_train_pad, X_val_pad, X_test_pad, X_train_artist_pad, X_val_artist_pad, X_test_artist_pad, y_train, y_val

# Run the preprocessing once and unpack the eight returned objects.
(
    X_train_pad,
    X_val_pad,
    X_test_pad,
    X_train_artist_pad,
    X_val_artist_pad,
    X_test_artist_pad,
    y_train,
    y_val,
) = preprocess_text_data(train_df, test_df, max_artist_length=3, use_padding=True)


In [None]:
# Echo the fixed sequence length and vocabulary cap used for the lyrics.
print(
    f"Maximum sequence length: {max_length}",
    f"Vocabulary size: {vocab_size}",
    sep="\n",
)

In [None]:
# Checkpoints are written under part_1/; create it up front (no error if present).
os.makedirs(name='part_1', exist_ok=True)

# Quick sanity check on the padded training matrix.
print("Shape of input data:", X_train_pad.shape)


In [38]:
# Dropout rate shared by every model factory in this cell.
dropout_rate = 0.5


def create_rnn_model(input_length, vocab_size, embedding_dim, state_size, num_classes):
    """Build an uncompiled single-layer SimpleRNN classifier.

    Layers, in order: Embedding -> SimpleRNN (tanh, last state only) ->
    Dropout (module-level ``dropout_rate``) -> softmax Dense of
    ``num_classes`` units.
    """
    return Sequential([
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=input_length),
        SimpleRNN(state_size, return_sequences=False, activation="tanh"),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ])

def create_multi_layer_rnn_model(input_length, vocab_size, embedding_dim, state_size, num_classes):
    """Build an uncompiled three-layer stacked SimpleRNN classifier.

    The first two recurrent layers emit the full sequence and are each followed
    by Dropout (module-level ``dropout_rate``). The third emits only its final
    state, which feeds the softmax output directly.
    """
    return Sequential([
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=input_length),
        SimpleRNN(state_size, return_sequences=True),
        Dropout(dropout_rate),
        SimpleRNN(state_size, return_sequences=True),
        Dropout(dropout_rate),
        SimpleRNN(state_size),
        Dense(num_classes, activation='softmax'),
    ])

def create_lstm_model(input_length, vocab_size, embedding_dim, state_size, num_classes):
    """Build an uncompiled single-layer LSTM classifier.

    Layers, in order: Embedding -> LSTM (tanh, last state only) -> Dropout
    (module-level ``dropout_rate``) -> softmax Dense of ``num_classes`` units.
    """
    return Sequential([
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=input_length),
        LSTM(state_size, return_sequences=False, activation="tanh"),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ])

def create_multi_layer_lstm_model(input_length, vocab_size, embedding_dim, state_size, num_classes):
    """Build an uncompiled three-layer stacked LSTM classifier.

    Every LSTM layer is followed by Dropout (module-level ``dropout_rate``).
    Only the last LSTM collapses the sequence to its final state before the
    softmax output.
    """
    return Sequential([
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=input_length),
        LSTM(state_size, return_sequences=True, activation="tanh"),
        Dropout(dropout_rate),
        LSTM(state_size, return_sequences=True, activation="tanh"),
        Dropout(dropout_rate),
        LSTM(state_size, return_sequences=False, activation="tanh"),
        Dropout(dropout_rate),
        Dense(num_classes, activation='softmax'),
    ])

def create_lstm_model_multi_label(max_length, vocab_size, embedding_dim, state_size, num_classes,
                                  artist_vocab_size, max_artist_length=None):
    """Build an uncompiled two-input (lyrics + artist) LSTM classifier.

    Despite the name, this is a multi-*input* model with a single softmax
    output. Each input has its own Embedding + LSTM branch. The two final
    states are concatenated and passed through Dense(128, relu) -> Dropout
    (module-level ``dropout_rate``) -> softmax Dense.

    Args:
        max_length: Length of the padded lyrics sequences.
        vocab_size: ``input_dim`` of the lyrics embedding.
        embedding_dim: Output dimension of both embeddings.
        state_size: Number of units in each LSTM.
        num_classes: Number of output classes.
        artist_vocab_size: ``input_dim`` of the artist embedding; must be
            greater than the largest artist token id fed to the model.
        max_artist_length: Length of the padded artist sequences. The default
            ``None`` accepts any length, so the model works with the length-3
            artist sequences this notebook produces. The input was previously
            fixed at length 1, which did not match them.

    Returns:
        A ``Model`` taking ``[lyrics_input, artist_input]``.
    """
    # Lyrics branch
    lyrics_input = Input(shape=(max_length,), name='lyrics_input')
    lyrics_embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_length)(lyrics_input)
    lyrics_lstm = LSTM(state_size, return_sequences=False)(lyrics_embedding)

    # Artist branch: a short token sequence summarised by its own LSTM
    artist_input = Input(shape=(max_artist_length,), name='artist_input')
    artist_embedding = Embedding(input_dim=artist_vocab_size, output_dim=embedding_dim)(artist_input)
    artist_lstm = LSTM(state_size, return_sequences=False)(artist_embedding)

    # Merge both representations and classify
    concatenated = Concatenate()([lyrics_lstm, artist_lstm])
    dense = Dense(128, activation='relu')(concatenated)
    dropout = Dropout(dropout_rate)(dense)
    output = Dense(num_classes, activation='softmax')(dropout)

    return Model(inputs=[lyrics_input, artist_input], outputs=output)

In [None]:
# Set hyperparameters
num_epochs = 10  # default epoch count; individual training cells may override it
embedding_dim = 100
state_size = 64  # Define the state size for both RNN and LSTM models
num_classes = len(label_encoder.classes_)
# The artist inputs are word-level token ids, so the embedding must cover the
# largest id actually present (max index + 1). Counting unique artist strings
# does not bound those ids.
artist_vocab_size = int(max(
    np.max(X_train_artist_pad),
    np.max(X_val_artist_pad),
    np.max(X_test_artist_pad),
)) + 1
learning_rate = 1e-3


# Ensure the output directory for model plots exists
output_dir = 'model_plots'
os.makedirs(output_dir, exist_ok=True)

# One TensorBoard callback, shared by every training cell below
log_dir = "logs"
tensorboard_callback = TensorBoard(log_dir=log_dir)

# Create and compile the RNN model
rnn_model = create_rnn_model(max_length, vocab_size, embedding_dim, state_size, num_classes)
rnn_model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Create and compile the Multi Layer RNN model
multi_layer_rnn_model = create_multi_layer_rnn_model(max_length, vocab_size, embedding_dim, state_size, num_classes)
multi_layer_rnn_model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])


# Create and compile the LSTM model
lstm_model = create_lstm_model(max_length, vocab_size, embedding_dim, state_size, num_classes)
lstm_model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Create and compile the Multi Layer LSTM model
multi_layer_lstm_model = create_multi_layer_lstm_model(max_length, vocab_size, embedding_dim, state_size, num_classes)
multi_layer_lstm_model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])


# Create and compile the two-input (lyrics + artist) LSTM model
lstm_model_multi = create_lstm_model_multi_label(max_length, vocab_size, embedding_dim, state_size, num_classes, artist_vocab_size)
lstm_model_multi.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Compile the model
# lstm_model_pretrained = create_pretrained_lstm_model(state_size)
# lstm_model_pretrained.compile(optimizer=Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['accuracy'])

In [41]:
# Accumulator for one result dict per trained model (filled by the training cells).
results = list()

In [None]:
# Fit the single-layer SimpleRNN, checkpoint the best epoch, and score it on the test set.
num_epochs = 15

# Stop once val_loss has failed to improve for 3 epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Keep only the lowest-val_loss version of this model on disk.
checkpoint_path = 'part_1/best_rnn_model.keras'
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1
)

history_rnn = rnn_model.fit(
    X_train_pad,
    y_train,
    validation_data=(X_val_pad, y_val),
    epochs=num_epochs,
    batch_size=32,
    callbacks=[early_stopping, checkpoint, tensorboard_callback],
)
rnn_model.summary()

# Reload the checkpointed model and evaluate that one on the held-out test data.
best_model = load_model(checkpoint_path)
test_loss, test_acc = best_model.evaluate(X_test_pad, test_df['genre'])

# Record the outcome for the comparison plots further down.
results.append({
    'model_name': 'best_rnn_model',
    'test_loss': test_loss,
    'test_accuracy': test_acc,
    'model_history': history_rnn,
})


In [None]:
# Fit the three-layer SimpleRNN, checkpoint the best epoch, and score it on the test set.
num_epochs = 15

# Stop once val_loss has failed to improve for 3 epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Keep only the lowest-val_loss version of this model on disk.
checkpoint_path = 'part_1/best_multi_layer_rnn_model.keras'
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1
)

history_multi_layer_rnn = multi_layer_rnn_model.fit(
    X_train_pad,
    y_train,
    validation_data=(X_val_pad, y_val),
    epochs=num_epochs,
    batch_size=32,
    callbacks=[early_stopping, checkpoint, tensorboard_callback],
)
multi_layer_rnn_model.summary()

# Reload the checkpointed model and evaluate that one on the held-out test data.
best_model = load_model(checkpoint_path)
test_loss, test_acc = best_model.evaluate(X_test_pad, test_df['genre'])

# Record the outcome for the comparison plots further down.
results.append({
    'model_name': 'best_multi_layer_rnn_model',
    'test_loss': test_loss,
    'test_accuracy': test_acc,
    'model_history': history_multi_layer_rnn,
})


In [None]:
# Fit the single-layer LSTM, checkpoint the best epoch, and score it on the test set.
num_epochs = 15

# Stop once val_loss has failed to improve for 3 epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Keep only the lowest-val_loss version of this model on disk.
checkpoint_path = 'part_1/best_lstm_model.keras'
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1
)

history_lstm = lstm_model.fit(
    X_train_pad,
    y_train,
    validation_data=(X_val_pad, y_val),
    epochs=num_epochs,
    batch_size=32,
    callbacks=[early_stopping, checkpoint, tensorboard_callback],
)
lstm_model.summary()

# Reload the checkpointed model and evaluate that one on the held-out test data.
best_model = load_model(checkpoint_path)
test_loss, test_acc = best_model.evaluate(X_test_pad, test_df['genre'])

# Record the outcome for the comparison plots further down.
results.append({
    'model_name': 'best_lstm_model',
    'test_loss': test_loss,
    'test_accuracy': test_acc,
    'model_history': history_lstm,
})


In [None]:
# Fit the three-layer LSTM, checkpoint the best epoch, and score it on the test set.
# NOTE: num_epochs is not reassigned here; the value from an earlier cell is used.

# Stop once val_loss has failed to improve for 3 epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Keep only the lowest-val_loss version of this model on disk.
checkpoint_path = 'part_1/best_multi_layer_lstm_model.keras'
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1
)

history_multi_layer_lstm = multi_layer_lstm_model.fit(
    X_train_pad,
    y_train,
    validation_data=(X_val_pad, y_val),
    epochs=num_epochs,
    batch_size=32,
    callbacks=[early_stopping, checkpoint, tensorboard_callback],
)
multi_layer_lstm_model.summary()

# Reload the checkpointed model and evaluate that one on the held-out test data.
best_model = load_model(checkpoint_path)
test_loss, test_acc = best_model.evaluate(X_test_pad, test_df['genre'])

# Record the outcome for the comparison plots further down.
results.append({
    'model_name': 'best_multi_layer_lstm_model',
    'test_loss': test_loss,
    'test_accuracy': test_acc,
    'model_history': history_multi_layer_lstm,
})

In [None]:
# Fit the two-input (lyrics + artist) LSTM, checkpoint the best epoch, and score it on the test set.
# NOTE: num_epochs is not reassigned here; the value from an earlier cell is used.

# Stop once val_loss has failed to improve for 3 epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Keep only the lowest-val_loss version of this model on disk.
checkpoint_path = 'part_1/best_lstm_multi_label.keras'
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1
)

# Both inputs are passed as a list, in the order the model declares them.
history_lstm_multi_label = lstm_model_multi.fit(
    [X_train_pad, X_train_artist_pad],
    y_train,
    validation_data=([X_val_pad, X_val_artist_pad], y_val),
    epochs=num_epochs,
    batch_size=32,
    callbacks=[early_stopping, checkpoint, tensorboard_callback],
)
lstm_model_multi.summary()

# Reload the checkpointed model and evaluate that one on the held-out test data.
best_model = load_model(checkpoint_path)
test_loss, test_acc = best_model.evaluate([X_test_pad, X_test_artist_pad], test_df['genre'])

# Record the outcome for the comparison plots further down.
results.append({
    'model_name': 'best_lstm_multi_label',
    'test_loss': test_loss,
    'test_accuracy': test_acc,
    'model_history': history_lstm_multi_label,
})


In [None]:
import pickle

# Persist the collected per-model result dicts (training history objects
# included) so later cells can reload them.
with open('results_v1.pkl', mode='wb') as file:
    pickle.dump(results, file)
print("Results array saved to 'results_v1.pkl'.")

In [None]:
import pickle

# Read back the list of per-model result dicts written by the save cell.
with open('results_v1.pkl', mode='rb') as file:
    loaded_results = pickle.load(file)
print("Results array loaded from 'results_v1.pkl'.")

# Dump the raw structure as a sanity check
print(loaded_results)



# One summary line per model
for result in loaded_results:
    print(f"{result['model_name']} Test Accuracy: {result['test_accuracy']}")

# Overlay every model's per-epoch training and validation accuracy on one figure
plt.figure(figsize=(12, 6))
for result in loaded_results:
    model_name, history = result['model_name'], result['model_history']
    epoch_metrics = history.history
    plt.plot(epoch_metrics['accuracy'], label=f'{model_name} Training Accuracy')
    plt.plot(epoch_metrics['val_accuracy'], label=f'{model_name} Validation Accuracy')

plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
import pickle
import matplotlib.pyplot as plt

# Read back the list of per-model result dicts written by the save cell.
with open('results_v1.pkl', mode='rb') as file:
    loaded_results = pickle.load(file)
print("Results array loaded from 'results_v1.pkl'.")

# One summary line per model
for result in loaded_results:
    print(f"{result['model_name']} Test Accuracy: {result['test_accuracy']}, Test Loss: {result['test_loss']}")

# Column-wise views of the results for the bar charts
model_names = [result['model_name'] for result in loaded_results]
test_accuracies = [result['test_accuracy'] for result in loaded_results]
test_losses = [result['test_loss'] for result in loaded_results]

# Draw one bar chart per metric: accuracy first (green), then loss (red).
for values, bar_color, title, y_label in (
    (test_accuracies, 'green', 'Test Accuracy per Model', 'Test Accuracy'),
    (test_losses, 'red', 'Test Loss per Model', 'Test Loss'),
):
    plt.figure(figsize=(12, 6))
    plt.bar(model_names, values, color=bar_color)
    plt.title(title)
    plt.xlabel('Model')
    plt.ylabel(y_label)
    plt.xticks(rotation=45, ha='right')  # slanted names stay readable
    plt.tight_layout()  # keep the rotated x labels from being clipped
    plt.show()


In [None]:
# Evaluate the in-memory models (as left by training) on the test set.
# Each evaluate() call returns [loss, accuracy] because the models were
# compiled with metrics=['accuracy'].
rnn_test_loss, rnn_test_acc = rnn_model.evaluate(X_test_pad, test_df['genre'])
lstm_test_loss, lstm_test_acc = lstm_model.evaluate(X_test_pad, test_df['genre'])
# The stacked RNN is named multi_layer_rnn_model in this notebook; the
# previously referenced stacked_rnn_model is never defined (NameError).
multi_layer_rnn_test_loss, multi_layer_rnn_test_acc = multi_layer_rnn_model.evaluate(X_test_pad, test_df['genre'])
lstm_multi_test_loss, lstm_multi_test_acc = lstm_model_multi.evaluate([X_test_pad, X_test_artist_pad], test_df['genre'])
multi_layer_lstm_test_loss, multi_layer_lstm_test_acc = multi_layer_lstm_model.evaluate(X_test_pad, test_df['genre'])


In [33]:
# Collapse everything after the sample axis so each row is one flat feature
# vector for the scikit-learn classifier.
X_train_flat, X_val_flat, X_test_flat = (
    padded.reshape(padded.shape[0], -1)
    for padded in (X_train_pad, X_val_pad, X_test_pad)
)

In [34]:
# Empty table that collects one row of validation metrics per classical model.
metric_columns = ['Model', 'Accuracy', 'Precision', 'Recall', 'F1-Score']
results_df = pd.DataFrame(columns=metric_columns)

In [None]:
# (display name, estimator) pairs to benchmark on the flattened token ids
models = [
    ("Random Forest", RandomForestClassifier(n_estimators=100, random_state=42)),
]

# Fit each estimator, save it, then report validation and test metrics.
for idx, (model_name, model_instance) in enumerate(models):
    print(f"Training and evaluating model: {model_name}")

    # Single-step pipeline; preprocessing steps could be added in front later.
    pipeline = Pipeline(steps=[(model_name, model_instance)])
    pipeline.fit(X_train_flat, y_train)

    # Persist the fitted pipeline; the file name embeds the display name
    # verbatim, spaces included.
    joblib.dump(pipeline, f'{model_name}_pipeline.pkl')
    print(f"Saved {model_name} pipeline to disk.")

    # Validation-set predictions and weighted-average metrics
    y_val_pred = pipeline.predict(X_val_flat)
    accuracy = accuracy_score(y_val, y_val_pred)
    precision = precision_score(y_val, y_val_pred, average='weighted')
    recall = recall_score(y_val, y_val_pred, average='weighted')
    f1 = f1_score(y_val, y_val_pred, average='weighted')

    # Only the validation metrics are stored in the results table.
    results_df.loc[idx] = [model_name, accuracy, precision, recall, f1]

    # Show the raw validation predictions (optional; noisy for large datasets)
    print(f"Predictions for {model_name}: {y_val_pred}\n")

    # Test-set predictions and metrics; these are printed but not stored.
    y_test_pred = pipeline.predict(X_test_flat)
    y_test_true = test_df['genre']
    test_accuracy = accuracy_score(y_test_true, y_test_pred)
    test_precision = precision_score(y_test_true, y_test_pred, average='weighted')
    test_recall = recall_score(y_test_true, y_test_pred, average='weighted')
    test_f1 = f1_score(y_test_true, y_test_pred, average='weighted')

    print(
        f"Test Accuracy: {test_accuracy}",
        f"Test Precision: {test_precision}",
        f"Test Recall: {test_recall}",
        f"Test F1 Score: {test_f1}",
        sep="\n",
    )

# Show the collected validation metrics
print("Evaluation Results:")
print(results_df)

# Write the table to CSV for later reference
results_df.to_csv("model_evaluation_results.csv", index=False)
print("Saved evaluation results to 'model_evaluation_results.csv'.")
