# Personalized Music Recommender using Ensemble Methods

In [None]:
# Install PySpark into the notebook runtime; it is imported by the cells below.
!pip install pyspark

In [15]:
%matplotlib inline
import zipfile
import os
import ast
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
#import sklearn as skl
#import sklearn.utils, sklearn.preprocessing, sklearn.decomposition, sklearn.svm
from sklearn.preprocessing import LabelEncoder, MultiLabelBinarizer, LabelBinarizer, StandardScaler
from pyspark.sql.functions import col

!pip install graphviz
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import plot_model

import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import os

## Data Loading and Preprocessing

### Load the dataset

In [None]:
# Download the FMA metadata archive (fma_metadata.zip) into the current working directory.
!wget https://os.unil.cloud.switch.ch/fma/fma_metadata.zip

In [None]:
# Unpack the metadata archive; the loading cell reads the CSVs from /content/fma_metadata/.
!unzip fma_metadata.zip

In [6]:
def load(filepath):
    """Read one FMA metadata CSV into a DataFrame, picking the parser by file name.

    The base name of ``filepath`` is tested, in this order, for the substrings
    ``'features'``, ``'echonest'``, ``'genres'`` and ``'tracks'``:

    * features / echonest: first column as index, three header rows.
    * genres: first column as index, single header row.
    * tracks: first column as index, two header rows; list-valued columns are
      parsed with ``ast.literal_eval``, date columns converted with
      ``pd.to_datetime``, ``('set', 'subset')`` made an ordered categorical
      (small < medium < large) and a few text columns made categorical.

    Returns:
        The parsed DataFrame, or ``None`` when the name matches none of the
        substrings above.
    """
    name = os.path.basename(filepath)

    # Both tables share the same three-row column header.
    if 'features' in name or 'echonest' in name:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'genres' in name:
        return pd.read_csv(filepath, index_col=0)

    if 'tracks' not in name:
        return None

    frame = pd.read_csv(filepath, index_col=0, header=[0, 1])

    # Columns stored as Python list literals.
    list_keys = [('track', 'tags'), ('album', 'tags'), ('artist', 'tags'),
                 ('track', 'genres'), ('track', 'genres_all')]
    for key in list_keys:
        frame[key] = frame[key].map(ast.literal_eval)

    # Columns stored as date strings.
    date_keys = [('track', 'date_created'), ('track', 'date_recorded'),
                 ('album', 'date_created'), ('album', 'date_released'),
                 ('artist', 'date_created'), ('artist', 'active_year_begin'),
                 ('artist', 'active_year_end')]
    for key in date_keys:
        frame[key] = pd.to_datetime(frame[key])

    # Ordered categorical so that subsets can be compared with <, <=, ...
    subset_key = ('set', 'subset')
    subset_order = ('small', 'medium', 'large')
    try:
        frame[subset_key] = frame[subset_key].astype(
            'category', categories=subset_order, ordered=True)
    except (ValueError, TypeError):
        # the categories and ordered arguments were removed in pandas 0.25
        frame[subset_key] = frame[subset_key].astype(
            pd.CategoricalDtype(categories=subset_order, ordered=True))

    # Plain (unordered) categoricals.
    category_keys = [('track', 'genre_top'), ('track', 'license'),
                     ('album', 'type'), ('album', 'information'),
                     ('artist', 'bio')]
    for key in category_keys:
        frame[key] = frame[key].astype('category')

    return frame

In [7]:
# Load the four FMA metadata tables; load() picks the parser from each file name.
# NOTE(review): the paths are hard-coded under /content — adjust when the archive is unpacked elsewhere.
tracks = load('/content/fma_metadata/tracks.csv')
genres = load('/content/fma_metadata/genres.csv')
features = load('/content/fma_metadata/features.csv')
echonest = load('/content/fma_metadata/echonest.csv')

In [8]:
# The feature table must carry exactly the same index, element for element, as the track table.
np.testing.assert_array_equal(features.index, tracks.index)
# Every Echonest row must refer to an id present in the track table.
assert echonest.index.isin(tracks.index).all()

tracks.shape, features.shape, echonest.shape

((106574, 52), (106574, 518), (13129, 249))

In [9]:
# Ids of tracks in the 'small' and 'medium' subsets; the comparison relies on the
# ordered categorical dtype (small < medium < large) assigned to ('set', 'subset') by load().
subset = tracks.index[tracks['set', 'subset'] <= 'medium']

