In [None]:
# Initialise findspark before `pyspark` is imported in the next cell.
# Presumably this is what makes the local Spark installation importable in
# this environment -- confirm if the setup changes.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session: local master, application
# name "UAS", Spark UI port set to 3050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("UAS")
    .config("spark.ui.port", "3050")
    .getOrCreate()
)

In [None]:
# Read the semicolon-delimited ratings file; the first row supplies the
# column names.
ratings = spark.read.options(header="true", delimiter=";").csv("BX-Book-Ratings.csv")

In [None]:
# Inspect the schema as loaded: no schema inference was requested when
# reading the CSV, so every column comes back as a string.
ratings.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: string (nullable = true)



In [None]:
# Replace the hyphenated source column names with identifier-friendly ones.
column_renames = {"Book-Rating": "rating", "User-ID": "userId"}
for old_name, new_name in column_renames.items():
    ratings = ratings.withColumnRenamed(old_name, new_name)

In [None]:
from pyspark.sql.functions import col

# Cast the string columns to the numeric types used by the ALS estimator.
# NOTE(review): the null count printed a few cells further down reports
# 182750 null ISBN values after this cast. ISBNs that are not plain 32-bit
# integers (presumably those ending in an "X" check digit or with too many
# digits -- confirm) become null here and are later removed by na.drop().
# Consider mapping ISBN strings to integer ids instead of casting them.
column_types = {"userId": "integer", "ISBN": "integer", "rating": "double"}
for name, spark_type in column_types.items():
    ratings = ratings.withColumn(name, col(name).cast(spark_type))

In [None]:
# Confirm the casts took effect (userId/ISBN as integer, rating as double).
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- rating: double (nullable = true)



In [None]:
from pyspark.sql.functions import col, isnan, when, count

# For each column, count the rows whose value is NaN or null.
missing_per_column = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in ratings.columns
]
ratings.select(missing_per_column).show()

+------+------+------+
|userId|  ISBN|rating|
+------+------+------+
|     0|182750|     0|
+------+------+------+



In [None]:
# Replace any missing rating with 0.0 (only the "rating" column is touched).
ratings = ratings.fillna({"rating": 0.0})

In [None]:
# Discard every row that still contains a null, then re-run the per-column
# NaN/null count to verify nothing is missing any more.
ratings = ratings.dropna()
remaining_missing = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in ratings.columns
]
ratings.select(remaining_missing).show()

+------+----+------+
|userId|ISBN|rating|
+------+----+------+
|     0|   0|     0|
+------+----+------+



In [None]:
# Re-check the schema after the fill/drop steps.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- rating: double (nullable = false)



In [None]:
# Work on a 300,000-row subset of the cleaned ratings.
# Alternatives kept for reference:
#   train = ratings.filter(col("rating") != 0.0)   # only non-zero ratings
#   train = ratings                                # the full data set
# NOTE(review): the count cell that follows reports 185096 of these rows with
# rating 0.0. Presumably 0 marks an interaction without an explicit rating in
# this data set -- confirm, because the ALS estimator below is configured with
# implicitPrefs=False and will fit them as genuine ratings of zero.
TRAIN_ROW_LIMIT = 300000
train = ratings.limit(TRAIN_ROW_LIMIT)

In [None]:
# Size of the working subset and how many of its rows carry a 0.0 rating;
# the last figure is simply the remainder.
data = train.select("rating").count()
zero = train.where(col("rating") == 0.0).count()
non_zero = data - zero
print("Data :", data)
print("Rating 0.0 :", zero)
print("Rating > 0.0 :", non_zero)

Data : 300000
Rating 0.0 : 185096
Rating > 0.0 : 114904


In [None]:
# Hold out roughly 20% of the subset for the final evaluation.
# The seed is pinned so the split -- and every count and metric derived from
# it -- is the same on each run; with seed=None a fresh random seed is drawn
# every time the notebook is executed.
training, test = train.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Row count of the training split (weight 0.8 in the random split).
training.count()

240152

In [None]:
from pyspark.ml.recommendation import ALS

# Explicit-feedback ALS recommender over (userId, ISBN, rating).
#   coldStartStrategy="drop"  rows that would get a NaN prediction are removed
#                             from transform() output, so the evaluator never
#                             sees NaN
#   nonnegative=True          use non-negative constraints for the factors
#   implicitPrefs=False       treat `rating` as an explicit rating value
#   seed=42                   pinned so the fitted factors are reproducible
#                             across notebook runs
als = ALS(
    userCol="userId",
    itemCol="ISBN",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=42,
)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

# Hyper-parameter grid: 4 ranks x 4 regularisation strengths x 1 iteration
# budget.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(als.rank, [10, 50, 100, 150])
grid_builder.addGrid(als.regParam, [.01, .05, .1, .15])
grid_builder.addGrid(als.maxIter, [20])
param_grid = grid_builder.build()

print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the true "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
from pyspark.ml.tuning import CrossValidator

