Install PySpark and import the necessary libraries

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 18.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=9005ef44cd4ed5eb864568fd92f3b04c2059b2ef7aeec0a5cb3a1a8adafa70f3
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
import numpy as np 
import pandas as pd
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from google.colab import drive
from pyspark.ml.feature import StringIndexer

In [None]:
# Build (or reuse) the session first and take the SparkContext from it.
# Calling SparkContext() directly raises "Cannot run multiple SparkContexts
# at once" when this cell is executed a second time in a live kernel, whereas
# getOrCreate() is safe to re-run.
spark = SparkSession.builder.appName('Recommendations').getOrCreate()
sc = spark.sparkContext

In [None]:
# Read the raw ratings export.  The first row supplies the column names; no
# schema is inferred, so every column arrives as a string and is cast later.
ratings = (
    spark.read
    .options(sep=',', header=True)
    .csv("data_200,000.csv")
)
ratings.show()

+------+--------------------+-------+
|userID|             movieID|ratings|
+------+--------------------+-------+
|340626|acapulco_+prima+s...|      4|
| 26447|         otello+1986|      3|
|172600|          holes+2003|      2|
|314533|my+brother+the+te...|      4|
|249289|sommer+der+gaukle...|      2|
|186589|  aces+n+eights+2008|      3|
|278499|children+on+their...|      4|
| 95207|       excision+2012|      3|
| 48275|       outsider+1997|      4|
| 69998|rosas+hllenfahrt+...|      4|
|102246|los+signos+del+zo...|      4|
|427110|my+neighbor+totor...|      4|
|379843|        vincent+1982|      5|
|358920|     mirrormask+2005|      3|
|339159|this+is+spinal+ta...|      4|
| 91269|forbidden+planet+...|      3|
|447391|           gigi+1958|      2|
|410073|whats+up_+scarlet...|      3|
| 98060|the+heart+of+the+...|      5|
|335447|      44500+max+2009|      4|
+------+--------------------+-------+
only showing top 20 rows



In [None]:
# Inspect the column types as loaded from the CSV, before any casting.
ratings.printSchema()

root
 |-- userID: string (nullable = true)
 |-- movieID: string (nullable = true)
 |-- ratings: string (nullable = true)



In [None]:
# Give each column the type the recommender needs: integer user ids, string
# movie ids and float ratings.  The target names differ from the source names
# only by case ('userID' -> 'userId', 'movieID' -> 'movieId'); with Spark's
# default case-insensitive resolution withColumn replaces those columns in
# place, while 'rating' is added as a new column and the original 'ratings'
# string column is dropped.
ratings = (
    ratings
    .withColumn('userId', col('userID').cast('integer'))
    .withColumn('movieId', col('movieID').cast('string'))
    .withColumn('rating', col('ratings').cast('float'))
    .drop('ratings')
)
ratings.show()

+------+--------------------+------+
|userId|             movieId|rating|
+------+--------------------+------+
|340626|acapulco_+prima+s...|   4.0|
| 26447|         otello+1986|   3.0|
|172600|          holes+2003|   2.0|
|314533|my+brother+the+te...|   4.0|
|249289|sommer+der+gaukle...|   2.0|
|186589|  aces+n+eights+2008|   3.0|
|278499|children+on+their...|   4.0|
| 95207|       excision+2012|   3.0|
| 48275|       outsider+1997|   4.0|
| 69998|rosas+hllenfahrt+...|   4.0|
|102246|los+signos+del+zo...|   4.0|
|427110|my+neighbor+totor...|   4.0|
|379843|        vincent+1982|   5.0|
|358920|     mirrormask+2005|   3.0|
|339159|this+is+spinal+ta...|   4.0|
| 91269|forbidden+planet+...|   3.0|
|447391|           gigi+1958|   2.0|
|410073|whats+up_+scarlet...|   3.0|
| 98060|the+heart+of+the+...|   5.0|
|335447|      44500+max+2009|   4.0|
+------+--------------------+------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

# ALS needs a numeric item column, so give every distinct movieId a
# consecutive integer index (1, 2, 3, ...) following the sort order of the id.
# NOTE(review): the window has no partitionBy, so Spark evaluates it over the
# whole dataset in one partition -- fine at this data size, worth revisiting
# for larger inputs.
by_movie_id = Window.orderBy("movieId")
ratings = ratings.withColumn("movieIndex", dense_rank().over(by_movie_id))
ratings.show()