assert subset.isin(tracks.index).all()
assert subset.isin(features.index).all()

# Inner join of audio features with Echonest features; only its shape is reported below.
features_all = features.join(echonest, how='inner').sort_index(axis=1)
print('Not enough Echonest features: {}'.format(features_all.shape))

# Restrict both tables to the chosen subset. `features_all` is rebound here to the
# plain audio features, replacing the joined frame computed just above.
tracks = tracks.loc[subset]
features_all = features.loc[subset]

tracks.shape, features_all.shape

Not enough Echonest features: (13129, 767)


((25000, 52), (25000, 518))

In [16]:
# Track ids belonging to each split recorded in the metadata.
train = tracks.index[tracks['set', 'split'] == 'training']
val = tracks.index[tracks['set', 'split'] == 'validation']
test = tracks.index[tracks['set', 'split'] == 'test']

print('{} training examples, {} validation examples, {} testing examples'.format(*map(len, [train, val, test])))

# Label vocabularies get their own names so that the `genres` DataFrame loaded
# from genres.csv is not overwritten by a list.
top_genres = list(LabelEncoder().fit(tracks['track', 'genre_top']).classes_)
print('Top genres ({}): {}'.format(len(top_genres), top_genres))
all_genres = list(MultiLabelBinarizer().fit(tracks['track', 'genres_all']).classes_)
print('All genres ({}): {}'.format(len(all_genres), all_genres))

19922 training examples, 2505 validation examples, 2573 testing examples
Top genres (16): ['Blues', 'Classical', 'Country', 'Easy Listening', 'Electronic', 'Experimental', 'Folk', 'Hip-Hop', 'Instrumental', 'International', 'Jazz', 'Old-Time / Historic', 'Pop', 'Rock', 'Soul-RnB', 'Spoken']
All genres (151): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 25, 26, 27, 30, 31, 32, 33, 36, 37, 38, 41, 42, 43, 45, 46, 47, 49, 53, 58, 63, 64, 65, 66, 70, 71, 74, 76, 77, 79, 81, 83, 85, 86, 88, 89, 90, 92, 94, 97, 98, 100, 101, 102, 103, 107, 109, 111, 113, 117, 118, 125, 130, 137, 138, 166, 167, 169, 171, 172, 174, 177, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 214, 224, 232, 236, 240, 247, 250, 267, 286, 296, 297, 311, 314, 322, 337, 359, 360, 361, 362, 374, 378, 400, 401, 404, 428, 439, 440, 441, 442, 443, 456, 468, 491, 495, 502, 504, 514, 524, 538, 539, 542, 580, 602, 619, 651, 659, 695, 741, 763, 808, 810, 811, 906, 1032, 1060, 1193, 1235]


In [17]:
# Sanity check: (rows, columns) of the track table after restricting it to the subset.
tracks.shape

(25000, 52)

In [None]:
# Inspect the top-level genre column, which is the label used by the CNN section.
tracks['track', 'genre_top']

In [None]:
# Download the audio archive (fma_small.zip); it is extracted from /content/fma_small.zip below.
!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip

