In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=0f9441223d1f31a0c9c063fdae68a9d3373d7cd9a004b4d61b9d3b042d80f008
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import numpy as np
import pandas as pd

import findspark
findspark.init()  # keep ahead of the pyspark imports, as in the original cell
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession
from pyspark.sql import functions as f


In [None]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('movieRecommendationPySpark')
    .getOrCreate()
)

In [None]:
# Load the raw ratings from the JSON file on the mounted Drive.
ratings = (
    spark.read
    .format('json')
    .load('/content/drive/MyDrive/Colab Notebooks/ratings.json')
)

In [None]:
# Preview the first 20 rows of the raw ratings.
ratings.show(n=20, truncate=True)

+--------+------+-------+
| item_id|rating|user_id|
+--------+------+-------+
|41335427|     5|      0|
|41335427|     3|      1|
|41335427|     5|      2|
|41335427|     5|      3|
|41335427|     5|      4|
|41335427|     4|      5|
|41335427|     5|      6|
|41335427|     5|      7|
|41335427|     5|      8|
|41335427|     5|      9|
|41335427|     5|     10|
|41335427|     5|     11|
|41335427|     5|     12|
|41335427|     5|     13|
|41335427|     5|     14|
|41335427|     5|     15|
|41335427|     4|     16|
|41335427|     3|     17|
|41335427|     5|     18|
|41335427|     4|     19|
+--------+------+-------+
only showing top 20 rows



In [None]:
# Spark DataFrames are immutable: na.drop() returns a NEW frame, so the result
# must be re-bound or the null rows are still present in every later cell.
# Re-running this cell is harmless (dropping nulls twice is a no-op).
ratings = ratings.na.drop()
ratings

DataFrame[item_id: bigint, rating: bigint, user_id: bigint]

In [None]:
# Activity thresholds used to densify the ratings matrix before training.
MIN_RATINGS_PER_USER = 300
MIN_RATINGS_PER_ITEM = 10


def keys_with_min_ratings(df, key_col, min_ratings):
    """Return a one-column frame of `key_col` values having >= `min_ratings` ratings.

    Only non-null `rating` values are counted. The helper count column is
    dropped before returning, so the result can be joined back onto the
    ratings without leaking extra columns.
    """
    return (
        df.groupBy(key_col)
        .agg(f.count('rating').alias('n_ratings'))
        .filter(f.col('n_ratings') >= min_ratings)
        .select(key_col)
    )


# Users with enough ratings.
table1 = keys_with_min_ratings(ratings, 'user_id', MIN_RATINGS_PER_USER)
print(table1.count())

# Items with enough ratings (counted over ALL ratings, not only active users).
table2 = keys_with_min_ratings(ratings, 'item_id', MIN_RATINGS_PER_ITEM)
print(table2.count())

# Keep only the ratings whose user AND item both passed their threshold.
table3 = ratings.join(table1, on='user_id', how='inner')
table3 = table3.join(table2, on='item_id', how='inner')
print(table3.count())

790
9374
323693


In [None]:
# Hold out 20% of the filtered ratings so the reported RMSE measures how well
# the model generalises instead of how well it memorised its training data.
# The fixed seed keeps the split (and `test`, which later cells read)
# reproducible after a kernel restart.
SEED = 42
train, test = table3.randomSplit([.8, .2], seed=SEED)

als = ALS(userCol='user_id',
          ratingCol='rating',
          itemCol='item_id',
          seed=SEED)
model = als.fit(train)

# Users/items that landed only in `test` have no learned factors, so their
# predictions come back as NaN; remove those rows (and any nulls) or the RMSE
# itself would be NaN. The model's own cold-start setting is left at its
# default so the saved model still returns NaN for unseen users.
prediction = model.transform(test)
prediction = prediction.where(~f.isnan(f.col('prediction'))).na.drop()

evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(prediction)

# Persist the fitted model; a later cell reloads it from this exact path.
model.write().overwrite().save("/content/drive/MyDrive/Colab Notebooks/models/")

In [None]:
# Preview the first 20 scored rows (actual rating next to the ALS prediction).
prediction.show(n=20, truncate=True)

+--------+-------+------+----------+
| item_id|user_id|rating|prediction|
+--------+-------+------+----------+
| 3194786|     22|     1|  2.430135|
|16682039|     22|     4| 3.6748598|
|19187812|     22|     5|   3.88239|
|42367516|     28|     2|  2.068932|
|10829530|     28|     2|  2.641614|
|12020129|     28|     4| 3.6035326|
|21500681|     28|     5| 3.8813524|
|21500681|    105|     5| 3.8199077|
| 2252213|    105|     4| 3.7389169|
|12020129|    105|     3| 3.8260727|
| 2207382|    228|     2|  3.286349|
| 3194786|    228|     3|  3.179707|
|13855759|    228|     1| 1.7955253|
|11552215|    228|     5| 3.7338142|
|12020129|    228|     3| 3.4531016|
|18221243|    228|     5| 4.0264697|
| 2501542|    262|     3| 3.0638652|
| 5727588|    262|     4| 3.7233224|
|21500681|    384|     4| 3.7120361|
|19187812|    413|     5| 3.4847322|
+--------+-------+------+----------+
only showing top 20 rows