+------+--------------------+------+----------+
|userId|             movieId|rating|movieIndex|
+------+--------------------+------+----------+
|406568|          ++++++1959|   4.0|         1|
|362230|          ++++++1959|   4.0|         1|
| 17433|          ++++++1959|   4.0|         1|
|435477|          ++++++1959|   4.0|         1|
|190145|          ++++++1959|   4.0|         1|
|158290|          ++++++1959|   3.0|         1|
| 44682|          ++++++1959|   4.0|         1|
|369213|            ++++2013|   3.0|         2|
| 66335|       +la+mode+1993|   3.0|         3|
|210367|       +la+mode+1993|   3.0|         3|
| 79536|       +la+mode+1993|   3.0|         3|
|363147|     +laventure+2008|   3.0|         4|
| 30664|     +laventure+2008|   4.0|         4|
|123697|     +laventure+2008|   3.0|         4|
|350111|    +nos+amours+1983|   4.0|         5|
|389308|    +nos+amours+1983|   4.0|         5|
|230428|+nous+la+libert+1931|   5.0|         6|
|385620|+nous+la+libert+1931|   3.0|    

In [None]:
from pyspark.sql.functions import split

In [None]:
# Keep only the two columns needed to translate between the string movie id
# and its numeric index (still one row per rating at this point).
table = ratings.select(['movieID', 'movieIndex'])
table.show()

+--------------------+----------+
|             movieID|movieIndex|
+--------------------+----------+
|          ++++++1959|         1|
|          ++++++1959|         1|
|          ++++++1959|         1|
|          ++++++1959|         1|
|          ++++++1959|         1|
|          ++++++1959|         1|
|          ++++++1959|         1|
|            ++++2013|         2|
|       +la+mode+1993|         3|
|       +la+mode+1993|         3|
|       +la+mode+1993|         3|
|     +laventure+2008|         4|
|     +laventure+2008|         4|
|     +laventure+2008|         4|
|    +nos+amours+1983|         5|
|    +nos+amours+1983|         5|
|+nous+la+libert+1931|         6|
|+nous+la+libert+1931|         6|
|+nous+la+libert+1931|         6|
|+nous+la+libert+1931|         6|
+--------------------+----------+
only showing top 20 rows



In [None]:
# Collapse to one row per movie.  `table` already consists of exactly the
# movieID and movieIndex columns, so de-duplicating it directly gives the
# lookup table.
final_table = table.distinct()
final_table.show()

+--------------------+----------+
|             movieID|movieIndex|
+--------------------+----------+
|          ++++++1959|         1|
|            ++++2013|         2|
|       +la+mode+1993|         3|
|     +laventure+2008|         4|
|    +nos+amours+1983|         5|
|+nous+la+libert+1931|         6|
|+propos+de+nice+1930|         7|
|...and+god+create...|         8|
|...and+justice+fo...|         9|
|...and+the+pursui...|        10|
|...tick...+tick.....|        11|
|            .45+2006|        12|
|  009+re+cyborg+2012|        13|
|           0605+2004|        14|
|1+-+nenokkadine+2014|        15|
|              1+2013|        16|
|1+knights+-+in+se...|        17|
|  10+000+timmar+2014|        18|
|             10+1979|        19|
|10+items+or+less+...|        20|
+--------------------+----------+
only showing top 20 rows



In [None]:
# Confirm the casts and the added movieIndex column took effect.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- movieIndex: integer (nullable = false)



In [None]:
# Example lookup: translate a numeric movieIndex back to its movieID string.
final_table.filter(final_table["movieIndex"] == 1).show()

+----------+----------+
|   movieID|movieIndex|
+----------+----------+
|++++++1959|         1|
+----------+----------+



In [None]:
# Persist the movieID <-> movieIndex lookup as comma-separated CSV with a
# header row.  mode("overwrite") makes the cell re-runnable: with the default
# save mode the write fails as soon as the "lookuptable" directory exists.
final_table.write.mode("overwrite").option("header",True).option("delimiter",",").csv("lookuptable")

In [None]:
# 80/20 train/test split of the ratings Spark DataFrame; the fixed seed keeps
# the split repeatable.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=3333)

