In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

In [2]:
# Build (or reuse) the Spark session for this notebook, asking for the
# BigQuery connector jar to be put on the classpath.
# NOTE: when a Spark session already exists, getOrCreate() reuses it and only
# runtime SQL configurations take effect (see the warning printed by this cell),
# so the `spark.jars` setting below is ignored in that case. Presumably the
# cluster then supplies its own BigQuery connector — confirm the version.
spark = (
    SparkSession.builder
    .appName("training")
    .config("spark.jars", "gs://spark-lib/bigquery/spark-3.5-bigquery-0.38.0.jar")
    .getOrCreate()
)

24/05/16 20:28:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the book-ratings table from BigQuery by naming the connector's provider
# class explicitly.
# NOTE(review): the provider is the Spark 3.4 variant, while the jar configured
# on the session builder targets Spark 3.5 — confirm this pairing is intended.
# NOTE(review): temporaryGcsBucket is a staging location for writes and usually
# takes a bare bucket name (no "gs://" prefix or sub-path) — confirm it is
# needed, and correctly formatted, for this read.
RATINGS_TABLE = 'absolute-point-423516-c5.demo.ratings'
TEMP_GCS_BUCKET = 'gs://recs_project/als/data/'

df = (
    spark.read
    .format("com.google.cloud.spark.bigquery.v2.Spark34BigQueryTableProvider")
    .option('table', RATINGS_TABLE)
    .option('temporaryGcsBucket', TEMP_GCS_BUCKET)
    .load()
)

In [4]:
# Clean the raw ratings in one pass:
#   1. remove the BigQuery `_id` column, which is not a model input;
#   2. discard any row that still contains a null in one of its columns.
# Then preview the first rows of the cleaned data.
df = df.drop('_id').na.drop()
df.show()

                                                                                

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|  39129|     10|     1|
|  39129|    133|     1|
|  39129|     43|     1|
|  39129|     76|     1|
|  39130|   3933|     1|
|   4318|    451|     1|
|  39129|     15|     1|
|  39130|   1446|     1|
|  39130|   3480|     1|
|  39129|    230|     1|
|  39129|   1274|     1|
|  39129|    545|     1|
|  39132|     40|     1|
|  34927|    271|     1|
|  15496|    253|     1|
|  38522|     52|     1|
|  38522|     56|     1|
|  39134|     56|     1|
|  39134|    314|     1|
|  21391|   2534|     1|
+-------+-------+------+
only showing top 20 rows



In [5]:
# Hold out 20% of the ratings for evaluation. randomSplit() without a seed
# draws a fresh random seed on every run, so the split and the RMSE reported
# below would change each time. A fixed seed makes them reproducible.
SPLIT_SEED = 42
train_data, test_data = df.randomSplit([0.80, 0.20], seed=SPLIT_SEED)

In [6]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [13]:
# ALS hyperparameters (previously inline magic numbers).
ALS_RANK = 5          # number of latent factors
ALS_MAX_ITER = 5      # number of ALS iterations
ALS_REG_PARAM = 0.01  # regularization strength

# Configure the ALS estimator. coldStartStrategy="drop" removes prediction rows
# whose prediction is NaN (e.g. a user or book absent from the training split),
# so the RMSE is computed only over pairs the model can actually score.
als = ALS(
    maxIter=ALS_MAX_ITER,
    regParam=ALS_REG_PARAM,
    userCol="user_id",
    itemCol="book_id",
    ratingCol="rating",
    rank=ALS_RANK,
    coldStartStrategy="drop",
)

# NOTE(review): this pipeline is never fitted or used in this cell. It is kept
# only in case a later cell relies on `pipeline_mf`; remove it if nothing does.
pipeline_mf = Pipeline(stages=[als])

# Train on the training split. The estimator (`als`) and the fitted model
# (`als_model`) now have separate names instead of one name being rebound.
als_model = als.fit(train_data)

# Score the held-out split. dropna() is a defensive guard for any null/NaN
# predictions that coldStartStrategy did not already remove.
predictions = als_model.transform(test_data).dropna()

# Evaluate with root-mean-square error between the true and predicted ratings.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(f"Root-mean-square error = {rmse}")



Root-mean-square error = 0.8300883806488166



                                                                                

In [8]:
# Preview held-out rows with the true rating next to the model's prediction.
# These are show()'s defaults, spelled out: first 20 rows, long cells truncated.
predictions.show(n=20, truncate=True)

[Stage 92:>                                                         (0 + 1) / 1]

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|    148|    857|     3| 3.5954463|
|    148|   1152|     3| 3.1684055|
|    148|   1949|     3| 3.2308843|
|    148|   3267|     3| 3.6492395|
|    148|   4374|     3| 3.8596463|
|    148|   6105|     3| 3.3167431|
|    148|   6194|     3| 3.5626419|
|    148|   9837|     3| 3.6544404|
|    496|   8496|     2|  3.838284|
|    833|     26|     2| 2.9573455|
|    833|    119|     3| 3.8620384|
|    833|    264|     3| 3.6961882|
|    833|    934|     3| 3.6231916|
|    833|   1020|     3| 3.5577087|
|    833|   1465|     3|  3.817024|
|    833|   4438|     2| 3.5428925|
|    833|   5084|     2| 3.5846627|
|   1088|     44|     1|    2.3575|
|   1088|     72|     3|  3.655786|
|   1088|    152|     1| 2.1214037|
+-------+-------+------+----------+
only showing top 20 rows




                                                                                