# Data Processing


## Q1: Explore directory

In [None]:
!hdfs dfs -ls /data/msd/audio/features

In [None]:
!hdfs dfs -ls hdfs:///data/msd/genre

In [None]:
!hdfs dfs -ls hdfs:///data/msd/main/summary/

In [None]:
!hdfs dfs -du -s -h /data/msd/main/summary/   
    

In [None]:
!hdfs dfs -ls -R hdfs:///data/msd/ | awk '{print $8}' | sed -e 's/[^-][^\/]*\//--/g' -e 's/^/ /' -e 's/-/|/'

In [None]:
!hdfs dfs -du -s -h /data/msd/audio/
!hdfs dfs -du -s -h /data/msd/genre/
!hdfs dfs -du -s -h /data/msd/main/
!hdfs dfs -du -s -h /data/msd/tasteprofile
!hdfs dfs -du -s -h /data/msd/

In [None]:
# b. Count the number of rows in each of the datasets.
!hdfs dfs -ls -R hdfs:///data/msd/ | awk '{print $8}' | while read -r i; do echo "$i"; hdfs dfs -cat "$i" | wc -l; done > linecount1.csv


## Q2: Data preprocessing

In [None]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Return the current login name with any trailing '@domain' part removed.
    """

    login = getpass.getuser()
    return re.sub('@.*', '', login)


def dict_to_html(d):
    """Convert a Python dictionary into a two column HTML table for display.

    Keys and values are converted with ``str`` and HTML-escaped, so characters
    such as ``<`` or ``&`` (e.g. in Spark configuration values) are displayed
    literally instead of being interpreted as markup.

    Args:
        d (dict): mapping to render; one table row per item, in iteration order.

    Returns:
        str: the ``<table>`` markup.
    """
    # Local import: the notebook also binds a global named `html` to a list.
    from html import escape

    rows = ''.join(
        f'<tr><td style="text-align:left;">{escape(str(k), quote=False)}</td>'
        f'<td>{escape(str(v), quote=False)}</td></tr>'
        for k, v in d.items()
    )
    return (
        '<table width="100%" style="width:100%; font-family: monospace;">'
        + rows
        + '</table>'
    )


def show_as_html(df, n=20):
    """Preview the first rows of a Spark DataFrame as an HTML table in Jupyter.

    The rows are collected to the driver as a pandas DataFrame, whose notebook
    representation renders the table.

    Args:
        df (pyspark.sql.DataFrame): DataFrame to preview.
        n (int): maximum number of rows to show (default: 20)
    """

    preview = df.limit(n)
    display(preview.toPandas())

    
def display_spark():
    """Show an HTML status panel for the notebook's Spark session.

    If both the ``spark`` and ``sc`` globals exist, the panel reports the
    session as active and lists the application UI link, the full SparkConf
    and usage notes. Otherwise it reports the session as stopped.
    """

    cluster_ui = '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>'
    session_is_running = all(var in globals() for var in ('spark', 'sc'))

    parts = ['<p><b>Spark</b></p>']

    if not session_is_running:
        app = username() + " (jupyter)"
        parts.append(
            '<p>The spark session is <b><span style="color:red">stopped</span></b>, '
            f'confirm that <code>{app}</code> is under the completed applications section in the Spark UI.</p>'
        )
        parts.extend(['<ul>', cluster_ui, '</ul>'])
    else:
        app = sc.getConf().get("spark.app.name")
        app_ui = f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>'
        config_table = dict_to_html(dict(sc.getConf().getAll()))
        parts.append(
            '<p>The spark session is <b><span style="color:green">active</span></b>, '
            f'look for <code>{app}</code> under the running applications section in the Spark UI.</p>'
        )
        parts.extend(['<ul>', cluster_ui, app_ui, '</ul>'])
        parts.extend(['<p><b>Config</b></p>', config_table])
        parts.extend([
            '<p><b>Notes</b></p>',
            '<ul>',
            '<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{app}</code> by hand using the link in the Spark UI.</li>',
            '</ul>',
        ])

    display(HTML(''.join(parts)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Create (or reuse) a Spark session and publish it as the globals ``spark`` and ``sc``.

    Resource settings are passed straight to the SparkSession builder.
    ``spark.cores.max`` is executors x cores per executor,
    ``spark.sql.shuffle.partitions`` is four times that, and the UI port is
    drawn at random from 4001-4999. The status panel is displayed afterwards.

    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): executor memory, sent as f"{worker_memory}g" (default: 1)
        master_memory (float): driver memory, sent as f"{master_memory}g" (default: 1)
    """

    global spark
    global sc

    user = username()
    total_cores = executor_instances * executor_cores
    ui_port = 4000 + random.randint(1, 999)

    settings = {
        "spark.driver.extraJavaOptions": f"-Dderby.system.home=/tmp/{user}/spark/",
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": str(executor_instances),
        "spark.executor.cores": str(executor_cores),
        "spark.cores.max": str(total_cores),
        "spark.executor.memory": f"{worker_memory}g",
        "spark.driver.memory": f"{master_memory}g",
        "spark.driver.maxResultSize": "0",
        "spark.sql.shuffle.partitions": str(total_cores * 4),
        "spark.ui.port": str(ui_port),
    }

    builder = SparkSession.builder.master("spark://masternode2:7077")
    for key, value in settings.items():
        builder = builder.config(key, value)
    spark = builder.appName(user + " (jupyter)").getOrCreate()
    sc = SparkContext.getOrCreate()

    display_spark()

    
def stop_spark():
    """Stop the running Spark session, if any, and delete the ``spark``/``sc`` globals.

    The status panel is displayed afterwards in either case.
    """

    global spark
    global sc

    running = 'spark' in globals() and 'sc' in globals()
    if running:
        spark.stop()
        del spark, sc

    display_spark()


# Make css changes to improve spark output readability:
#   - `pre` blocks keep their line breaks and are not wrapped
#     (plain-text output is presumably rendered inside <pre> by Jupyter),
#   - cells of pandas "dataframe" tables are not wrapped,
#   - the pandas index column (header and body <th> cells) is hidden.

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))


# Print function docstrings of the notebook helpers defined above

help(start_spark)
help(stop_spark)
help(display_spark)
help(show_as_html)

In [None]:
# Run this cell to start a spark session in this notebook (don't actually use this many resources)

start_spark(executor_instances=8, executor_cores=4, worker_memory=8, master_memory=8)

In [None]:
# Write your imports and code here or insert cells below

from pyspark.sql import Row, DataFrame, Window, functions as F
from pyspark.sql.types import *

from pyspark.ml.evaluation import RankingEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import VectorAssembler

import json
import numpy as np

from pretty import SparkPretty  # download pretty.py from LEARN and put it in your M:\ or home directory
pretty = SparkPretty(limit=5)

In [None]:
# Determine ideal number of partitions

conf = sc.getConf()

N = int(conf.get("spark.executor.instances"))
M = int(conf.get("spark.executor.cores"))
partitions = 4 * N * M

print(f'ideal # partitions = {partitions}')

In [None]:
# !hdfs dfs -cat /data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt | head
# !hdfs dfs -cat /data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt | head