In [None]:
# Explicit-feedback ALS on (userId, movieIndex, rating).
#  - nonnegative=True constrains the learned factors to be non-negative.
#  - coldStartStrategy="drop" removes rows whose prediction is NaN (users or
#    items unseen during fitting) so they cannot turn the RMSE into NaN.
#  - seed is set explicitly: PySpark derives the default seed from Python's
#    hash() of the class name, which is not stable across interpreter
#    sessions, so without it results differ from run to run.
als = ALS(
    userCol="userId",
    itemCol="movieIndex",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=3333,
)

Tuning the model

In [None]:
# Hyper-parameter grid: 7 ranks x 4 regularisation strengths = 28 candidates.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 40, 70, 100, 130, 160, 200])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

# Candidates are compared on RMSE between the true rating and the prediction.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# 5-fold cross-validation, i.e. 28 x 5 = 140 ALS fits plus a final refit of
# the best candidate.  The seed fixes the fold assignment so the selected
# parameters are reproducible between sessions.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=3333,
)
print(cv)

CrossValidator_6ca62b28b059


In [None]:
# Run the cross-validated grid search on the training split.
model = cv.fit(train) 
# To use all of the data for validation (and skip the hold-out test), fit on
# `ratings` instead of `train`.

# bestModel is the ALS model for the best-scoring parameter combination.
best_model = model.bestModel

In [None]:
# Report which hyper-parameters the cross-validation selected.
print(type(best_model))
print("**Best Model**")