In [None]:
def batch_unzip(zip_path, output_dir, batch_size=0.1):
    """Extract a zip archive into ``output_dir`` in batches, printing progress.

    Args:
        zip_path: Path of the zip archive to read.
        output_dir: Directory to extract into; created when missing.
        batch_size: Fraction of the archive's members extracted per batch
            (0.1 gives roughly ten batches). Every batch holds at least one
            member, however small the archive is.
    """
    with zipfile.ZipFile(zip_path, 'r') as z:
        all_files = z.namelist()
        total_files = len(all_files)

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        if total_files == 0:
            return

        # int() truncates to 0 for archives with fewer than 1/batch_size
        # members, and range() rejects a zero step.
        batch_count = max(1, int(batch_size * total_files))
        # Ceiling division, so an exact multiple does not report one extra batch.
        total_batches = -(-total_files // batch_count)

        # Extract files in batches
        for batch_number, start in enumerate(range(0, total_files, batch_count), start=1):
            for member in all_files[start:start + batch_count]:
                z.extract(member, output_dir)
            print(f"Batch {batch_number}/{total_batches} extracted")

# Usage: extract the downloaded audio archive next to it, using the default batch fraction.
zip_path = '/content/fma_small.zip'
output_dir = '/content'
batch_unzip(zip_path, output_dir)

### Convert to Spectrogram

In [23]:
def convert_to_spectrogram(audio_path, save_path, save_as_png=False):
    """Convert one audio file to a log-scaled Mel spectrogram and save it.

    Args:
        audio_path: Path of the audio file to load.
        save_path: Destination path of the image or array file.
        save_as_png: When true, render the spectrogram with matplotlib and
            save the figure; otherwise save the dB matrix with ``np.save``.

    Returns:
        True when the spectrogram was written, False when any step failed.
        Failures are printed rather than raised so that a batch run can
        continue with the next file.
    """
    try:
        # Load the audio file with librosa
        y, sr = librosa.load(audio_path, sr=None)  # Use the native sampling rate

        # Generate a Mel-scaled power (energy-squared) spectrogram
        S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)

        # Convert to log scale (dB)
        log_S = librosa.power_to_db(S, ref=np.max)

        if save_as_png:
            # Plot the Spectrogram
            fig = plt.figure(figsize=(10, 4))
            try:
                librosa.display.specshow(log_S, sr=sr, x_axis='time', y_axis='mel')
                plt.title('Mel Spectrogram')
                plt.colorbar(format='%+02.0f dB')
                plt.tight_layout()
                plt.savefig(save_path)
            finally:
                # Always release the figure, even when drawing or saving fails.
                plt.close(fig)
        else:
            # Save as a numpy array file if not saving as PNG
            np.save(save_path, log_S)

    except Exception as e:
        print(f"Failed to process {audio_path}: {e}")
        return False

    return True


def process_audio_directory(input_dir, output_dir, save_as_png=False):
    """Convert every ``.mp3`` below ``input_dir`` to a spectrogram file.

    The directory layout below ``input_dir`` is mirrored below ``output_dir``.

    Args:
        input_dir: Root directory searched recursively for ``.mp3`` files.
        output_dir: Root directory for the generated files; created when missing.
        save_as_png: Forwarded to ``convert_to_spectrogram``; also selects the
            output extension (``.png`` or ``.npy``).

    Returns:
        The number of files converted successfully.
    """
    # Create the output directory if it does not exist
    os.makedirs(output_dir, exist_ok=True)

    extension = '.png' if save_as_png else '.npy'
    converted = 0

    # Recursively process each file in the directory
    for root, dirs, files in os.walk(input_dir):
        for filename in files:
            if not filename.endswith('.mp3'):
                continue

            file_path = os.path.join(root, filename)
            # Structure the output path to maintain directory structure
            relative_path = os.path.relpath(root, input_dir)
            output_subdir = os.path.join(output_dir, relative_path)
            os.makedirs(output_subdir, exist_ok=True)

            # Swap only the trailing extension; str.replace would also rewrite
            # a '.mp3' occurring earlier in the name.
            output_name = os.path.splitext(filename)[0] + extension
            output_path = os.path.join(output_subdir, output_name)

            # Report success only when the file was actually written.
            if convert_to_spectrogram(file_path, output_path, save_as_png=save_as_png):
                converted += 1
                print(f"Processed and saved: {output_path}")

    return converted

In [None]:
# Store the spectrograms as NumPy arrays (.npy) for the CNN model.
input_dir = '/content/fma_small/'  # Adjust the path to your actual directory structure
output_dir = '/content/spectrograms_npy'  # Define the output directory for spectrograms
process_audio_directory(input_dir, output_dir, save_as_png=False)

## CNN model

In [None]:
# Top-level genre per track (indexed like `tracks`), without the tracks that have no top-level genre.
genre_labels = tracks['track', 'genre_top'].dropna()

import numpy as np
import os
import librosa