In [None]:
mismatches_schema = StructType([
    StructField("song_id", StringType(), True),
    StructField("song_artist", StringType(), True),
    StructField("song_title", StringType(), True),
    StructField("track_id", StringType(), True),
    StructField("track_artist", StringType(), True),
    StructField("track_title", StringType(), True)
])


In [None]:
# Load and parse mismatches in Python and then parallelize and createDataFrame in spark.
#
# Relevant lines have the fixed layout
#   < ERROR: <SONG_ID TRACK_ID> song artist  -  song title  !=  track artist  -  track title
# with 18-character IDs at offsets 10 and 29. All other lines are skipped.
path = "/scratch-network/courses/2023/DATA420-23S1/data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt"
sid_matches_manually_accepted = []
with open(path, "r") as handle:
    for line in handle:
        if not line.startswith("< ERROR: "):
            continue
        # Strip the terminator explicitly. Slicing with [:-1] dropped the last
        # character of the track title when the final line had no "\n".
        line = line.rstrip("\r\n")
        song_id, track_id = line[10:28], line[29:47]
        song_part, track_part = line[49:].split("  !=  ")
        song_artist, song_title = song_part.split("  -  ")
        track_artist, track_title = track_part.split("  -  ")
        sid_matches_manually_accepted.append(
            (song_id, song_artist, song_title, track_id, track_artist, track_title)
        )

matches_manually_accepted = spark.createDataFrame(sc.parallelize(sid_matches_manually_accepted, 8), schema=mismatches_schema)
show_as_html(matches_manually_accepted)



In [None]:
# Relevant lines have the fixed layout
#   ERROR: <SONG_ID TRACK_ID> song artist  -  song title  !=  track artist  -  track title
# with 18-character IDs at offsets 8 and 27. All other lines are skipped.
path = "/scratch-network/courses/2023/DATA420-23S1/data/msd/tasteprofile/mismatches/sid_mismatches.txt"
sid_mismatches = []
with open(path, "r") as handle:
    for line in handle:
        if not line.startswith("ERROR: "):
            continue
        # Strip the terminator explicitly. Slicing with [:-1] dropped the last
        # character of the track title when the final line had no "\n".
        line = line.rstrip("\r\n")
        song_id, track_id = line[8:26], line[27:45]
        song_part, track_part = line[47:].split("  !=  ")
        song_artist, song_title = song_part.split("  -  ")
        track_artist, track_title = track_part.split("  -  ")
        sid_mismatches.append(
            (song_id, song_artist, song_title, track_id, track_artist, track_title)
        )

mismatches = spark.createDataFrame(sc.parallelize(sid_mismatches, 64), schema=mismatches_schema)
show_as_html(mismatches)

In [None]:
# Load and parse triplets in spark
# Each tab-separated row is (user_id, song_id, plays); the directory is read
# with an explicit schema, so no header or type inference is used.

triplets_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("song_id", StringType(), True),
    StructField("plays", IntegerType(), True)
])
triplets = (
    spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    # NOTE(review): "codec" is documented as a write-side CSV option; on read Spark
    # presumably picks the decompressor from the file extension, so this line
    # likely has no effect — confirm.
    .option("codec", "gzip")
    .schema(triplets_schema)
    .load("hdfs:///data/msd/tasteprofile/triplets.tsv/")
    .cache()  # reused by several later cells
)
show_as_html(triplets,5)

In [None]:
# Anti join mismatches to manually accepted, and anti join the remaining mismatches to triplets

mismatches_not_accepted = mismatches.join(matches_manually_accepted, on="song_id", how="left_anti")
triplets_not_mismatched = triplets.join(mismatches_not_accepted, on="song_id", how="left_anti")

print(f"matches_manually_accepted = {matches_manually_accepted.count()}")
print(f"mismatches                = {mismatches.count()}")
print(f"triplets                  = {triplets.count()}")
print(f"triplets_not_mismatched   = {triplets_not_mismatched.count()}")

### Q2 (b) ###

- Load audio feature attribute names and types from `audio/attributes`
- Define schemas based on `audio/attributes`
- Load one of the small datasets from `audio/features` to use for Audio Similarity Q1 (a)

In [None]:
# Dataset names and attribute type mapping

datasets = [
    'msd-jmir-area-of-moments-all-v1.0',
    'msd-jmir-lpc-all-v1.0',
    'msd-jmir-methods-of-moments-all-v1.0',
    'msd-jmir-mfcc-all-v1.0',
    'msd-jmir-spectral-all-all-v1.0',
    'msd-jmir-spectral-derivatives-all-all-v1.0',
    'msd-marsyas-timbral-v1.0',
    'msd-mvd-v1.0',
    'msd-rh-v1.0',
    'msd-rp-v1.0',
    'msd-ssd-v1.0',
    'msd-trh-v1.0',
    'msd-tssd-v1.0',
]

lookup = {
    'real': DoubleType(),
    'NUMERIC': DoubleType(),
    'float': DoubleType(),
    'string': StringType(),
    'STRING': StringType(),
}
# Choose a dataset name, load attribute names, and define schemas based on attribute names

name = 'msd-jmir-area-of-moments-all-v1.0'

metadata_schema = StructType([
    StructField("name", StringType()),
    StructField("type", StringType()),
])
metadata = spark.read.csv(f'/data/msd/audio/attributes/{name}.attributes.csv', schema=metadata_schema)

metadata.show(metadata.count(), truncate=False)
schema_actual = StructType([
    StructField(name, lookup[typename], True) for name, typename in metadata.collect()
])

schema_simple = StructType([
    StructField(f"F{i:03d}", DoubleType(), True) for i in range(0, metadata.count() - 1)
] + [
    StructField(f"ID", StringType(), True)
])

print('name = ' + name)
print('')
print('actual_schema = ' + pretty(schema_actual))
print('')
print('simple_schema = ' + pretty(schema_simple))
print('')

data_actual = spark.read.csv(f'/data/msd/audio/features/{name}.csv', schema=schema_actual, quote="'")
data_simple = spark.read.csv(f'/data/msd/audio/features/{name}.csv', schema=schema_simple, quote="'")

show_as_html(data_actual)
show_as_html(data_simple)

# Audio similarity

## Q1. Analyse the audio feature datasets. Pick one of the datasets to analyse...

### (a) Produce descriptive statistics for the audio features. Are any features strongly correlated?

In [None]:
# Dataset names and attribute type mapping

datasets = [
    'msd-jmir-area-of-moments-all-v1.0',
    'msd-jmir-lpc-all-v1.0',
    'msd-jmir-methods-of-moments-all-v1.0',
    'msd-jmir-mfcc-all-v1.0',
    'msd-jmir-spectral-all-all-v1.0',
    'msd-jmir-spectral-derivatives-all-all-v1.0',
    'msd-marsyas-timbral-v1.0',
    'msd-mvd-v1.0',
    'msd-rh-v1.0',
    'msd-rp-v1.0',
    'msd-ssd-v1.0',
    'msd-trh-v1.0',
    'msd-tssd-v1.0',
]

lookup = {
    'real': DoubleType(),
    'NUMERIC': DoubleType(),
    'float': DoubleType(),
    'string': StringType(),
    'STRING': StringType(),
}
# Choose a dataset name, load attribute names, and define schemas based on attribute names

name =  'msd-jmir-mfcc-all-v1.0'

