In [1]:
import pyspark
from pyspark.sql import SparkSession

# Ensure the path to the JAR files are correct and accessible
path_to_mongo_spark_connector = "/home/hdoop/Downloads/mongo-spark-connector_2.12-3.0.2.jar"
path_to_mongo_java_driver = "/home/hdoop/Downloads/mongo-java-driver-3.12.10.jar"

print("Creating SparkSession...")

spark = SparkSession.builder \
    .appName("MusicRecommendation") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/music_recommendation.audio_features") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/music_recommendation.audio_features") \
    .config("spark.jars", ",".join([path_to_mongo_spark_connector, path_to_mongo_java_driver])) \
    .getOrCreate()
print("SparkSession created successfully.")


Creating SparkSession...


24/05/12 20:14:25 WARN Utils: Your hostname, irtaza-Victus-by-HP-Gaming-Laptop-15-fa0xxx resolves to a loopback address: 127.0.1.1; using 2407:d000:d:7347:158e:b138:1774:d316 instead (on interface wlp4s0)
24/05/12 20:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/12 20:14:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


SparkSession created successfully.


In [2]:
# Load preprocessed audio features from MongoDB into a Spark DataFrame
print("Loading data from MongoDB...")
audio_features = spark.read.format("mongo").option("uri", "mongodb://127.0.0.1:27017/music_recommendation.audio_features").load()
print("Data loaded successfully.")

Loading data from MongoDB...


                                                                                

Data loaded successfully.


In [3]:
# Print the size of the data
print("Size of the loaded data: {} rows x {} columns".format(audio_features.count(), len(audio_features.columns)))

# Print the columns
print("Columns in the DataFrame:")
for col_name in audio_features.columns:
      print(col_name)

# Show the first 50 rows
print("First 50 rows of the DataFrame:")
audio_features.show(10)

                                                                                

Size of the loaded data: 104478 rows x 2 columns
Columns in the DataFrame:
_id
features
First 50 rows of the DataFrame:


                                                                                

