# What is covered?
1. Creating a SparkSession
2. Reading ratings data from a Cloud Storage bucket
3. (Optional) Storing the ratings in a BigQuery table
4. Data preparation
5. Training an ALS model on the ratings data
6. Calculating the RMSE score and displaying predictions

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, BooleanType, TimestampType, IntegerType

In [2]:
# Build the SparkSession with the BigQuery connector jar attached.
# If a session already exists, getOrCreate() returns it and only runtime SQL
# configs are applied, so spark.jars may be ignored (see the warning below).
spark = (
    SparkSession.builder
    .appName("Data to BQ")
    .config("spark.jars", "gs://spark-lib/bigquery/spark-3.5-bigquery-0.38.0.jar")
    .getOrCreate()
)

24/06/23 00:10:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Input/output locations for this experiment.
data_file_uri = "gs://als-exp/data/ratings.csv"
tmp_bucket_uri = "gs://als-exp/tmp"
project_id = "recs-exp"
bq_table_name = f"{project_id}.books.ratings"

# Explicit schema so ids and ratings are read as integers rather than strings.
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("book_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
])

# Read the ratings data into a dataframe.
# header=True skips the column-name line instead of parsing it as a data row.
# Against the integer schema, that row would become an all-null record. The
# optional BigQuery write would then store it, and na.drop() would only remove
# it later.
# NOTE(review): assumes ratings.csv starts with a header row
# (e.g. "user_id,book_id,rating") - confirm against the source file.
dataframe = spark.read.csv(data_file_uri, schema=schema, header=True)

### Optional: Storing the ratings in a BigQuery table

In [9]:
# Persist the ratings to BigQuery. tmp_bucket_uri is passed as the connector's
# temporaryGcsBucket option.
# NOTE: mode('append') adds rows on every run, so re-running this cell
# duplicates the table's contents.
# NOTE(review): the provider class below is the Spark 3.4 one, while the jar
# configured above is spark-3.5-bigquery. format("bigquery") is the connector's
# documented short name; consider it if only one connector is on the classpath.
dataframe.select("user_id", "book_id", "rating").write \
    .format("com.google.cloud.spark.bigquery.v2.Spark34BigQueryTableProvider") \
    .option('table', bq_table_name) \
    .option('temporaryGcsBucket', tmp_bucket_uri) \
    .mode('append') \
    .save()

# The SparkSession is intentionally NOT stopped here. The data-preparation and
# ALS training cells below still use it. Calling spark.stop() in this cell made
# every later cell fail on a fresh Restart & Run All.

                                                                                

### Data Preparation

In [4]:
# Drop rows that have a null in any column, then preview the cleaned data.
dataframe = dataframe.na.drop()
dataframe.show()

# Create an 80/20 train/test split. A fixed seed makes the split (and
# therefore the RMSE computed from it) reproducible across runs.
SPLIT_SEED = 42
train_data, test_data = dataframe.randomSplit([0.80, 0.20], seed=SPLIT_SEED)

                                                                                

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      1|    258|     5|
|      2|   4081|     4|
|      2|    260|     5|
|      2|   9296|     5|
|      2|   2318|     3|
|      2|     26|     4|
|      2|    315|     3|
|      2|     33|     4|
|      2|    301|     5|
|      2|   2686|     5|
|      2|   3753|     5|
|      2|   8519|     5|
|      4|     70|     4|
|      4|    264|     3|
|      4|    388|     4|
|      4|     18|     5|
|      4|     27|     5|
|      4|     21|     5|
|      4|      2|     5|
|      4|     23|     5|
+-------+-------+------+
only showing top 20 rows



### Train ALS model

In [5]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [None]:
# Hyperparameters for the ALS run (previously inline magic numbers).
ALS_RANK = 5
ALS_MAX_ITER = 5
ALS_REG_PARAM = 0.01
ALS_SEED = 42  # fixes ALS's random initialisation so results are reproducible

# Configure the ALS estimator and train it on the training set.
# coldStartStrategy="drop" removes prediction rows whose prediction is NaN
# (users/books unseen during training), so the RMSE below is not NaN.
als = ALS(
    rank=ALS_RANK,
    maxIter=ALS_MAX_ITER,
    regParam=ALS_REG_PARAM,
    userCol="user_id",
    itemCol="book_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=ALS_SEED,
)
# Keep the estimator (als) and the fitted model (als_model) under separate names.
als_model = als.fit(train_data)

# Make predictions on the testing set.
predictions = als_model.transform(test_data)
# Defensive: drop any remaining rows containing nulls/NaNs before scoring.
predictions = predictions.dropna()

# Evaluate the model using RMSE.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))



In [None]:
# Preview the first 20 test-set predictions (long values truncated).
predictions.show(n=20, truncate=True)

## Conclusion
In this notebook, we trained an ALS model on a PySpark cluster. The ALS (Alternating Least Squares) algorithm is well suited to building collaborative-filtering recommendation systems on big data. Distributed computing frameworks such as Apache Spark spread its training across a cluster, which reduces training time on large datasets. Note that ALS does not solve the cold-start problem by itself. Users or books that do not appear in the training data receive NaN predictions, and `coldStartStrategy="drop"` simply removes those rows so that the RMSE can be computed.