metadata_schema = StructType([
    StructField("name", StringType()),
    StructField("type", StringType()),
])
metadata = spark.read.csv(f'/data/msd/audio/attributes/{name}.attributes.csv', schema=metadata_schema)

# metadata.show(metadata.count(), truncate=False)
schema_actual = StructType([
    StructField(name, lookup[typename], True) for name, typename in metadata.collect()
])

schema_simple = StructType([
    StructField(f"F{i:03d}", DoubleType(), True) for i in range(0, metadata.count() - 1)
] + [
    StructField(f"ID", StringType(), True)
])

print('name = ' + name)
print('')
print('actual_schema = ' + pretty(schema_actual))
print('')
print('simple_schema = ' + pretty(schema_simple))
print('')

data_actual = spark.read.csv(f'/data/msd/audio/features/{name}.csv', schema=schema_actual, quote="'")
data_simple = spark.read.csv(f'/data/msd/audio/features/{name}.csv', schema=schema_simple, quote="'")

show_as_html(data_actual)
show_as_html(data_simple)

In [None]:
features = data_simple
# features.printSchema()
print(features.count())
# 994623
# Drop the non-numeric track identifier. In schema_simple it is named "ID",
# so dropping "MSD_TRACKID" (the previous code) was a silent no-op and the
# string column leaked into the statistics.
desc_stats = features.drop("ID")

# Descriptive statistics: one row per feature, one column per statistic.
combined_stats = (
    desc_stats.describe()
    .toPandas()
    .set_index("summary")   # count/mean/stddev/min/max become the index...
    .transpose()            # ...and, after transposing, the columns
    .astype(float)          # describe() returns every statistic as a string
)
combined_stats.columns = ['Count', 'Mean', 'Stddev', 'Min', 'Max']
combined_stats['Feature'] = combined_stats.index
combined_stats = combined_stats[['Feature', 'Count', 'Mean', 'Stddev', 'Min', 'Max']]

In [None]:
# Correlated features
# Only the numeric feature columns are used: the string "ID" column makes
# pandas DataFrame.corr() fail on pandas >= 2.0 (numeric_only defaults to False).
corr_data = features.drop("ID").toPandas()
corr_matrix = corr_data.corr()

# Keep each pair once (strict upper triangle, no self-correlations) and rank by |r|,
# so strong negative correlations are reported as well.
upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1))
pairs = upper.unstack().dropna()
pairs.reindex(pairs.abs().sort_values(ascending=False).index).head(10)  # Top 10 correlated pairs

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

correlation_matrix = corr_data.corr()

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))

plt.figure(figsize=(18, 16))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", vmin=-1, vmax=1, mask=mask)  # Set vmin and vmax to -1 and 1 respectively
plt.title("Correlation Matrix")
plt.show()

### (b) Load the MSD All Music Genre Dataset (MAGD). Visualize the distribution of genres for the songs that were matched.

In [None]:
!hdfs dfs -ls hdfs:///data/msd/genre

In [None]:
!hdfs dfs -cat /data/msd/genre/msd-topMAGD-genreAssignment.tsv | head

In [None]:
genre_schema = StructType([
    StructField("track_id", StringType(), True),
    StructField("genre", StringType(), True)
])
genres_data = (
    spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(genre_schema)
    .load("hdfs:///data/msd/genre/msd-MAGD-genreAssignment.tsv")
    .cache()
)
genres_data.show(5, False)
 

# First, remove the songs that were mismatched from genre_schema based on "track_id"
genre_matched = genres_data.join(mismatches_not_accepted, on="track_id", how="left_anti")  

# then plot distribution using matplotlib

genre_plot_data = genre_matched.groupBy('genre').count() 

genre_plot_data = genre_plot_data.sort("count",ascending=False) #sort by count
genre_plot_data.show(21)

genre_plot_data = genre_plot_data.toPandas() #Pandas format to plot

In [None]:
import matplotlib.pyplot as plt

def plot_genre(data):
    """Draw a bar chart of the number of matched songs per genre.

    Args:
        data (pandas.DataFrame): one row per genre with columns ``genre`` and
            ``count``. Bars are drawn in the row order of ``data``.
    """
    # Create the sized figure explicitly and hand its Axes to pandas.
    # DataFrame.plot without ``ax`` opens a new default-size figure, which
    # previously left the 10x10 figure empty.
    _, ax = plt.subplots(figsize=(10, 10))
    data.plot(kind='bar', x='genre', y='count', legend=False, grid=True, ax=ax)

    ax.set_ylabel('Count')
    ax.set_xlabel('Genre')
    ax.set_title("Total Number Of Matched Songs Per Genre")

    # Adjust gridlines width
    ax.yaxis.grid(linewidth=0.5)
    ax.xaxis.grid(linewidth=0.5)

    plt.show()

plot_genre(genre_plot_data)



### (c) Merge the genres dataset and the audio features dataset so that every song has a label.

In [None]:
!hdfs dfs -ls hdfs:///data/msd/genre

In [None]:
# Make sure the track identifier column is called "ID" for the join below.
# For data_simple the column is already "ID" (schema_simple), and
# withColumnRenamed is a no-op when the source column is missing. The rename
# only matters if `features` is built from schema_actual — TODO confirm the
# identifier attribute there is named MSD_TRACKID.
features=features.withColumnRenamed('MSD_TRACKID', 'ID')
show_as_html(features, 5)
show_as_html(genres_data, 5)

In [None]:
mfcc_features_genre=genres_data.join(features,genres_data.track_id==features.ID,how='inner') #inner join
mfcc_features_genre.printSchema()
mfcc_features_genre.count()

## Question 2. Develop a binary classification model.



### (b) Convert the genre column into a binary label column indicating whether the song is "Electronic" or some other genre.

In [None]:
mfcc_genre_binary = mfcc_features_genre.withColumn("Class", F.when((F.col("genre").isin({"Electronic"})), 1).otherwise(0)) ## labeliing binary class
show_as_html(mfcc_genre_binary,5)

In [None]:
# What is the class balance of the binary label?
show_as_html(mfcc_genre_binary.groupBy("Class").count())

### (c) Split the dataset into training and test sets.

In [None]:
# Dropping out non-numeric columns 
mfcc_genre_df = mfcc_genre_binary.drop("ID","track_id","genre")
mfcc_genre_df.printSchema()


In [None]:
# Remove highly correlated variables: drop both F011 and F008, then re-check the
# remaining correlations. corr_data also contains the binary "Class" column.
mfcc_genre_df = mfcc_genre_df.drop("F011", "F008")
corr_data = mfcc_genre_df.toPandas() 
# Sorted ascending, so the largest values are at the end. drop_duplicates keeps one
# copy of each symmetric pair and a single 1.0 from the diagonal, so tail(11) is
# that 1.0 plus the 10 most positively correlated pairs.
corr_data.corr().unstack().sort_values().drop_duplicates().tail(11) #Top 10 correlated pairs

In [None]:
# Assemble features
assembler = (VectorAssembler()
            .setInputCols(mfcc_genre_df.columns[:-1])
            .setOutputCol('features')
            )
features = assembler.transform(mfcc_genre_df).select(["features", "Class"])
features.count()  
# 420620

In [None]:
show_as_html(features,5)