# The parameters are read from the estimator that produced the model, reached
# through the underlying Java object.
best_estimator = best_model._java_obj.parent()
print("  Rank:", best_estimator.getRank())
print("  MaxIter:", best_estimator.getMaxIter())
print("  RegParam:", best_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 200
  MaxIter: 10
  RegParam: 0.01


In [None]:
# Score the held-out test split with the selected model and report the value
# computed by `evaluator` (configured for RMSE on rating vs. prediction).
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.7684231183305625


In [None]:
# Eyeball individual predictions next to the true ratings.
test_predictions.show()

+------+--------------------+------+----------+----------+
|userId|             movieId|rating|movieIndex|prediction|
+------+--------------------+------+----------+----------+
| 79536|       +la+mode+1993|   3.0|         3|0.81155235|
| 30664|     +laventure+2008|   4.0|         4| 1.3734282|
| 21700|...tick...+tick.....|   3.0|        11| 1.2672446|
|360564|10+items+or+less+...|   3.0|        20| 2.0138261|
| 47032|10+questions+for+...|   4.0|        23| 2.0292604|
| 51362|10+questions+for+...|   3.0|        23| 2.1853852|
|141325|10+things+i+hate+...|   4.0|        25|  3.813172|
|143405|10+things+i+hate+...|   4.0|        25|  4.088001|
|156796|10+things+i+hate+...|   4.0|        25| 2.9465654|
|181939|10+things+i+hate+...|   4.0|        25| 1.7001389|
|203685|10+things+i+hate+...|   4.0|        25| 2.5233817|
|214231|10+things+i+hate+...|   2.0|        25| 2.2120197|
|246191|10+things+i+hate+...|   4.0|        25| 2.8384593|
|248414|10+things+i+hate+...|   4.0|        25| 3.233523

Save the model and try loading it

In [None]:
# Persist the selected model.  write().overwrite() replaces an existing /ALS
# directory; a plain save() fails when the path already exists, which breaks
# re-running this cell.
best_model.write().overwrite().save("/ALS")

In [None]:
from pyspark.ml.recommendation import ALSModel

In [None]:
# Round-trip check: reload the model from the directory it was saved to.
loaded = ALSModel.load("/ALS")

Make Recommendations

In [None]:
nrecommendations = loaded.recommendForAllUsers(20) # top 20 recommendations per user
nrecommendations.limit(20).show()
# Alternative for selected users only (user_subset is a DataFrame holding the user ids):
#   best_model.recommendForUserSubset(user_subset, 3)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    27|[{25100, 2.999452...|
|    28|[{3395, 2.999478}...|
|    53|[{3825, 4.901492}...|
|    76|[{20765, 3.260196...|
|    78|[{12918, 3.432839...|
|    85|[{15899, 4.566853...|
|   101|[{20175, 4.999309...|
|   126|[{3825, 4.263652}...|
|   137|[{56, 4.9992166},...|
|   255|[{4709, 4.9992204...|
|   296|[{19317, 3.999401...|
|   362|[{14594, 2.999262...|
|   368|[{3825, 4.340407}...|
|   406|[{14774, 3.999202...|
|   412|[{23669, 3.545208...|
|   458|[{3825, 4.4763346...|
|   481|[{3825, 3.062106}...|
|   497|[{5965, 4.2939124...|
|   577|[{3825, 4.470476}...|
|   587|[{13673, 3.999341...|
+------+--------------------+



PySpark Version Check


In [None]:
# Print the version reported by the session and by its SparkContext.
for reported_version in (spark.version, spark.sparkContext.version):
    print('PySpark Version :' + reported_version)

PySpark Version :3.3.0
PySpark Version :3.3.0


In [None]:
# Bundle the saved model directory (/ALS) into a single archive, /ALS.zip.
!zip -r '/ALS.zip' '/ALS'

  adding: ALS/ (stored 0%)
  adding: ALS/userFactors/ (stored 0%)
  adding: ALS/userFactors/part-00004-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet (deflated 12%)
  adding: ALS/userFactors/part-00009-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet (deflated 12%)
  adding: ALS/userFactors/.part-00000-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet.crc (deflated 0%)
  adding: ALS/userFactors/.part-00007-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet.crc (deflated 0%)
  adding: ALS/userFactors/part-00001-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet (deflated 12%)
  adding: ALS/userFactors/.part-00006-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet.crc (deflated 0%)
  adding: ALS/userFactors/.part-00005-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet.crc (deflated 0%)
  adding: ALS/userFactors/part-00005-1f389380-2516-467d-a3c7-290ea0cd41e8-c000.snappy.parquet (deflated 12%)
  adding: ALS/userFactors/part-00006-1f389380-

In [None]:
from google.colab import files

# Download the archive built by the zip cell.  The previous target,
# '/ALS/metadata/_SUCCESS', is only a marker file inside the model directory
# and does not contain the model itself.
files.download('/ALS.zip')



In [None]:
# All ratings given by user 27 -- used below as the user subset to request
# recommendations for.
user_subset = ratings.where(ratings["userId"] == 27)
user_subset.show()
type(user_subset)


+------+--------------------+------+----------+
|userId|             movieId|rating|movieIndex|
+------+--------------------+------+----------+
|    27|winter+in+wartime...|   3.0|     25100|
+------+--------------------+------+----------+



pyspark.sql.dataframe.DataFrame

In [None]:
# Top 3 recommendations from the reloaded model for the users in user_subset.
rec = loaded.recommendForUserSubset(user_subset, 3)

In [None]:
# Keep only the recommended item indices; the nested field carries the name of
# the item column the model was trained with (movieIndex).
movie_id_rec = rec.select("recommendations.movieIndex")
movie_id_rec.show()

+--------------------+
|          movieIndex|
+--------------------+
|[25100, 22793, 3825]|
+--------------------+



In [None]:
# Bring the result to the driver: first row, first column is the list of
# recommended movie indices.  Raises IndexError if no row came back.
rec_list = movie_id_rec.collect()[0][0]

In [None]:
# Translate the recommended movie indices back to movieID strings.
# All needed lookup rows are fetched with ONE Spark job (filter + collect)
# instead of one job per recommended movie, then mapped on the driver.
lookup_rows = (
    final_table
    .filter(final_table.movieIndex.isin(rec_list))
    .select("movieIndex", "movieID")
    .collect()
)
title_by_index = {movie_index: movie_title for movie_index, movie_title in lookup_rows}

# Rebuild the list in the model's ranking order; an index missing from the
# lookup table raises KeyError rather than being silently skipped.
toreturn = [title_by_index[movie_index] for movie_index in rec_list]

In [None]:
# Recommended movieID strings, in the order of rec_list.
print(toreturn)

['winter+in+wartime+2008', 'the+usual+suspects+1995', 'casablanca+1942']


In [None]:
#27
def recommendationConverter(alsmodel,userid,lookuptable,orginal_data):
  userid_int = int(userid)
  user_subset = orginal_data.filter(orginal_data.userId == userid_int)
  rec = alsmodel.recommendForUserSubset(user_subset, 20)
  movie_id_rec = rec.select("recommendations.item")
  rec_list = movie_id_rec.collect()[0][0]
  toreturn = []
  for movie1_id in rec_list:
    final = final_table.filter(final_table.movieIndex == movie1_id)
    Done = final.select("movieID")
    toreturn.append(Done.collect()[0][0])