In [1]:
movie_link = 'https://drive.google.com/file/d/1ieyb7hl_bs2vskis2rumk_a5bL0iIZxr/view?usp=drive_link'

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e182aae3d94d0ef62ed3096535fb866f10f0584cf426e1b5edfeba5fc9c82a5f
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [4]:
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()




In [5]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

In [8]:
movie =spark.read.option("header","true").csv("/content/drive/MyDrive/Data/movie-als.csv")

In [9]:
movie.show(5)

+-------+------+------+--------------+
|movieId|userId|rating|original_title|
+-------+------+------+--------------+
|    647|   647|   4.0|          Heat|
|    564|   564|   3.0|          Heat|
|    558|   558|   4.0|          Heat|
|    547|   547|   4.0|          Heat|
|    537|   537|   3.0|          Heat|
+-------+------+------+--------------+
only showing top 5 rows



In [10]:
movie = movie.withColumn("movieId", col("movieId").cast(IntegerType()))

In [11]:
movie = movie.withColumn("userId", col("userId").cast(IntegerType()))

In [15]:
movie = movie.withColumn("rating", col("rating").cast(FloatType()))

In [16]:
movie = movie.filter(col("userId").isNotNull())

In [17]:
movie.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- original_title: string (nullable = true)



In [18]:
train_data, test_data = movie.randomSplit([0.8, 0.2])

In [19]:
def ALS_model():
  model_coldstart = ALS(rank = 10,maxIter=5,regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, coldStartStrategy="drop", seed = 11)
  model = model_coldstart.fit(train_data)
  return model

In [20]:
error_rate = []
loss_error = 0
base_model = ALS_model()
predict_train = base_model.transform(train_data)
predict_test = base_model.transform(test_data)

In [21]:
predict_train.show(5)

+-------+------+------+--------------------+----------+
|movieId|userId|rating|      original_title|prediction|
+-------+------+------+--------------------+----------+
|    148|   148|   2.0|  Breaking the Waves| 3.8839242|
|    148|   148|   2.0|            Van Gogh| 3.8839242|
|    148|   148|   2.5|Star Trek V: The ...| 3.8839242|
|    148|   148|   2.5|   Y tu mamá también| 3.8839242|
|    148|   148|   3.0|    A Bridge Too Far| 3.8839242|
+-------+------+------+--------------------+----------+
only showing top 5 rows



In [22]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating',metricName='rmse')

In [23]:
rmse_train = evaluator.evaluate(predict_train)
rmse_test = evaluator.evaluate(predict_test)

In [24]:
print(rmse_train,rmse_test)

0.9639558343998477 0.9988632106041737


In [25]:
model_cv = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, coldStartStrategy= "drop", seed = 11)

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [27]:
paramGrid = ParamGridBuilder().addGrid(model_cv.maxIter, [10,15]).addGrid(model_cv.regParam, [0.1, 0.01]).addGrid(model_cv.rank, [10,12]).build()

crossvalidation = CrossValidator(estimator = model_cv,
                     estimatorParamMaps = paramGrid,
                     evaluator = evaluator,
                     numFolds=3)

Best_model = crossvalidation.fit(train_data).bestModel

3.2218940536181133

In [29]:
predict_train_bm = Best_model.transform(train_data)
predict_test_bm = Best_model.transform(test_data)
rmse_train_bm = evaluator.evaluate(predict_train_bm)
rmse_test_bm = evaluator.evaluate(predict_test_bm)

In [30]:
print(rmse_train_bm,rmse_test_bm)

0.9551566506524056 0.9840810626626588


In [31]:
Best_model

ALSModel: uid=ALS_c6eba41ec49f, rank=12

In [32]:
top_10_movieId = Best_model.recommendForAllUsers(10)

In [33]:
top_10_movieId.show(5,False)

+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{139, 4.258242}, {446, 3.9881768}, {274, 3.926156}, {655, 3.7146652}, {66, 3.466715}, {584, 3.391535}, {273, 3.3861103}, {517, 3.1856475}, {503, 3.1579823}, {519, 3.151078}] |
|2     |[{641, 6.1028247}, {10, 5.30313}, {64, 5.247419}, {542, 4.925959}, {154, 4.7574315}, {632, 4.743712}, {123, 4.6817775}, {383, 4.5694475}, {41, 4.522252}, {235, 4.423604}]     |
|3     |[{240, 4.9982724}, {337, 4.6022}, {239, 3.9203482}, {620, 3.8135788

In [34]:
from pyspark.sql import functions as F

In [35]:
nrecommend = top_10_movieId.withColumn("rec_exp", F.explode("recommendations"))
joined_data = nrecommend.join(movie, nrecommend.rec_exp.movieId == movie.movieId, 'inner')
selected_data = joined_data.select(nrecommend.userId, movie.original_title)


In [36]:
# Rekomendasi berdasarkan user_id tertentu
user_id = 1
filtered_data = selected_data.filter(selected_data.userId == user_id)
filtered_data.show(10)

+------+--------------------+
|userId|      original_title|
+------+--------------------+
|     1|Astérix aux Jeux ...|
|     1|    My Name Is Bruce|
|     1|  The Golden Compass|
|     1|          Persepolis|
|     1|  Notes on a Scandal|
|     1|   Qui a tué Bambi ?|
|     1|         Sommersturm|
|     1|Diarios de motoci...|
|     1|      School of Rock|
|     1| Bollywood/Hollywood|
+------+--------------------+
only showing top 10 rows



In [37]:
# Rekomendasi berdasarkan user_id tertentu
user_id = 5
filtered_data = selected_data.filter(selected_data.userId == user_id)
filtered_data.show(10)

+------+--------------------+
|userId|      original_title|
+------+--------------------+
|     5|  The Garden of Eden|
|     5|  Don Q Son of Zorro|
|     5|    Muxmäuschenstill|
|     5|            Le Bossu|
|     5|K-19: The Widowmaker|
|     5|         5 Card Stud|
|     5|The Million Dolla...|
|     5|     The Dawn Patrol|
|     5|          Madagascar|
|     5|         5 Card Stud|
+------+--------------------+
only showing top 10 rows



In [38]:
# Rekomendasi berdasarkan user_id tertentu
user_id = 10
filtered_data = selected_data.filter(selected_data.userId == user_id)
filtered_data.show(10)

+------+--------------------+
|userId|      original_title|
+------+--------------------+
|    10|           Princesas|
|    10| Das Wunder von Bern|
|    10|    Marie Antoinette|
|    10|              Hostel|
|    10|The Three Musketeers|
|    10|           Spanglish|
|    10|Un long dimanche ...|
|    10|Bang Boom Bang - ...|
|    10|Dr. Jekyll and Mr...|
|    10|       La Peau douce|
+------+--------------------+
only showing top 10 rows