In [None]:
# split test and train set use exact stratification using Window
temp = (
     features
     .withColumn("id", F.monotonically_increasing_id())
     .withColumn("Random",F.rand())
     .withColumn(
        "Row",
         F.row_number()
         .over(
             Window
             .partitionBy("Class")
             .orderBy("Random")
        )
     )
 )
show_as_html(temp,5)

In [None]:
# Exact stratified 80/20 split: within each class, rows whose random rank
# (Row) falls in the first 80% of that class go to training; the rest is test.
# Class sizes are computed instead of hard-coded (previously 379954 / 40666),
# so the split stays correct if the upstream data changes.
class_sizes = {row["Class"]: row["count"] for row in features.groupBy("Class").count().collect()}

training = temp.where(
     ((F.col("Class") == 0) & (F.col("Row") < class_sizes.get(0, 0) * 0.8)) |
     ((F.col("Class") == 1) & (F.col("Row") < class_sizes.get(1, 0) * 0.8))
)
training.cache()

test = temp.join(training, on="id", how="left_anti")
test.cache()


In [None]:
training.filter(F.col("Class") == 1).show(5)

In [None]:
training = training.drop("id", "Random", "Row")  ## Training data
test = test.drop("id", "Random", "Row")          ## Testing data (only used for final predictions)


In [None]:
# user defined function that shows the class balance for a given dataframe
def print_class_balance(data, name, label_col="Class"):
    """Print the total row count and the per-class counts and ratios of a DataFrame.

    Args:
        data (pyspark.sql.DataFrame): DataFrame containing a label column.
        name (str): heading printed above the statistics.
        label_col (str): label column to group by (default: "Class").
    """
    N = data.count()
    # orderBy makes the printed table order deterministic across runs.
    counts = data.groupBy(label_col).count().orderBy(label_col).toPandas()
    counts["ratio"] = counts["count"] / N
    print(name)
    print(N)
    print(counts)
    print("")

In [None]:
print_class_balance(features, "features")
print_class_balance(training, "training")
print_class_balance(test, "test")


In [None]:
# Downsampling: keep every positive row and a random subset of negatives of
# roughly twice the number of positives. The keep-ratio is derived from the
# training split itself instead of hard-coded class counts.
training_sizes = {row["Class"]: row["count"] for row in training.groupBy("Class").count().collect()}
keep_negative = 2 * training_sizes.get(1, 0) / training_sizes[0] if training_sizes.get(0) else 1.0

training_downsampled = (
    training
    .withColumn("Random", F.rand())
    .where((F.col("Class") != 0) | ((F.col("Class") == 0) & (F.col("Random") < keep_negative)))
)
training_downsampled.cache()

print_class_balance(training_downsampled, "training_downsampled")

### (d-e) train & assess the performance of models

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# User define for performance metrics
def binary_metrics(df):
    """
    Calculate and print binary classification metrics for a predictions DataFrame.

    Expects the columns produced by a pyspark.ml binary classifier's transform():
    ``Class`` (true label, 0/1), ``prediction`` (0/1) and ``rawPrediction``
    (used for the area under the ROC curve).

    Accuracy, precision and recall are reported as "NA" when their denominator
    is zero. F1 is "NA" when precision or recall is "NA", and 0.0 when both are 0.
    The metrics are only printed; an empty tuple is returned.
    """
    # One aggregation pass for the whole confusion matrix instead of four filtered counts.
    # Keys mix int labels with double predictions; 1 == 1.0 and their hashes match,
    # so tuple lookups such as (1, 1) work.
    confusion = {
        (row["Class"], row["prediction"]): row["count"]
        for row in df.groupBy("Class", "prediction").count().collect()
    }
    total = sum(confusion.values())
    TP = confusion.get((1, 1), 0)
    TN = confusion.get((0, 0), 0)
    FP = confusion.get((0, 1), 0)
    FN = confusion.get((1, 0), 0)

    a = (TP + TN) / total if total else "NA"  # accuracy
    r = TP / (TP + FN) if (TP + FN) else "NA"  # recall
    p = TP / (TP + FP) if (TP + FP) else "NA"  # precision

    # F1 is the harmonic mean of precision and recall. The previous form
    # 1/((1/r + 1/p)/2) raised ZeroDivisionError whenever TP == 0.
    if p == "NA" or r == "NA":
        f = "NA"
    elif p + r == 0:
        f = 0.0
    else:
        f = 2 * p * r / (p + r)

    binary_class_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Class", metricName="areaUnderROC")
    auroc = binary_class_evaluator.evaluate(df)
    print('Actual count: {}'.format(total))
    print('True Positive: {}'.format(TP))
    print('True Negative: {}'.format(TN))
    print('False Positive: {}'.format(FP))
    print('False Negative: {}'.format(FN))
    print('precision: {}'.format(p))
    print('recall: {}'.format(r))
    print('accuracy: {}'.format(a))
    print('auroc: {}'.format(auroc))
    print('F1_Score: {}'.format(f))
    return ()


In [None]:
# LOGISTIC REGRESION

import numpy as np

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# No sampling
lr = LogisticRegression(featuresCol='features', labelCol='Class')
lr_model1 = lr.fit(training)
# Make predictions on test data using the Transformer.transform() method.
predictions1 = lr_model1.transform(test)
predictions1.cache()
binary_metrics(predictions1)

# Downsampling
lr = LogisticRegression(featuresCol='features', labelCol='Class')
lr_model2 = lr.fit(training_downsampled)
predictions2 = lr_model2.transform(test)
predictions2.cache()
binary_metrics(predictions2)


In [None]:
# RANDOM FOREST

from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="Class", featuresCol="features")

# Nosampling
rfModel1 = rf.fit(training)
predictions_rf1 = rfModel1.transform(test)
binary_metrics(predictions_rf1)

# Downsampling
rfModel2 = rf.fit(training_downsampled)
predictions_rf2 = rfModel2.transform(test)
binary_metrics(predictions_rf2)


In [None]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lsvc = LinearSVC(labelCol="Class", featuresCol="features")

# Nosampling
LinearSVC_model1 = lsvc.fit(training)
predictions_svc1 = LinearSVC_model1.transform(test)
binary_metrics(predictions_svc1)

# Downsampling
LinearSVC_model2 = lsvc.fit(training_downsampled)
# Make predictions on test data using the Transformer.transform() method.
predictions_svc2 = LinearSVC_model2.transform(test)
binary_metrics(predictions_svc2)


## Question 3


In [None]:
# Model Tunning
# Logistic Regression

# declaring estimator for CV
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
lr = LogisticRegression(labelCol="Class", featuresCol="features")

# building grid
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
.addGrid(lr.elasticNetParam,[0.0, 0.1,0.3, 0.5, 0.8, 1.0])\
.addGrid(lr.maxIter,[10, 100])\
.addGrid(lr.regParam,[0,0.01, 0.5, 1.0]) \
.build()

# building evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="Class")
#evaluator.evaluate(predictions)

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations and print result
cvModel = cv.fit(training_downsampled)
bestmodel=cvModel.bestModel
predictions_lr_cv = bestmodel.transform(test)
binary_metrics(predictions_lr_cv)


## Question 4

In [None]:
# Multiclass Classification
# Logistic Regression
#check schema again
mfcc_features_genre.printSchema()

