In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Suppress native-hadoop warning
!sed -i '$a\# Add the line for suppressing the NativeCodeLoader warning \nlog4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR,console' /$HADOOP_HOME/etc/hadoop/log4j.properties

In [3]:
import sys
sys.path.append('/home/work')

BASE_DIR = '/home/work'

In [4]:
import pyspark
from pyspark.sql import SparkSession, functions as F
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

from data.utils.data_loader import load_from_hdfs
from models.utils import load_model
from models.evaluation_metrics import calculate_rmse, calculate_mae, calculate_song_coverage, calculate_user_coverage, calculate_precision_recall

In [5]:
# Set Spark Settings
conf = pyspark.SparkConf().setAll([
    ('spark.master', 'local[10]'),
    ('spark.app.name', 'MusicRecommender'),
    ('spark.driver.memory','14g'),
    # ('spark.sql.shuffle.partitions', '200'),
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Print Spark Settings
settings = spark.sparkContext.getConf().getAll()
for s in settings:
    print(s)

('spark.driver.host', '6f6ec6ea2650')
('spark.app.startTime', '1717093914712')
('spark.master', 'local[10]')
('spark.executor.id', 'driver')
('spark.app.id', 'local-1717093915437')
('spark.app.submitTime', '1717093914589')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.secur

## Evaluate Models

In [6]:
# Results DataFrame
results = []
model_dirs = ['als_model']
model_types = ['ALS']
datasets = ['processed/user_rating_balanced']
partitions = [1]

# Static Variables
total_size = 717872016
total_users = 1823179
total_songs = 136736

user_recs = None
predictions = None

batch_size = 10000
n_recs = 50

for model_dir, model_type in zip(model_dirs, model_types):
    for dataset in datasets: 
        for par in partitions:            
             # Clear Cache
            spark.catalog.clearCache()
            
            # Load Data
            train_data, test_data = load_from_hdfs(dataset, par)
            
            # Load Model
            model_path = f'file://{BASE_DIR}/models/{model_dir}'
            model = load_model(model_type, model_path)
            
            # Train Metrics
            train_user_ids = train_data.select('user_id').distinct()
            train_users = train_user_ids.count()
            train_song_ids = train_data.select('song_id').distinct()
            train_songs = train_song_ids.count()
            
            # Song Metrics
            test_user_ids = test_data.select('user_id').distinct()
            test_users = test_user_ids.count()
            test_song_ids = test_data.select('song_id').distinct()
            test_songs = test_song_ids.count()

            # Recommendation Metrics
            for i in range(0, test_users, batch_size):
                start = i
                end = i + batch_size
                test_user_batch = 
                user_recs_batch = model.recommendForUserSubset(test_user_batch, 50)
                if user_recs is None:
                    user_recs = user_recs_batch
                else:
                    user_recs = user_recs.union(user_recs_batch)
            
            # Explode Recommendations
            user_recs = user_recs.select('user_id', F.explode('recommendations').alias('recommendation'))
            user_recs = user_recs.select('user_id', F.col('recommendation.song_id').alias('song_id'), F.col('recommendation.rating').alias('rating'))
            user_recs.show(5)
            
            recommendation_users = user_recs.select('user_id').distinct().count()
            recommendation_songs = user_recs.select('song_id').distinct().count()
            
                                    
            # Coverage Metrics
            test_song_coverage = calculate_song_coverage(train_songs, test_songs)
            test_overall_song_coverage = calculate_song_coverage(total_songs, test_songs)
            test_user_coverage = calculate_user_coverage(train_users, test_users)
            test_overall_user_coverage = calculate_user_coverage(total_users, test_users)
            recommendations_song_coverage = calculate_song_coverage(train_songs, recommendation_songs)
            recommendations_overall_song_coverage = calculate_song_coverage(total_songs, recommendation_songs)
            recommendations_user_coverage = calculate_user_coverage(train_users, recommendation_users)
            recommendations_overall_user_coverage = calculate_user_coverage(total_users, recommendation_users)
            
            # Get Predictions
            predictions = model.transform(test_data)
            
            # Evaluation Metrics
            rmse = calculate_rmse(predictions)
            mae = calculate_mae(predictions)
            precision, recall = calculate_precision_recall(user_recs, test_data, 0.0)            
            
            results.append({
                'Model': model_type,
                'Dataset': dataset,
                'Users(Train:Test)': f'{train_users} : {test_users}',
                'Songs(Train:Test)': f'{train_songs} : {test_songs}',
                'Test User Coverage(Model:Overall)': f'{round(test_user_coverage, 2)} : {round(test_overall_user_coverage, 2)}',
                'Test Song Coverage(Model:Overall)': f'{round(test_song_coverage, 2)} : {round(test_overall_song_coverage, 2)}',
                'Recommendations User Coverage(Model:Overall)': f'{round(recommendations_user_coverage, 2)} : {round(recommendations_overall_user_coverage, 2)}',
                'Recommendations Song Coverage(Model:Overall)': f'{round(recommendations_song_coverage, 2)} : {round(recommendations_overall_song_coverage, 2)}',
                'Recommendations Precision': round(precision, 4),
                'Recommendations Recall': round(recall, 4),     
                'Predictions RMSE': rmse,
                'Predictions MAE': mae,
            })

                                                                                

Loaded 24891408 training records and 6224643 test records from HDFS
root
 |-- user_id: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- album_id: integer (nullable = true)
 |-- artist_id: integer (nullable = true)
 |-- genre_id: integer (nullable = true)
 |-- genre_name: string (nullable = true)
 |-- partition_id: integer (nullable = false)



[Stage 90:>                                                       (0 + 10) / 10]

+-------+-------+------+
|user_id|song_id|rating|
+-------+-------+------+
+-------+-------+------+



                                                                                

Precision: 0.0 = Relevant Recommendations: 0 / Total Recommendations: 0


[Stage 324:>                                                      (0 + 10) / 10]

Recall: 0.0 = Relevant Recommendations: 0 / Relevant Test Items: 6224643


                                                                                

## Evaluation Results

In [7]:
# Evaluation Results Schema
schema = StructType([
    StructField("Model", StringType(), True),
    StructField("Dataset", StringType(), True),
    StructField("Users(Train:Test)", StringType(), True),
    StructField("Songs(Train:Test)", StringType(), True),
    StructField("Test User Coverage(Model:Overall)", StringType(), True),
    StructField("Test Song Coverage(Model:Overall)", StringType(), True),
    StructField("Recommendations User Coverage(Model:Overall)", StringType(), True),
    StructField("Recommendations Song Coverage(Model:Overall)", StringType(), True),
    StructField("Recommendations Precision", FloatType(), True),
    StructField("Recommendations Recall", FloatType(), True),
    StructField("Predictions RMSE", FloatType(), True),
    StructField("Predictions MAE", FloatType(), True),
])

# Output Results as DF
results_df = spark.createDataFrame(results, schema)
results_df.show(truncate=False) 

+-----+------------------------------+-----------------+-----------------+---------------------------------+---------------------------------+--------------------------------------------+--------------------------------------------+-------------------------+----------------------+----------------+---------------+
|Model|Dataset                       |Users(Train:Test)|Songs(Train:Test)|Test User Coverage(Model:Overall)|Test Song Coverage(Model:Overall)|Recommendations User Coverage(Model:Overall)|Recommendations Song Coverage(Model:Overall)|Recommendations Precision|Recommendations Recall|Predictions RMSE|Predictions MAE|
+-----+------------------------------+-----------------+-----------------+---------------------------------+---------------------------------+--------------------------------------------+--------------------------------------------+-------------------------+----------------------+----------------+---------------+
|ALS  |processed/user_rating_balanced|173463 : 173446  