# k-fold cross-validation of `als` over every entry of `param_grid`, scored
# with `evaluator`.
# numFolds is spelled out (3 is the library default, so behaviour is
# unchanged) and the seed is pinned so the fold assignment -- and therefore
# the selected model -- is reproducible across runs.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training split. Every entry of
# param_grid is fitted once per fold, so expect this to be the slowest cell
# in the notebook.
model = cv.fit(training)

In [None]:
# Keep the model the cross-validator selected as best; it is used below for
# scoring the test split and for generating recommendations.
best_model = model.bestModel

In [None]:
# Report the winning hyper-parameters through the public CrossValidatorModel
# API instead of reaching into the private `_java_obj` handle of the fitted
# model. avgMetrics[i] is the cross-validated score of the i-th parameter map,
# so the best map is the one at the position of the best average metric
# (smallest for RMSE; the evaluator tells us which direction is better).
avg_metrics = list(model.avgMetrics)
pick_best = max if evaluator.isLargerBetter() else min
best_index = avg_metrics.index(pick_best(avg_metrics))
best_params = model.getEstimatorParamMaps()[best_index]

print("Rank:", best_params[als.rank])
print("MaxIter:", best_params[als.maxIter])
print("RegParam:", best_params[als.regParam])

In [None]:
# Score the held-out split. Because the ALS estimator was created with
# coldStartStrategy="drop", rows that would receive a NaN prediction are
# expected to be absent from the result.
predictions = best_model.transform(test)

In [None]:
# Root-mean-square error of the predictions against the true ratings of the
# held-out split.
RMSE = evaluator.evaluate(predictions)
print("RMSE", RMSE)

In [None]:
# Top 10 recommended books for every user; each row holds a userId and an
# array column named "recommendations".
uRecom = best_model.recommendForAllUsers(10)

In [None]:
# Top 10 recommended users for every book; each row holds an ISBN and an
# array column named "recommendations".
iRecom = best_model.recommendForAllItems(10)

In [None]:
from pyspark.sql.functions import explode

In [None]:
# Flatten each user's recommendation array into one row per
# (userId, ISBN, rating) triple.
user_exploded = uRecom.select(
    "userId",
    explode("recommendations").alias("rec_exp"),
)
uRecs = user_exploded.select("userId", "rec_exp.ISBN", "rec_exp.rating")

In [None]:
# Flatten each book's recommendation array into one row per
# (ISBN, userId, rating) triple.
item_exploded = iRecom.select(
    "ISBN",
    explode("recommendations").alias("rec_exp"),
)
iRecs = item_exploded.select("ISBN", "rec_exp.userId", "rec_exp.rating")

In [None]:
# Show the user/book pairs with the highest predicted rating first.
uRecs.sort("rating", ascending=False).show()

+------+----------+---------+
|userId|      ISBN|   rating|
+------+----------+---------+
| 60714| 860514501|28.768648|
| 67465| 860514501|28.220264|
| 46947| 860514501|27.372032|
| 66823| 860514501| 25.65177|
|  2135| 860514501|24.544556|
| 21046| 860514501|24.057022|
| 57221| 860514501|24.001074|
| 70461| 860514501|23.687176|
| 64530| 440139791|23.587126|
| 64530| 310912520|23.587126|
| 21191| 860514501|22.123701|
| 39459| 708830560|22.016844|
| 49215| 552122866|21.937525|
| 38995| 708830560|21.694242|
| 76576| 860514501|21.518108|
| 64530| 553202790|21.228416|
| 64530| 395681863|21.228416|
| 64530| 440800129|21.228416|
| 64530| 590117653|21.228416|
| 35401|1575841711|21.046762|
+------+----------+---------+
only showing top 20 rows



In [None]:
# Number of flattened recommendation rows in each direction
# (per-book recommendations first, then per-user recommendations).
print("Item ", iRecs.count())
print("User ", uRecs.count())

Item  1082150
User  245330


In [None]:
from pyspark.sql.functions import avg

# Mean predicted rating per book across its recommended users; books whose
# mean is exactly 0.0 are left out.
mean_rating = avg("rating").alias("avg rating")
rating_book = (
    iRecs.groupBy("ISBN")
    .agg(mean_rating)
    .where(col("avg rating") != 0.0)
)

In [None]:
# Preview the per-book averages (show() prints only the first 20 rows).
rating_book.show()

+-----+------------------+
| ISBN|        avg rating|
+-----+------------------+
|    2| 9.445890712738038|
|    3| 7.473214483261108|
|   10|7.5906116485595705|
|   20| 8.539437770843506|
|   84| 5.003663873672485|
|   90| 8.698471832275391|
|  279| 9.078240871429443|
|  698|7.4398369789123535|
| 1400| 6.646812486648559|
| 1401| 5.559951496124268|
| 1403| 8.829939651489259|
| 1404|6.4485368728637695|
| 1406|  7.05359525680542|
| 1420| 8.961522483825684|
| 1460| 7.684878921508789|
| 1965| 5.977848863601684|
|14048|  7.31473708152771|
|16964|6.4078452587127686|
|21013| 7.331098461151123|
|48118| 5.949427080154419|
+-----+------------------+
only showing top 20 rows