In [None]:
# labelling genres using stringindexer
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("genre").setOutputCol("label")
indexed_df = indexer.fit(mfcc_features_genre).transform(mfcc_features_genre)
indexed_df

In [None]:
indexed_df.groupBy("label","genre").count().orderBy("label").show(21,False)

In [None]:
# drop columns which we don't need
multi_genre = indexed_df.drop("ID","genre","track_id")
# drop columns which are highly correlated
multi_genre = multi_genre.drop("F008","F011")
multi_genre.printSchema()

In [None]:
def print_class_balance(data, name, label_col="label"):
    """Print the total row count and the per-label counts and ratios of a DataFrame.

    Args:
        data (pyspark.sql.DataFrame): DataFrame containing a label column.
        name (str): heading printed above the statistics.
        label_col (str): label column to group and order by (default: "label").
    """
    N = data.count()
    counts = data.groupBy(label_col).count().orderBy(label_col).toPandas()
    counts["ratio"] = counts["count"] / N
    print(name)
    print(N)
    print(counts)
    print("")


In [None]:
# assemble features
assembler = (VectorAssembler()
    .setInputCols(multi_genre.columns[:-1])
    .setOutputCol("features"))
multi_features = assembler.transform(multi_genre).select(["features", "label"])

# splitting data using exact stratification
temp = (
    multi_features
    .withColumn("id", F.monotonically_increasing_id())
    .withColumn("Random", F.rand())
    .withColumn(
        "Row",
        F.row_number()
        .over(
            Window
            .partitionBy("label")
            .orderBy("Random")
        )
    )
)

class_counts = (
      multi_features
     .groupBy("label")
     .count()
     .toPandas()
     .set_index("label")["count"]
     .to_dict()
 )
classes = sorted(class_counts.keys())

training = temp
 
for c in classes:
     training = training.where((F.col("label") != c) | (F.col("Row") < class_counts[c] * 0.8))

training.cache()

test = temp.join(training, on="id", how="left_anti")
test.cache()

training = training.drop("id", "Random", "Row")
test = test.drop("id", "Random", "Row")

print_class_balance(multi_features,"multi_features")
print_class_balance(training, "training")
print_class_balance(test, "test")

In [None]:
# Downsampling applied for the major genres which are "Pop_rock" and "electronic"
downsample=training.filter(training["label"].isin([0,1])) 
training_downsampled = downsample.sampleBy("label",fractions = {0:0.10,1:0.50}, seed = 688)
training_downsampled.cache()

# The final train data
rest = training.filter(training["label"].isin([2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]))
afterdown_train_data = training_downsampled.union(rest)
print_class_balance(afterdown_train_data, "afterdown_train_data")

In [None]:
# Upsampling via poisson random sampling

counts = {label: count for label, count in training.groupBy("label").count().collect()}
count_lower_bound = 2000
count_upper_bound = 500000

def random_resample(x, counts, count_lower_bound, count_upper_bound):
    """Return the copies of a row with label ``x`` to keep, as a list of ``x`` values.

    Labels rarer than ``count_lower_bound`` are upsampled: the row is repeated
    1 + Poisson((count_lower_bound - count) / count) times. Labels more common
    than ``count_upper_bound`` are downsampled: the row survives with probability
    ``count_upper_bound / count``. Any other label yields exactly ``[x]``.

    Args:
        x: label value; must be a key of ``counts``.
        counts (dict): label -> number of rows carrying that label.
        count_lower_bound (int): target minimum number of rows per label.
        count_upper_bound (int): target maximum number of rows per label.
    """
    n_rows = counts[x]

    if n_rows < count_lower_bound:
        extra = np.random.poisson((count_lower_bound - n_rows) / n_rows)
        return [x] * int(1 + extra)

    # The random draw only happens for over-represented labels.
    keep = n_rows <= count_upper_bound or np.random.rand() < count_upper_bound / n_rows
    return [x] if keep else []

random_resample_udf = F.udf(lambda x: random_resample(x, counts, count_lower_bound, count_upper_bound), ArrayType(IntegerType()))
final_train_data  = (
    afterdown_train_data
    .withColumn("sample", random_resample_udf(F.col("label")))
    .select(
        F.col("label"),
        F.col("features"),
        F.explode(F.col("sample")).alias("sample")
    )
    .drop("sample")
)

print_class_balance(final_train_data , "final_train_data")

In [None]:
# UP AND DOWN SAMPLING TRAINING
# Multi class Classification using Logistic regression
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


def multiclass_evaluator(predictions, label_col="label", prediction_col="prediction"):
    """Print overall multiclass metrics for a predictions DataFrame.

    Reports accuracy and the class-frequency-weighted precision, recall and F1
    computed by MulticlassClassificationEvaluator. This helper was previously
    called below without ever being defined, which raised a NameError.
    """
    evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col)
    for metric in ["accuracy", "weightedPrecision", "weightedRecall", "f1"]:
        value = evaluator.evaluate(predictions, {evaluator.metricName: metric})
        print(f"{metric}: {value}")


# instantiate the base classifier.
lr = LogisticRegression(featuresCol='features', labelCol='label')

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(final_train_data)

# score the model on test data.
predictions = ovrModel.transform(test)

multiclass_evaluator(predictions)

# Song recommendations
## Q1: Taste Profile exploration

In [None]:
!hdfs dfs -ls /data/msd/tasteprofile/

In [None]:
# (a) How many unique songs are there in the dataset?
unique_songs = triplets_not_mismatched.select("song_id").distinct().count()
print('number of unique songs is: ', unique_songs) #378310
# (a) How many unique users?
unique_users = triplets_not_mismatched.select("user_id").distinct().count()
print('number of unique users is: ', unique_users) #1019318

In [None]:
triplets_not_mismatched.printSchema()

In [None]:
# (b) MOST ACTIVE USER ANALYSIS
# Per-user number of distinct triplet rows (songs) and total plays, most active first.
user_counts = triplets_not_mismatched.groupBy("user_id").agg(
                                      F.count(F.col("song_id")).alias("song_count"),
                                      F.sum(F.col("plays")).alias("play_count")
                                     ).orderBy(F.col("play_count").desc()) # Check most active user by counting play times

user_counts.cache()

print('There are ', user_counts.count(), ' unique users') #1,019,318
print('Top 5 most active users who have highest play count:')
user_counts.show(5, False)
# What is this as a percentage of the total number of unique songs in the dataset?
# Only the first (most active) row is needed, so fetch it with first() instead of
# converting every user to pandas on the driver.
song_most_user = user_counts.first()["song_count"]  # song count the most active user played
print('Percentage of songs the most active user played: ', 100 * song_most_user / unique_songs) #0.052%

In [None]:
user_statistics = (
    user_counts.select("song_count", "play_count")
    .describe()
    .toPandas()
    .set_index("summary")
    .rename_axis(None)
)
print(user_statistics)

print("---User with the number of unique song quantile:---")
print(user_counts.approxQuantile("song_count", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05))  #FOR SONG RECOMMENDATION
print("---User with number of total activity quantile:---")
print(user_counts.approxQuantile("play_count", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05))