def load_spectrograms_and_labels(spectrogram_dir, genre_labels, max_pad_len=174, pad_value=0.0):
    """Load saved spectrograms together with the genre label of each track.

    Every ``.npy`` file below ``spectrogram_dir`` whose name starts with an
    integer track id found in ``genre_labels.index`` is loaded and brought to
    exactly ``max_pad_len`` columns: shorter arrays are right-padded with
    ``pad_value``, longer ones are cut. Files are visited in sorted order so
    the result does not depend on the order the file system lists them in.

    Args:
        spectrogram_dir: Root directory searched recursively.
        genre_labels: Series of labels indexed by integer track id.
        max_pad_len: Number of columns of every returned spectrogram.
        pad_value: Constant used for right-padding. The default 0.0 keeps the
            previous behaviour.
            NOTE(review): the spectrograms written by convert_to_spectrogram
            are in dB relative to their maximum, so 0.0 pads with the loudest
            value; a strongly negative value (e.g. -80.0) may be more
            appropriate — confirm before retraining.

    Returns:
        ``(X, y)``: the stacked spectrograms and the matching labels, as arrays.
    """
    X = []
    y = []
    for root, dirs, files in os.walk(spectrogram_dir):  # Recursively walk through all directories
        dirs.sort()  # in place, so os.walk descends in a stable order
        for filename in sorted(files):
            if not filename.endswith('.npy'):  # Ensure we are only processing .npy files
                continue
            try:
                track_id = int(filename.split('.')[0])
            except ValueError:
                # Not named after a track id; skip it instead of aborting the whole load.
                continue
            if track_id not in genre_labels.index:
                continue

            spectrogram = np.load(os.path.join(root, filename))

            # Pad or trim the length of the spectrogram
            pad_width = max_pad_len - spectrogram.shape[1]
            if pad_width > 0:
                spectrogram = np.pad(spectrogram, pad_width=((0, 0), (0, pad_width)),
                                     mode='constant', constant_values=pad_value)
            else:
                spectrogram = spectrogram[:, :max_pad_len]

            X.append(spectrogram)
            y.append(genre_labels.loc[track_id])
    return np.array(X), np.array(y)

# Load every labelled spectrogram from the directory written by the conversion cell.
spectrogram_dir = '/content/spectrograms_npy'
X, y = load_spectrograms_and_labels(spectrogram_dir, genre_labels)

# Adding a channel dimension to handle CNN input requirements:
# (samples, height, width) -> (samples, height, width, 1)
X = X.reshape(X.shape[0], X.shape[1], X.shape[2], 1)

# Encode labels: genre name -> integer id -> one-hot row
encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)
y_categorical = to_categorical(y_encoded)

# Split data into training and testing sets (random 80/20 split with a fixed seed).
# NOTE(review): this ignores the 'training'/'validation'/'test' assignment stored in
# tracks['set', 'split'] — confirm that a random split is intended.
X_train, X_test, y_train, y_test = train_test_split(X, y_categorical, test_size=0.2, random_state=42)

In [None]:
# Each sample of X_train is expected to be (height, width, channels).
input_shape = X_train[0].shape


def _conv_block(filters, **conv_kwargs):
    """Return one 3x3 convolution -> 2x2 max-pooling -> batch-norm stage."""
    return [
        Conv2D(filters, (3, 3), activation='relu', **conv_kwargs),
        MaxPooling2D(2, 2),
        BatchNormalization(),
    ]


# Three convolutional stages of growing width, then a dense classifier head
# with one softmax output per encoded genre.
model = Sequential(
    _conv_block(32, input_shape=input_shape)
    + _conv_block(64)
    + _conv_block(128)
    + [
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(len(encoder.classes_), activation='softmax'),
    ]
)

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])


In [None]:
# Write a diagram of the network, with layer names and shapes, to model_plot.png.
plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)


In [None]:
# Train for 20 epochs, passing (X_test, y_test) as the validation data.
# NOTE(review): the same X_test/y_test pair is scored by model.evaluate in the next cell,
# so the reported test accuracy is not measured on data unseen during model selection.
history = model.fit(X_train, y_train, epochs=20, validation_data=(X_test, y_test))


In [None]:
# Score the trained network on the held-out split and report its accuracy.
test_loss, test_acc = model.evaluate(X_test, y_test)
print("Test accuracy:", test_acc)

## Stacking

In [None]:
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
#import sklearn as skl
#import sklearn.utils, sklearn.preprocessing, sklearn.decomposition, sklearn.svm
from pyspark.sql.functions import col

#!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip

!wget https://os.unil.cloud.switch.ch/fma/fma_metadata.zip
!unzip fma_metadata.zip
# move to Hadoop Namenode
#!hadoop fs -put enwiki-latest-pages-articles1.xml /