In [None]:
# Root-mean-square error returned by evaluator.evaluate(prediction).
print(rmse)

0.7098375815492473


In [None]:
# Write the predictions once as a single CSV part file. The original ran two
# back-to-back writes to the same path with mode="overwrite", so the first
# Spark job was pure wasted work; only the repartition(1) result survived.
prediction.repartition(1).write.csv("/content/drive/MyDrive/Colab Notebooks/prediction", mode="overwrite")

In [None]:
# Materialise the Spark predictions as a pandas DataFrame (toPandas pulls
# every row into the notebook process, so only do this on small frames).
df = prediction.toPandas()

In [None]:
# Top-10 item recommendations for every user known to the model.
top10_all_users = model.recommendForAllUsers(10)
top10_all_users.show(truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                                            |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|22     |[{6801520, 4.826396}, {42805837, 4.748226}, {463790, 4.746833}, {47250749, 4.7230854}, {6440505, 4.706369}, {6571827, 4.692729}, {884960, 4.66082}, {10953171, 4.6403823}, {1840987, 4.605967}, {1791604, 4.5881906}]      |
|28     |[{227267, 5.0126987}, {955081, 4.9439554}, {2444552, 4.9301267}, {14244

In [None]:
# Top-10 item recommendations for a single user, passed as a one-row frame.
target_users = spark.createDataFrame([{"user_id":22}])
top10_target_users = model.recommendForUserSubset(target_users, 10)
top10_target_users.show(truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                                      |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|22     |[{6801520, 4.826396}, {42805837, 4.748226}, {463790, 4.746833}, {47250749, 4.7230854}, {6440505, 4.706369}, {6571827, 4.692729}, {884960, 4.66082}, {10953171, 4.6403823}, {1840987, 4.605967}, {1791604, 4.5881906}]|
+-------+-----------------------------------------------------------------------------------------------

In [None]:
# Take up to 10 (item, user) pairs for user 1055 from the `test` frame.
new_test = (
    test
    .select("item_id", "user_id")
    .filter(f.col("user_id") == 1055)
    .limit(10)
)
new_test.show()

+-------+-------+
|item_id|user_id|
+-------+-------+
|    115|   1055|
|   3801|   1055|
|  43554|   1055|
|  63845|   1055|
| 810663|   1055|
| 943470|   1055|
|1049657|   1055|
|1095121|   1055|
|1142385|   1055|
|1180927|   1055|
+-------+-------+



In [None]:
# Score the sampled (item, user) pairs with the in-memory model.
new_prediction = model.transform(new_test)
new_prediction.show(n=20, truncate=True)

+-------+-------+----------+
|item_id|user_id|prediction|
+-------+-------+----------+
|    115|   1055| 3.8159878|
|   3801|   1055| 3.3042886|
|  43554|   1055| 3.8256767|
|  63845|   1055|  3.482943|
| 810663|   1055| 3.5902839|
| 943470|   1055|  3.191405|
|1049657|   1055| 3.7092094|
|1095121|   1055| 3.3182786|
|1142385|   1055| 3.4836848|
|1180927|   1055| 3.3607078|
+-------+-------+----------+



In [None]:
# Largest user_id present in the ratings data.
ratings.agg(f.max("user_id")).show()

+------------+
|max(user_id)|
+------------+
|      356969|
+------------+



In [None]:
# Three hand-written ratings as (item_id, user_id, rating) for user 356970.
new_user_ratings = [
    (115, 356970, 3),
    (43554, 356970, 4),
    (423, 356970, 2),
]
df_train = spark.createDataFrame(
    new_user_ratings,
    schema="item_id: int, user_id: int, rating: int",
)

In [None]:
# Display the hand-written ratings frame.
df_train.show(n=20, truncate=True)

+-------+-------+------+
|item_id|user_id|rating|
+-------+-------+------+
|    115| 356970|     3|
|  43554| 356970|     4|
|    423| 356970|     2|
+-------+-------+------+



In [None]:
# Three unrated (item_id, user_id) pairs for the same user 356970.
new_user_candidates = [
    (63845, 356970),
    (1142385, 356970),
    (505, 356970),
]
df_test = spark.createDataFrame(
    new_user_candidates,
    schema="item_id: int, user_id: int ",
)
df_test.show()

+-------+-------+
|item_id|user_id|
+-------+-------+
|  63845| 356970|
|1142385| 356970|
|    505| 356970|
+-------+-------+



In [None]:
# Reload the ALS model from the directory the training cell saved it to.
# (ALSModel is imported in the notebook's top import cell.)
new_model = ALSModel.load(path = "/content/drive/MyDrive/Colab Notebooks/models/")

In [None]:
# Score the hand-written ratings for user 356970 with the reloaded model.
# That id is one past the largest user_id found in `ratings` (356969), so the
# model has never seen this user; presumably that is why every prediction
# below is NaN (ALS cold-start behaviour) - TODO confirm.
# NOTE(review): this rebinds `new_prediction`, replacing the earlier result.
new_prediction = new_model.transform(df_train)
new_prediction.show()

+-------+-------+------+----------+
|item_id|user_id|rating|prediction|
+-------+-------+------+----------+
|    115| 356970|     3|       NaN|
|  43554| 356970|     4|       NaN|
|    423| 356970|     2|       NaN|
+-------+-------+------+----------+