+--------------------+--------------------+
|                 _id|            features|
+--------------------+--------------------+
|{663cfc46eff9941d...|[[3.6756520370513...|
|{663cfc46eff9941d...|[[-4.059843640725...|
|{663cfc47eff9941d...|[[-0.684101795088...|
|{663cfc47eff9941d...|[[-1.278270721028...|
|{663cfc47eff9941d...|[[-1.447105094707...|
|{663cfc47eff9941d...|[[-2.751079754458...|
|{663cfc47eff9941d...|[[-1.501083218520...|
|{663cfc48eff9941d...|[[1.2266289928350...|
|{663cfc48eff9941d...|[[-2.031567618177...|
|{663cfc48eff9941d...|[[-1.707836872826...|
+--------------------+--------------------+
only showing top 10 rows



In [3]:
# Split the data into training and testing sets
print("Splitting data into training and testing sets...")
# Define the ratio for splitting (e.g., 80% training, 20% testing)
training_ratio = 0.8
testing_ratio = 1.0 - training_ratio

# Split the data
training_data, test_data = audio_features.randomSplit([training_ratio, testing_ratio], seed=1234)

# Indicate when data splitting is done
print("Data splitting completed.") 

Splitting data into training and testing sets...
Data splitting completed.


In [5]:
# Print the size of the data
print("Size of the loaded data: {} rows x {} columns".format(training_data.count(), len(training_data.columns)))

# Print the columns
print("Columns in the DataFrame:")
for col_name in training_data.columns:
    print(col_name)



Size of the loaded data: 83284 rows x 2 columns
Columns in the DataFrame:
_id
features


                                                                                

In [6]:
# Print the size of the data
print("Size of the loaded data: {} rows x {} columns".format(test_data.count(), len(test_data.columns)))

# Print the columns
print("Columns in the DataFrame:")
for col_name in test_data.columns:
   print(col_name)



Size of the loaded data: 20965 rows x 2 columns
Columns in the DataFrame:
_id
features


                                                                                

In [4]:
training_data.printSchema()


root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)



In [5]:
from pyspark.sql.functions import col

# Split 'features' array into separate columns
split_features = training_data.select(
    col("_id").alias("row_id"),  # Assuming "_id" is the correct column name
    col("features")[0][0].alias("user_id"),
    col("features")[0][1].alias("track_id"),
    col("features")[0][2].alias("play_count")
)

# Show the schema and a sample of the split data
split_features.printSchema()
split_features.show(10)


root
 |-- row_id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)



                                                                                

+--------------------+-------------------+-------------------+-------------------+
|              row_id|            user_id|           track_id|         play_count|
+--------------------+-------------------+-------------------+-------------------+
|{663cfc46eff9941d...| 3.6756520370513175| 3.1075330221641035|  2.849467203982779|
|{663cfc47eff9941d...|-0.6841017950883698|0.07530321083923087| 0.7578103674944087|
|{663cfc47eff9941d...|-1.2782707210287152| -1.404545350553354|-0.9536318207970136|
|{663cfc47eff9941d...|-2.7510797544587415|-1.2052667907857624|-0.8085763432086499|
|{663cfc47eff9941d...| -1.501083218520508|-1.3162265608271722| -1.176454029434011|
|{663cfc48eff9941d...| 1.2266289928350496|  3.156820016138062| 3.7090223564897933|
|{663cfc48eff9941d...| -2.031567618177572|-0.3587792300901715| 0.6069387849694297|
|{663cfc48eff9941d...| -1.707836872826653| -1.230572212985931| -1.309623153998263|
|{663cfc48eff9941d...|  1.307634124710383| 1.7670893955220124|  1.068281944658892|
|{66

                                                                                

In [6]:
from pyspark.sql.functions import col

# Split 'features' array into separate columns
split_features01 = test_data.select(
    col("_id").alias("row_id"),  # Assuming "_id" is the correct column name
    col("features")[0][0].alias("user_id"),
    col("features")[0][1].alias("track_id"),
    col("features")[0][2].alias("play_count")
)

# Show the schema and a sample of the split data
split_features.printSchema()
split_features.show(10)


root
 |-- row_id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)



[Stage 4:>                                                          (0 + 2) / 2]

+--------------------+-------------------+-------------------+-------------------+
|              row_id|            user_id|           track_id|         play_count|
+--------------------+-------------------+-------------------+-------------------+
|{663cfc46eff9941d...| 3.6756520370513175| 3.1075330221641035|  2.849467203982779|
|{663cfc47eff9941d...|-0.6841017950883698|0.07530321083923087| 0.7578103674944087|
|{663cfc47eff9941d...|-1.2782707210287152| -1.404545350553354|-0.9536318207970136|
|{663cfc47eff9941d...|-2.7510797544587415|-1.2052667907857624|-0.8085763432086499|
|{663cfc47eff9941d...| -1.501083218520508|-1.3162265608271722| -1.176454029434011|
|{663cfc48eff9941d...| 1.2266289928350496|  3.156820016138062| 3.7090223564897933|
|{663cfc48eff9941d...| -2.031567618177572|-0.3587792300901715| 0.6069387849694297|
|{663cfc48eff9941d...| -1.707836872826653| -1.230572212985931| -1.309623153998263|
|{663cfc48eff9941d...|  1.307634124710383| 1.7670893955220124|  1.068281944658892|
|{66

                                                                                

In [7]:
from pyspark.sql.functions import col, split, monotonically_increasing_id

# Add a unique identifier column to the DataFrame
training_data = training_data.withColumn("row_id", monotonically_increasing_id())

# Split the 'features' column into separate columns for user_id, track_id, and play_count
training_data = training_data.withColumn("user_id", split(col("features")[0][0].cast("string"), " ")[0].cast("int"))
training_data = training_data.withColumn("track_id", split(col("features")[0][1].cast("string"), " ")[0].cast("int"))
training_data = training_data.withColumn("play_count", split(col("features")[0][2].cast("string"), " ")[0].cast("int"))

# Drop rows with null values
training_data = training_data.dropna()

# Show summary of cleaned data
training_data.summary().show()

# Indicate when data transformation is done
print("Data transformation completed.")


24/05/12 19:29:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+--------------------+-------------------+------------------+-------------------+
|summary|              row_id|            user_id|          track_id|         play_count|
+-------+--------------------+-------------------+------------------+-------------------+
|  count|               83330|              83330|             83330|              83330|
|   mean|2.462881038278719...|0.17712708508340333|0.2483019320772831|0.31296051842073686|
| stddev|1.414356045106426...|  2.518530739645346| 2.142208030876912|  2.094931943083896|
|    min|                   0|                -12|                -8|                 -7|
|    25%|       1245540515939|                 -1|                -1|                 -1|
|    50%|       2448131358755|                  0|                 0|                  0|
|    75%|       3667902070811|                  1|                 1|                  1|
|    max|       4964982194190|                 58|                44|                 34|
+-------+-

                                                                                

In [8]:
from pyspark.sql.functions import col, split, monotonically_increasing_id

# Add a unique identifier column to the DataFrame
test_data = test_data.withColumn("row_id", monotonically_increasing_id())

# Split the 'features' column into separate columns for user_id, track_id, and play_count
test_data = test_data.withColumn("user_id", split(col("features")[0][0].cast("string"), " ")[0].cast("int"))
test_data = test_data.withColumn("track_id", split(col("features")[0][1].cast("string"), " ")[0].cast("int"))
test_data = test_data.withColumn("play_count", split(col("features")[0][2].cast("string"), " ")[0].cast("int"))

# Drop rows with null values
test_data = test_data.dropna()

# Show summary of cleaned data
test_data.summary().show()

# Indicate when data transformation is done
print("Data transformation completed.")


[Stage 10:>                                                         (0 + 1) / 1]

+-------+--------------------+------------------+-------------------+------------------+
|summary|              row_id|           user_id|           track_id|        play_count|
+-------+--------------------+------------------+-------------------+------------------+
|  count|               20988|             20988|              20988|             20988|
|   mean|2.454828790113059...|0.1631884886601868|0.23804078521059654| 0.300695635601296|
| stddev|1.426268588886973...| 2.469136495308594|  2.128711398676191|2.0760004697342644|
|    min|                   0|               -10|                 -7|                -7|
|    25%|       1219770712084|                -1|                 -1|                -1|
|    50%|       2448131358742|                 0|                  0|                 0|
|    75%|       3667902070805|                 1|                  1|                 1|
|    max|       4964982194176|                23|                 17|                15|
+-------+------------

                                                                                

In [9]:
from pyspark.sql.functions import col

# Split 'features' array into separate columns
split_features = training_data.select(
    col("_id.oid").alias("row_id"),
    col("features").getItem(0).getItem(0).alias("user_id"),
    col("features").getItem(0).getItem(1).alias("track_id"),
    col("features").getItem(0).getItem(2).alias("play_count")
)

# Show the schema and a sample of the split data
split_features.printSchema()
split_features.show(10)


root
 |-- row_id: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)





+--------------------+-------------------+-------------------+-------------------+
|              row_id|            user_id|           track_id|         play_count|
+--------------------+-------------------+-------------------+-------------------+
|663cfc46eff9941de...| 3.6756520370513175| 3.1075330221641035|  2.849467203982779|
|663cfc47eff9941de...|-0.6841017950883698|0.07530321083923087| 0.7578103674944087|
|663cfc47eff9941de...|-1.2782707210287152| -1.404545350553354|-0.9536318207970136|
|663cfc47eff9941de...|-2.7510797544587415|-1.2052667907857624|-0.8085763432086499|
|663cfc47eff9941de...| -1.501083218520508|-1.3162265608271722| -1.176454029434011|
|663cfc48eff9941de...| 1.2266289928350496|  3.156820016138062| 3.7090223564897933|
|663cfc48eff9941de...| -2.031567618177572|-0.3587792300901715| 0.6069387849694297|
|663cfc48eff9941de...| -1.707836872826653| -1.230572212985931| -1.309623153998263|
|663cfc48eff9941de...|  1.307634124710383| 1.7670893955220124|  1.068281944658892|
|663

                                                                                

In [10]:
from pyspark.sql.functions import col

# Split 'features' array into separate columns
split_features01 = test_data.select(
    col("_id.oid").alias("row_id"),
    col("features").getItem(0).getItem(0).alias("user_id"),
    col("features").getItem(0).getItem(1).alias("track_id"),
    col("features").getItem(0).getItem(2).alias("play_count")
)

# Show the schema and a sample of the split data
split_features.printSchema()
split_features.show(10)


root
 |-- row_id: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)





+--------------------+-------------------+-------------------+-------------------+
|              row_id|            user_id|           track_id|         play_count|
+--------------------+-------------------+-------------------+-------------------+
|663cfc46eff9941de...| 3.6756520370513175| 3.1075330221641035|  2.849467203982779|
|663cfc47eff9941de...|-0.6841017950883698|0.07530321083923087| 0.7578103674944087|
|663cfc47eff9941de...|-1.2782707210287152| -1.404545350553354|-0.9536318207970136|
|663cfc47eff9941de...|-2.7510797544587415|-1.2052667907857624|-0.8085763432086499|
|663cfc47eff9941de...| -1.501083218520508|-1.3162265608271722| -1.176454029434011|
|663cfc48eff9941de...| 1.2266289928350496|  3.156820016138062| 3.7090223564897933|
|663cfc48eff9941de...| -2.031567618177572|-0.3587792300901715| 0.6069387849694297|
|663cfc48eff9941de...| -1.707836872826653| -1.230572212985931| -1.309623153998263|
|663cfc48eff9941de...|  1.307634124710383| 1.7670893955220124|  1.068281944658892|
|663

                                                                                

In [4]:
###IGNORE





from pyspark.sql.functions import col
from pyspark.sql.functions import col, split, monotonically_increasing_id

# Split 'features' array into separate columns
split_features = training_data.select(
    col("_id").alias("row_id"),  # Assuming "_id" is the correct column name
    col("features")[0][0].alias("user_id"),
    col("features")[0][1].alias("track_id"),
    col("features")[0][2].alias("play_count")
)

# Split 'features' array into separate columns
split_features01 = test_data.select(
    col("_id").alias("row_id"),  # Assuming "_id" is the correct column name
    col("features")[0][0].alias("user_id"),
    col("features")[0][1].alias("track_id"),
    col("features")[0][2].alias("play_count")
)

# Add a unique identifier column to the DataFrame
training_data = training_data.withColumn("row_id", monotonically_increasing_id())

# Split the 'features' column into separate columns for user_id, track_id, and play_count
training_data = training_data.withColumn("user_id", split(col("features")[0][0].cast("string"), " ")[0].cast("int"))
training_data = training_data.withColumn("track_id", split(col("features")[0][1].cast("string"), " ")[0].cast("int"))
training_data = training_data.withColumn("play_count", split(col("features")[0][2].cast("string"), " ")[0].cast("int"))

# Drop rows with null values
training_data = training_data.dropna()

# Add a unique identifier column to the DataFrame
test_data = test_data.withColumn("row_id", monotonically_increasing_id())

# Split the 'features' column into separate columns for user_id, track_id, and play_count
test_data = test_data.withColumn("user_id", split(col("features")[0][0].cast("string"), " ")[0].cast("int"))
test_data = test_data.withColumn("track_id", split(col("features")[0][1].cast("string"), " ")[0].cast("int"))
test_data = test_data.withColumn("play_count", split(col("features")[0][2].cast("string"), " ")[0].cast("int"))

# Drop rows with null values
test_data = test_data.dropna()

# Split 'features' array into separate columns
split_features = training_data.select(
    col("_id.oid").alias("row_id"),
    col("features").getItem(0).getItem(0).alias("user_id"),
    col("features").getItem(0).getItem(1).alias("track_id"),
    col("features").getItem(0).getItem(2).alias("play_count")
)

# Split 'features' array into separate columns
split_features01 = test_data.select(
    col("_id.oid").alias("row_id"),
    col("features").getItem(0).getItem(0).alias("user_id"),
    col("features").getItem(0).getItem(1).alias("track_id"),
    col("features").getItem(0).getItem(2).alias("play_count")
)

In [5]:
from pyspark.sql.functions import col

# Filter out null or NaN values from the play_count column
split_features = split_features.filter(~col("play_count").isNull())

# Convert user_id column to integers
split_features = split_features.withColumn("user_id", col("user_id").cast("int"))

# Verify the schema after conversion and filtering
split_features.printSchema()

# Now, proceed to train the ALS model


root
 |-- row_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)



In [6]:
from pyspark.sql.functions import col

# Filter out null or NaN values from the play_count column
split_features01 = split_features01.filter(~col("play_count").isNull())

# Convert user_id column to integers
split_features01 = split_features01.withColumn("user_id", col("user_id").cast("int"))

# Verify the schema after conversion and filtering
split_features01.printSchema()

# Now, proceed to train the ALS model


root
 |-- row_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- track_id: double (nullable = true)
 |-- play_count: double (nullable = true)



In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col



# Assuming you have a DataFrame named 'data' containing your input data
# Load your data into a DataFrame
# data = spark.read.csv("your_data.csv", header=True, inferSchema=True)

# Convert 'track_id' column to integer type and filter out invalid values
cleaned_data = split_features.withColumn("track_id", col("track_id").cast("integer")) \
                   .filter(col("track_id").isNotNull())
print("Data Cleaned ")

Data Cleaned 


In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col



# Assuming you have a DataFrame named 'data' containing your input data
# Load your data into a DataFrame
# data = spark.read.csv("your_data.csv", header=True, inferSchema=True)

# Convert 'track_id' column to integer type and filter out invalid values
cleaned_data01 = split_features01.withColumn("track_id", col("track_id").cast("integer")) \
                   .filter(col("track_id").isNotNull())
print("Data Cleaned")

Data Cleaned


In [9]:
from pyspark.sql.functions import col

# Assuming your DataFrame is called split_features
# Convert the track_id column to integer
split_features = split_features.withColumn("track_id", col("track_id").cast("integer"))

In [10]:
from pyspark.sql.functions import col

# Assuming your DataFrame is called split_features
# Convert the track_id column to integer
split_features01 = split_features01.withColumn("track_id", col("track_id").cast("integer"))

In [20]:
from pyspark.sql.functions import col

# Assuming your DataFrame is called split_features
# Convert the track_id column to integer
split_features = split_features.withColumn("track_id", col("track_id").cast("integer"))

# Create an ALS instance
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="track_id", ratingCol="play_count",
          coldStartStrategy="drop")

# Train the ALS model
print("Training the ALS model...")
model = als.fit(split_features)
print("Model training completed.")


Training the ALS model...


24/05/12 15:58:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/12 15:58:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Model training completed.


In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the testing data
predictions = model.transform(split_features)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="play_count",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {:.2f}".format(rmse))




Root Mean Squared Error (RMSE) on test data = 0.88


                                                                                

In [22]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(split_features01)

# Evaluate the model using an appropriate evaluation metric
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="play_count",
    predictionCol="prediction"
)

# Calculate the RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = ", rmse)



Root Mean Squared Error (RMSE) =  0.8799627835479484


                                                                                

In [11]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


split_features = split_features.repartition("user_id")
print("Data loaded and partitioned successfully.")

# Define ALS model
als = ALS(userCol="user_id", itemCol="track_id", ratingCol="play_count",
          coldStartStrategy="drop")

# Define parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="play_count",
                                predictionCol="prediction")

# Cache the data for reuse
split_features.cache()

# Define cross-validator
cross_val = CrossValidator(estimator=als,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           numFolds=3)

# Train ALS model using cross-validation
print("Training the ALS model with hyperparameter tuning...")
cv_model = cross_val.fit(split_features)
print("Model training completed.")

# Make predictions on the test data
predictions = cv_model.transform(split_features01)

# Evaluate the model using RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)


Data loaded and partitioned successfully.
Training the ALS model with hyperparameter tuning...


24/05/12 20:20:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/12 20:20:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Model training completed.




Root Mean Squared Error (RMSE): 0.8741945157100438


                                                                                

In [13]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

best_model = cv_model.bestModel

# Optionally, you can also print the best parameters found during hyperparameter tuning
print("Best parameters:")
print("Rank:", best_model.rank)
print("Max Iterations:", best_model._java_obj.parent().getMaxIter())
print("Regularization Parameter:", best_model._java_obj.parent().getRegParam())

predictions = best_model.transform(split_features)
accuracy = evaluator.evaluate(predictions)
print("Accuracy of the best model:", accuracy)


Best parameters:
Rank: 20
Max Iterations: 15
Regularization Parameter: 0.01
Accuracy of the best model: 0.8836422781420403


In [19]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Assuming split_features is your DataFrame containing user-item interactions
split_features02 = split_features.repartition("user_id")

# Define ALS model
als = ALS(userCol="user_id", itemCol="track_id", ratingCol="play_count",
          coldStartStrategy="drop")

# Define parameter grid for hyperparameter tuning
param_grid02 = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(als.alpha, [0.01, 0.1, 1.0]) \
    .build()

# Define evaluator
evaluator02 = RegressionEvaluator(metricName="rmse", labelCol="play_count",
                                  predictionCol="prediction")

# Cache the data for reuse
split_features02.cache()

# Define cross-validator
cross_val02 = CrossValidator(estimator=als,
                             estimatorParamMaps=param_grid02,
                             evaluator=evaluator02,
                             numFolds=3)

# Train ALS model using cross-validation
print("Training the ALS model with hyperparameter tuning...")
cv_model02 = cross_val02.fit(split_features02)
print("Model training completed.")

# Make predictions on the test data
predictions02 = cv_model02.transform(split_features02)

# Evaluate the model using RMSE
rmse02 = evaluator02.evaluate(predictions02)
print("Root Mean Squared Error (RMSE):", rmse02)


Training the ALS model with hyperparameter tuning...
Model training completed.
Root Mean Squared Error (RMSE): 0.883230456181312


In [21]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Get best model from cross-validation
best_model = cv_model02.bestModel

# Optionally, you can also print the best parameters found during hyperparameter tuning
print("Best parameters:")
print("Rank:", best_model.rank)
print("Max Iterations:", best_model._java_obj.parent().getMaxIter())
print("Regularization Parameter:", best_model._java_obj.parent().getRegParam())
print("Alpha:", best_model._java_obj.parent().getAlpha())

# Calculate accuracy of the best model
predictions = best_model.transform(split_features02)
accuracy = evaluator02.evaluate(predictions)
print("Accuracy of the best model:", accuracy)


Best parameters:
Rank: 10
Max Iterations: 10
Regularization Parameter: 0.01
Alpha: 0.01
Accuracy of the best model: 0.883230456181312