In [None]:
# Song statistics: per-song number of listeners and total plays, most played first.
song_counts = triplets_not_mismatched.groupBy("song_id").agg(
      F.count(F.col("user_id")).alias("user_count"),
      F.sum(F.col("plays")).alias("play_count"),
    ).orderBy(F.col("play_count").desc())
song_counts.cache()
print('Number of unique song: ', song_counts.count()) #378310
# This table is per song, so the heading describes songs (it previously said "users").
print('Top 5 most popular songs with the highest play count:')
song_counts.show(5, False)



In [None]:
# MOST POPULAR SONG ANALYSIS
statistics = (
  song_counts
  .select("user_count", "play_count")
  .describe()
  .toPandas()
  .set_index("summary")
  .rename_axis(None)
)
print(statistics)

# Aprroxiamte Quantitle
print("---songs with have most users listen to ")
print(song_counts.approxQuantile("user_count", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05))
print("---songs with most plays (most popularity)---") 
print(song_counts.approxQuantile("play_count", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)) #FOR SONG RECOMMENDATION

In [None]:
def plot_histogram(data):
    """Plot and save a histogram of ``play_count`` for a song- or user-level table.

    The first column decides the labels: ``song_id`` -> "Song Popularity",
    ``user_id`` -> "User Activity". The figure is saved under the file name
    ``'Distribution of <title>'``.

    Args:
        data: table with a ``play_count`` column whose first column is ``song_id``
            or ``user_id``. Presumably a pandas DataFrame (it must support
            ``.hist(column=..., bins=..., ax=...)``) — confirm against the callers.

    Raises:
        ValueError: if the first column is neither ``song_id`` nor ``user_id``.
            Previously this case failed later with an UnboundLocalError.
    """
    labels = {
        "song_id": ("Song", "Song Popularity"),
        "user_id": ("User", "User Activity"),
    }
    key_column = data.columns[0]
    if key_column not in labels:
        raise ValueError(
            f"plot_histogram expects the first column to be 'song_id' or 'user_id', got {key_column!r}"
        )
    name, title = labels[key_column]

    fig, ax = plt.subplots(figsize=(16, 4))  # wide figure for the long-tailed distribution
    data.hist(column='play_count', bins=500, ax=ax)
    ax.set_xlabel(f'Number of plays Range per {name}')
    ax.set_ylabel('Frequency')
    ax.set_title(f'Distribution of {title}')
    # Thousands separators through axis formatters. Unlike set_*ticklabels(get_*ticks()),
    # these stay correct if tick positions change and avoid matplotlib's FixedFormatter warning.
    ax.xaxis.set_major_formatter('{x:,.0f}')
    ax.yaxis.set_major_formatter('{x:,.0f}')
    ax.grid(False)  # Remove the grid lines
    fig.savefig(f'Distribution of {title}', bbox_inches='tight', dpi=100)


In [None]:
# Histogram of play_count per song; songs_distribution is built outside this view
# (presumably a pandas frame whose first column is song_id — confirm).
plot_histogram(songs_distribution)

In [None]:
# Histogram of play_count per user; user_activity is built outside this view
# (presumably a pandas frame whose first column is user_id — confirm).
plot_histogram(user_activity)

In [None]:
# Quantiles (min, Q1, median, Q3, max) of 'User song count' (M) and 'Song play count' (N),
# used to choose the filtering thresholds below.
quantile_probs = [0.0, 0.25, 0.5, 0.75, 1.0]
user_song_quantiles = user_counts.approxQuantile("song_count", quantile_probs, 0.05)
print('User song count M: ', user_song_quantiles)
song_play_quantiles = song_counts.approxQuantile("play_count", quantile_probs, 0.05)
print('Song play count N: ', song_play_quantiles)
# (d) Thresholds M & N are set in the next cells and repeated for other M/N pairs.
# The total triplet count is the cell's displayed output.
triplets_not_mismatched.count()

In [None]:
# Peek at the top rows of the per-song and per-user aggregates (both ordered by play_count, descending).
song_counts.show(5)
user_counts.show(5)

In [None]:
#  M = 13, N = 5 (remove data before first quantile)
user_song_count_threshold = 13  # M: keep users whose song_count is greater than M
song_play_count_threshold = 5  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
  triplets_not_mismatched
  .join(
    user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
    on="user_id",
    how="inner"
  )
  .join(
    song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
    on="song_id",
    how="inner"
  )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (45,795,111 in the recorded run).
# Computed rather than hard-coded so the percentage stays right if upstream cleaning changes.
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count) #43,379,863/45,795,111 -->5.27% data was removed
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')

from pyspark.ml.feature import StringIndexer

# (e) Encoding: map string ids to numeric indices, which ALS requires
user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
show_as_html(triplets_limited,5)

# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.
training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)
# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:    {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")

test_not_training.show(50, False)

# Number of test rows per user that is missing from training
counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()
print(counts)

# Remove those users from the test set. A single isin filter replaces one chained .where()
# per user, which grew the query plan linearly with the number of such users.
if counts:
  test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set; the target is at least 25%
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

## Question 2


In [None]:
def get_user_counts(triplets):
    """Aggregate per-user activity from an encoded triplets DataFrame.

    Returns one row per ``user_id_encoded`` with:
      * ``song_count`` -- number of non-null ``song_id_encoded`` values for the user;
      * ``play_count`` -- sum of ``plays`` for the user;
    ordered by ``play_count``, highest first.
    """
    per_user = triplets.groupBy("user_id_encoded")
    aggregated = per_user.agg(
        F.count(F.col("song_id_encoded")).alias("song_count"),
        F.sum(F.col("plays")).alias("play_count"),
    )
    return aggregated.orderBy(F.col("play_count").desc())


user_counts_1 = get_user_counts(test)
user_counts_1.show(20)


In [None]:
# Encoded ids of the 20 users with the highest play count in the test set
# (user_counts_1 is already ordered by play_count, descending).
top_20_users = [row[0] for row in user_counts_1.select("user_id_encoded").limit(20).collect()]
print(top_20_users)

In [None]:
# Import the required libraries
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics

# Implicit-feedback ALS on the encoded ids, with plays as the feedback signal
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="user_id_encoded",
    itemCol="song_id_encoded",
    ratingCol="plays",
    implicitPrefs=True,
)
als_model = als.fit(training)

# A hand-picked (not random) set of encoded user ids to inspect. The ids are only
# meaningful for the StringIndexer fitted in the filtering cell above.
subset_users = test.filter(
    F.col("user_id_encoded").isin([24067, 9, 515326, 208506, 36877, 18092, 678615, 47605, 8, 659627, 76, 5168])
)
some_predictions = als_model.transform(subset_users)



In [None]:
# Top-10 song recommendations for the users present in subset_users
# (the result carries a 'recommendations' column of (song, score) entries).
subset_users_recommend = als_model.recommendForUserSubset(subset_users, 10)
subset_users_recommend.show(10, False)

In [None]:
def extract_songs(x):
    """Return the first element of each pair in ``x``, ordered by the second element, highest first.

    Used for (song_id, score) and (song_id, plays) pairs. Ties keep their input order.
    """
    ranked = sorted(x, key=lambda pair: pair[1], reverse=True)
    return [pair[0] for pair in ranked]
# Spark UDF wrapper: applies extract_songs to an array column and returns array<int> of song ids.
extract_songs_udf = F.udf(lambda x: extract_songs(x), ArrayType(IntegerType()))