!hadoop fs -put fma_metadata/* /

#hadoop fs -put /local/path/to/fma_metadata/tracks.csv /hdfs/target/path/
# adjust to reflect the cluster name and Hadoop masternode (IP port) of your cluster
tracks = "hdfs://st446-w09-cluster-m:8020/tracks.csv"

genres = "hdfs://st446-w09-cluster-m:8020/genres.csv"
features = "hdfs://st446-w09-cluster-m:8020/features.csv"
echonest = "hdfs://st446-w09-cluster-m:8020/echonest.csv"
df1 = spark.read.format("csv").option("header", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("multiline", "true") \
    .load(tracks)

df1.limit(10).toPandas()

featuredf = spark.read.format("csv").option("header", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("multiline", "true") \
    .load(features)

featuredf.limit(10).toPandas()

# Tracks of the 'small' subset (column 'set32' is compared against the subset name).
small_df = df1.filter(col('set32') == 'small')
small_df.count()

# Attach the audio features to each track, then keep the 'small' subset.
joined_df = df1.join(featuredf, df1._c0 == featuredf.feature)
small_featuredf = joined_df.filter(col('set32') == 'small')
mfcc_columns = [column for column in small_featuredf.columns if "mfcc" in column]


def select_split(split_name):
    """Return the label column plus float-cast MFCC columns of one split.

    'set31' is compared against the split name and 'track40' is the top genre
    used as the label.
    """
    # One select() carrying every cast, instead of one withColumn() call per
    # feature: each withColumn() adds a projection to the query plan, which
    # the PySpark documentation warns against doing in a loop.
    casted = [col(name).cast('float').alias(name) for name in mfcc_columns]
    return small_featuredf.filter(col('set31') == split_name).select('track40', *casted)


# Split the dataset.
training = select_split('training')
test = select_split('test')
validation = select_split('validation')
validation.printSchema()

### Genre classification from MFCC features using a Random Forest
test.printSchema()

from pyspark.ml.classification import RandomForestClassifier, OneVsRest, DecisionTreeClassifier, LogisticRegression, LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Convert the genre label ('track40') to a numeric index
stringIndexer = StringIndexer(inputCol="track40", outputCol="indexed")

# Assemble the feature columns into one vector. The column list is taken from `test`;
# `training` was selected with the same columns (label first, then the MFCC features).
featuresCreator = VectorAssembler(inputCols=test.columns[1:], outputCol="features")

# Define the classifier
rf = RandomForestClassifier(labelCol="indexed", featuresCol="features")

pipeline = Pipeline(stages=[stringIndexer, featuresCreator, rf])

# Fit the whole pipeline on the training split
model = pipeline.fit(training)
# Make predictions for the test split
predictions=model.transform(test)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexed",
    predictionCol="prediction",
    metricName="accuracy"
)

# Compute the accuracy on the test set
accuracy = evaluator.evaluate(predictions)


print(f"Test Accuracy: {accuracy:.4f}")

test.printSchema()

# Second base classifier: a decision tree on the same indexed label and feature vector.
# (The random forest base model is the pipeline fitted in the previous section.)
dt = DecisionTreeClassifier(labelCol="indexed", featuresCol="features")

# Base pipeline, reusing the label indexer and feature assembler defined for the random forest.
dt_pipeline = Pipeline(stages=[stringIndexer,featuresCreator,
    dt
])


# Fit the base model on the training split
dt_model = dt_pipeline.fit(training)

# Score the test split.
# NOTE(review): the original comment here suggested that these predictions were
# meant to be made on the validation split instead — confirm which one is intended.
dt_predict = dt_model.transform(test)
dt_predict.head()

### Use the predicted probability columns as meta-features
# Create the meta-features for stacking.
#
# Each base pipeline is applied directly to the split being scored, so every
# probability vector stays on the row it was computed from. Joining separately
# scored frames on monotonically_increasing_id() does not line rows up: the
# generated ids are only unique, and predictions made for the test split do not
# describe the rows of the validation split.
_BASE_OUTPUT_COLS = ('features', 'rawPrediction', 'prediction')


def add_base_probabilities(df):
    """Score ``df`` with both base models and keep their probability vectors.

    Args:
        df: A split with the label column 'track40' and the MFCC feature
            columns, as consumed by the base pipelines.

    Returns:
        ``df`` plus 'rf_probability' (random forest), 'dt_probability'
        (decision tree) and the numeric label 'indexed'.
    """
    # Drop the columns the first pipeline added, because the second pipeline
    # creates columns with the same names.
    rf_scored = (
        model.transform(df)
        .withColumnRenamed('probability', 'rf_probability')
        .drop('indexed', *_BASE_OUTPUT_COLS)
    )
    return (
        dt_model.transform(rf_scored)
        .withColumnRenamed('probability', 'dt_probability')
        .drop(*_BASE_OUTPUT_COLS)
    )


# The meta-model is fitted on validation-split predictions and applied to the test split.
newfeature = add_base_probabilities(validation)
new_features = add_base_probabilities(test)

# Assemble new features for meta-model
stacking_assembler = VectorAssembler(inputCols=['rf_probability', 'dt_probability'],
                                     outputCol='stacking_features')

# Meta-model pipeline : use logistic regression
lr = LogisticRegression(featuresCol='stacking_features', labelCol='indexed')
meta_pipeline = Pipeline(stages=[stacking_assembler, lr])

# Fit the meta-model
meta_model = meta_pipeline.fit(newfeature)
# meta-model make predictions
final_predictions = meta_model.transform(new_features)

In [None]:
# Data Exploration

import matplotlib.pyplot as plt
import seaborn as sns

# df1 is the Spark frame read from tracks.csv. Its top-genre column is named
# 'track40' (the name used as the label by the classification cells), not
# 'genre_top'. Rows without a genre are left out, and so is the row holding the
# second header line of the CSV, since Spark consumed only the first one.
# NOTE(review): confirm that the second header line appears as 'genre_top' in this column.
genre_counts = (
    df1.filter(col('track40').isNotNull() & (col('track40') != 'genre_top'))
    .groupBy('track40')
    .count()
    .withColumnRenamed('track40', 'genre_top')
    .toPandas()
)

plt.figure(figsize=(10, 6))
sns.barplot(x='count', y='genre_top', data=genre_counts)
plt.title('Distribution of Genres')
plt.xlabel('Count')
plt.ylabel('Genre')
plt.show()


In [None]:
# Advanced Feature Engineering

from pyspark.ml.feature import PCA, VectorAssembler

# 'featuredf' contains all features; keep the MFCC columns.
feature_columns = [c for c in featuredf.columns if "mfcc" in c]

# featuredf was read without schema inference, so its columns are strings,
# while VectorAssembler needs numeric input. Values that are not numeric become
# null after the cast and those rows are dropped.
features_numeric = (
    featuredf
    .select('feature', *[col(c).cast('float').alias(c) for c in feature_columns])
    .dropna(subset=feature_columns)
)
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
feature_vector = assembler.transform(features_numeric)

# PCA to reduce dimensions
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(feature_vector)
pca_result = pca_model.transform(feature_vector)

# Adding reduced dimensions back to the track frame. The id column is '_c0' in
# df1 and 'feature' in featuredf (the same keys as the tracks/features join);
# neither frame has a 'track_id' column. Dropping a previous 'pcaFeatures'
# first keeps the cell safe to re-run.
pca_by_track = pca_result.select(col('feature').alias('_c0'), 'pcaFeatures')
df1 = df1.drop('pcaFeatures').join(pca_by_track, on='_c0')

In [None]:
# Cross Validation

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Tune the number of trees of the random-forest pipeline (`pipeline`, with classifier `rf`).
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .build()

# NOTE(review): no metricName is passed to this evaluator, so model selection uses the
# evaluator's default metric rather than the 'accuracy' metric reported for the single
# random-forest run — confirm that this is intended.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="indexed", predictionCol="prediction"),
                          numFolds=5)  # 5-fold cross-validation

# Train the model using CrossValidator
cvModel = crossval.fit(training)


In [None]:
# Stacking Implementation

from pyspark.ml.classification import LogisticRegression

# Assembling new features from base model predictions: the two probability vectors
# are concatenated into one feature vector.
stacking_assembler = VectorAssembler(inputCols=['rf_probability', 'dt_probability'], outputCol='stacking_features')

# Meta-model: Logistic Regression on the stacked probabilities, predicting the indexed genre
lr = LogisticRegression(featuresCol='stacking_features', labelCol='indexed')
stacking_pipeline = Pipeline(stages=[stacking_assembler, lr])

# Training meta-model on predictions
# NOTE(review): `newfeature` has to carry 'rf_probability', 'dt_probability' and the
# label column 'indexed' — verify against the cell that builds it.
stacking_model = stacking_pipeline.fit(newfeature)  # Ensure 'newfeature' is prepared correctly

# Making final predictions
# NOTE(review): `new_features` must be provided by an earlier cell, with the same
# probability columns as `newfeature` — confirm before running.
final_predictions = stacking_model.transform(new_features)  # 'new_features' should be the dataset prepared for the meta-model

## Bagging

## Boosting