In [None]:
# For each user in the subset, compare the recommended songs with the songs actually played.

# Recommended song ids per user, best score first
subset_users_song_recommend = (
    subset_users_recommend
    .withColumn('songs_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'songs_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
subset_users_song_recommend.show(40, False)

# Song ids each user actually played, most-played first
subset_actual_songs = (
    subset_users
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("subset_actual_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "subset_actual_songs")
    .orderBy(F.col("user_id_encoded"))
)
subset_actual_songs.cache()
subset_actual_songs.show(10, False)


### Experiments with different M and N

In [None]:
## Experiment 1
# M:13, N:5. Evaluate the ALS model fitted above on the test split.

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
user_song_recommendation.cache()
user_song_recommendation.show(5, 70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
relevant_songs.cache()
relevant_songs.show(5, 70)

# (predicted ranking, ground truth) pairs per user, the input format RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation
    .join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# Ranking metrics
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}')
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}')
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}')

In [None]:
#Experiment 2
#  M = 13, N = 8
user_song_count_threshold = 13  # M: keep users whose song_count is greater than M
song_play_count_threshold = 8  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP

In [None]:
#Experiment 3
#  M = 15, N = 8
user_song_count_threshold = 15  # M: keep users whose song_count is greater than M
song_play_count_threshold = 8  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP


In [None]:
#Experiment 4
#  M = 13, N = 10
user_song_count_threshold = 13  # M: keep users whose song_count is greater than M
song_play_count_threshold = 10  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP


In [None]:
#Experiment 5
#  M = 11, N = 5
user_song_count_threshold = 11  # M: keep users whose song_count is greater than M
song_play_count_threshold = 5  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP

In [None]:
#Experiment 6
#  M = 9, N = 5
user_song_count_threshold = 9  # M: keep users whose song_count is greater than M
song_play_count_threshold = 5  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP

In [None]:
#Experiment 7
#  M = 8, N = 5
user_song_count_threshold = 8  # M: keep users whose song_count is greater than M
song_play_count_threshold = 5  # N: keep songs whose play_count is greater than N

# Filter triplets by thresholds M & N (inner joins keep only qualifying users and songs)
triplets_limited = (
    triplets_not_mismatched
    .join(
        user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
        on="user_id",
        how="inner",
    )
    .join(
        song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
        on="song_id",
        how="inner",
    )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
# Denominator: all triplets before the M/N filter (computed, not hard-coded)
total_triplets_count = triplets_not_mismatched.count()
print('Number of remained observation: ', triplets_limited_count)
print((1 - triplets_limited_count/total_triplets_count)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding: map string ids to numeric indices, which ALS requires

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting. A fixed seed makes reruns produce the same split, and therefore the same metrics.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=42)

# Test rows whose user never appears in training; ALS learns no factors for such users.
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove those users from the test set with a single filter, instead of one chained .where() per user
if counts:
    test = test.where(~F.col("user_id").isin(list(counts)))

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# Share (in percent) of all plays that ended up in the test set
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training (implicit feedback, plays as the signal)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommended song ids per user, best score first
user_song_recommendation = (
    recommended_songs
    .withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select('user_id_encoded', 'user_song_recommendation')
    .orderBy(F.col("user_id_encoded"))
)
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: song ids each test user played, most-played first
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType()),
    )
    .groupBy('user_id_encoded')
    .agg(F.collect_list(F.array(F.col("song_id_encoded"), F.col("plays"))).alias('relevance'))
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (predicted ranking, ground truth) pairs per user, as RankingMetrics expects
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row.user_song_recommendation, row.actual_listened_songs))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP


In [None]:
#Experiment 8
#  M = 7, N = 5
# Keep only users whose `song_count` exceeds M and songs whose `play_count`
# exceeds N (user_counts / song_counts come from an earlier cell), then fit an
# implicit-feedback ALS model on a 70/30 split and report ranking metrics.
user_song_count_threshold = 7  #M
song_play_count_threshold = 5 #N
triplets_limited = triplets_not_mismatched
# Filter triplets_limited by threshold M & N (strict ">" comparisons; the inner
# joins drop rows whose user/song does not pass)
triplets_limited = (
  triplets_limited
  .join(
    user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
    on="user_id",
    how="inner"
  )
)

triplets_limited = (
  triplets_limited
  .join(
    song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
    on="song_id",
    how="inner"
  )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
print('Number of remained observation: ', triplets_limited_count)
# 45,795,111 is presumably the row count of triplets_not_mismatched before the
# M/N filters (TODO confirm against the earlier count). The removed percentage
# differs per experiment, so it is printed here rather than hard-coded in a comment.
print((1 - triplets_limited_count/45795111)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding
# Map the string user/song ids to numeric indices for ALS. The indexers are
# fitted on the M/N-filtered data of this experiment only.

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting
# A fixed seed makes the split (and therefore the metrics below) reproducible
# when the cell is re-run.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=1)

# Test rows whose user never appears in training (no learned user factors).
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

# {user_id: number of test rows} for users that only appear in the test split.
counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove the test rows of users that are not in training with ONE semi-join.
# The previous loop chained one `.where(user_id != k)` per such user, so the
# query plan grew linearly with len(counts).
test = test.join(training.select("user_id").distinct(), on="user_id", how="left_semi")
test.cache()  # reused by the counts, the play-share check and the evaluation below

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# checking test set if it contains at least 20% of the plays in total
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training
# `plays` is used as the rating column with implicitPrefs=True (implicit feedback).
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommendations per user, reduced to song ids by extract_songs_udf
# (defined in an earlier cell).
user_song_recommendation = (
    recommended_songs.withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select(['user_id_encoded', 'user_song_recommendation']).orderBy(F.col("user_id_encoded"))
    )
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: each test user's (song, plays) pairs, reduced to song ids by
# the same extract_songs_udf.
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType())
    )
    .groupBy('user_id_encoded')
    .agg(
        F.collect_list(
            F.array(
                F.col("song_id_encoded"),
                F.col("plays")
            )
        ).alias('relevance')
    )
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (recommended, relevant) pairs per user, selected by column name so the
# result does not depend on the column order produced by the join.
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row['user_song_recommendation'], row['actual_listened_songs']))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP

In [None]:
#Experiment 9
#  M = 6, N = 5
# Keep only users whose `song_count` exceeds M and songs whose `play_count`
# exceeds N (user_counts / song_counts come from an earlier cell), then fit an
# implicit-feedback ALS model on a 70/30 split and report ranking metrics.
user_song_count_threshold = 6  #M
song_play_count_threshold = 5 #N
triplets_limited = triplets_not_mismatched
# Filter triplets_limited by threshold M & N (strict ">" comparisons; the inner
# joins drop rows whose user/song does not pass)
triplets_limited = (
  triplets_limited
  .join(
    user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
    on="user_id",
    how="inner"
  )
)

triplets_limited = (
  triplets_limited
  .join(
    song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
    on="song_id",
    how="inner"
  )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
print('Number of remained observation: ', triplets_limited_count)
# 45,795,111 is presumably the row count of triplets_not_mismatched before the
# M/N filters (TODO confirm against the earlier count). The removed percentage
# differs per experiment, so it is printed here rather than hard-coded in a comment.
print((1 - triplets_limited_count/45795111)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding
# Map the string user/song ids to numeric indices for ALS. The indexers are
# fitted on the M/N-filtered data of this experiment only.

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting
# A fixed seed makes the split (and therefore the metrics below) reproducible
# when the cell is re-run.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=1)

# Test rows whose user never appears in training (no learned user factors).
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

# {user_id: number of test rows} for users that only appear in the test split.
counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove the test rows of users that are not in training with ONE semi-join.
# The previous loop chained one `.where(user_id != k)` per such user, so the
# query plan grew linearly with len(counts).
test = test.join(training.select("user_id").distinct(), on="user_id", how="left_semi")
test.cache()  # reused by the counts, the play-share check and the evaluation below

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# checking test set if it contains at least 20% of the plays in total
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training
# `plays` is used as the rating column with implicitPrefs=True (implicit feedback).
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommendations per user, reduced to song ids by extract_songs_udf
# (defined in an earlier cell).
user_song_recommendation = (
    recommended_songs.withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select(['user_id_encoded', 'user_song_recommendation']).orderBy(F.col("user_id_encoded"))
    )
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: each test user's (song, plays) pairs, reduced to song ids by
# the same extract_songs_udf.
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType())
    )
    .groupBy('user_id_encoded')
    .agg(
        F.collect_list(
            F.array(
                F.col("song_id_encoded"),
                F.col("plays")
            )
        ).alias('relevance')
    )
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (recommended, relevant) pairs per user, selected by column name so the
# result does not depend on the column order produced by the join.
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row['user_song_recommendation'], row['actual_listened_songs']))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP

In [None]:
#Experiment 10
#  M = 7, N = 4
# Keep only users whose `song_count` exceeds M and songs whose `play_count`
# exceeds N (user_counts / song_counts come from an earlier cell), then fit an
# implicit-feedback ALS model on a 70/30 split and report ranking metrics.
user_song_count_threshold = 7  #M
song_play_count_threshold = 4 #N
triplets_limited = triplets_not_mismatched
# Filter triplets_limited by threshold M & N (strict ">" comparisons; the inner
# joins drop rows whose user/song does not pass)
triplets_limited = (
  triplets_limited
  .join(
    user_counts.where(F.col("song_count") > user_song_count_threshold).select("user_id"),
    on="user_id",
    how="inner"
  )
)

triplets_limited = (
  triplets_limited
  .join(
    song_counts.where(F.col("play_count") > song_play_count_threshold).select("song_id"),
    on="song_id",
    how="inner"
  )
)
triplets_limited.cache()
triplets_limited_count = triplets_limited.count()
print('Number of remained observation: ', triplets_limited_count)
# 45,795,111 is presumably the row count of triplets_not_mismatched before the
# M/N filters (TODO confirm against the earlier count). The removed percentage
# differs per experiment, so it is printed here rather than hard-coded in a comment.
print((1 - triplets_limited_count/45795111)*100, ' % observations were removed')
print('')
print('------------------------')
print('')
# ------------------------------
# Encoding
# Map the string user/song ids to numeric indices for ALS. The indexers are
# fitted on the M/N-filtered data of this experiment only.

user_id_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
song_id_indexer = StringIndexer(inputCol="song_id", outputCol="song_id_encoded")

user_id_indexer_model = user_id_indexer.fit(triplets_limited)
song_id_indexer_model = song_id_indexer.fit(triplets_limited)

triplets_limited = user_id_indexer_model.transform(triplets_limited)
triplets_limited = song_id_indexer_model.transform(triplets_limited)
# ------------------------------
# Splitting
# A fixed seed makes the split (and therefore the metrics below) reproducible
# when the cell is re-run.

training, test = triplets_limited.randomSplit([0.7, 0.3], seed=1)

# Test rows whose user never appears in training (no learned user factors).
test_not_training = test.join(training, on="user_id", how="left_anti")

training.cache()
test.cache()
test_not_training.cache()

print(f"training:      {training.count()}")
print(f"test:        {test.count()}")
print(f"test_not_training: {test_not_training.count()}")
print('')
test_not_training.show(50, False)
print('')

# {user_id: number of test rows} for users that only appear in the test split.
counts = test_not_training.groupBy("user_id").count().toPandas().set_index("user_id")["count"].to_dict()

print(counts)

# Remove the test rows of users that are not in training with ONE semi-join.
# The previous loop chained one `.where(user_id != k)` per such user, so the
# query plan grew linearly with len(counts).
test = test.join(training.select("user_id").distinct(), on="user_id", how="left_semi")
test.cache()  # reused by the counts, the play-share check and the evaluation below

#Print final training test
print(f"training:      {training.count()}")
print(f"test:        {test.count()}")

# checking test set if it contains at least 20% of the plays in total
test_play = test.agg(F.sum(F.col("plays"))).collect() # test play count
total_play = triplets_limited.agg(F.sum(F.col("plays"))).collect() #total play count
print(100 * test_play[0][0] / total_play[0][0]) # test play count percentage

#ALS model training
# `plays` is used as the rating column with implicitPrefs=True (implicit feedback).
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_encoded", itemCol="song_id_encoded", ratingCol="plays", implicitPrefs=True)
als_model = als.fit(training)

predictions = als_model.transform(test)
recommended_songs = als_model.recommendForAllUsers(10)

# Top-10 recommendations per user, reduced to song ids by extract_songs_udf
# (defined in an earlier cell).
user_song_recommendation = (
    recommended_songs.withColumn('user_song_recommendation', extract_songs_udf(F.col('recommendations')))
    .select(['user_id_encoded', 'user_song_recommendation']).orderBy(F.col("user_id_encoded"))
    )
print('')
user_song_recommendation.cache()
user_song_recommendation.show(5,70)

# Ground truth: each test user's (song, plays) pairs, reduced to song ids by
# the same extract_songs_udf.
relevant_songs = (
    test
    .select(
        F.col("user_id_encoded").cast(IntegerType()),
        F.col("song_id_encoded").cast(IntegerType()),
        F.col("plays").cast(IntegerType())
    )
    .groupBy('user_id_encoded')
    .agg(
        F.collect_list(
            F.array(
                F.col("song_id_encoded"),
                F.col("plays")
            )
        ).alias('relevance')
    )
    .withColumn("actual_listened_songs", extract_songs_udf(F.col("relevance")))
    .select("user_id_encoded", "actual_listened_songs")
    .orderBy(F.col("user_id_encoded"))
)
print('')
relevant_songs.cache()
relevant_songs.show(5,70)
print('')
# (recommended, relevant) pairs per user, selected by column name so the
# result does not depend on the column order produced by the join.
perUserItemsRDD = (
    user_song_recommendation.join(relevant_songs, on='user_id_encoded', how='inner')
    .rdd
    .map(lambda row: (row['user_song_recommendation'], row['actual_listened_songs']))
)
perUserItemsRDD.cache()

# generating metrics required
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(f'Precision @10: {rankingMetrics.precisionAt(10)}') # Precision at 10
print(f'NDCG @10: {rankingMetrics.ndcgAt(10)}') #NDCG at 10
print(f'Mean Average Precision (MAP): {rankingMetrics.meanAveragePrecision}') #MAP