In [1]:
# Install the dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
!tar xf spark-3.0.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
# Point PySpark at the Java and Spark installations of the notebook environment
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.2-bin-hadoop3.2",
})

In [399]:
# Start a local Spark session to verify the installation
import findspark
# Called without a path so that findspark resolves Spark from the SPARK_HOME environment
# variable (set to an absolute path earlier in this notebook) instead of a directory
# name that is only valid relative to the current working directory
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# local[*]: run Spark in-process using all available cores
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [400]:
# Load both CSV files, treating the first line as a header and letting Spark infer column types
ratings = spark.read.options(header=True, inferSchema=True).csv('ratings.csv')
movies = spark.read.options(header=True, inferSchema=True).csv('movies.csv')

In [401]:
# Normalise the ratings frame: lower-case the id column names and replace the
# unix-epoch `timestamp` column with a calendar `date` column
import pyspark.sql.functions as F
from pyspark.sql.types import DateType

ratings = ratings.select(
    F.col('userId').alias('userid'),
    F.col('movieId').alias('movieid'),
    'rating',
    F.from_unixtime('timestamp').cast(DateType()).alias('date'),
)

In [402]:
# Preview the first five rows of the transformed ratings and print their schema
ratings.show(5)
ratings.printSchema()

+------+-------+------+----------+
|userid|movieid|rating|      date|
+------+-------+------+----------+
|     1|     31|   2.5|2009-12-14|
|     1|   1029|   3.0|2009-12-14|
|     1|   1061|   3.0|2009-12-14|
|     1|   1129|   2.0|2009-12-14|
|     1|   1172|   4.0|2009-12-14|
+------+-------+------+----------+
only showing top 5 rows

root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- date: date (nullable = true)



In [403]:
# Keep only the movie id and original title, casting the id to an integer
from pyspark.sql.types import IntegerType
movies = movies.select(
    movies['id'].cast(IntegerType()).alias('id'),
    'original_title',
)

In [404]:
# Keep only the movies whose id appears in the ratings
# (presumably to avoid ALS errors on movie ids it has never seen — TODO confirm).
# A left-semi join returns the matching rows of `movies` with only its own columns,
# so no null filtering or column dropping is needed afterwards.
ratings_movies_exist = ratings.select(ratings['movieid']).distinct()
movies = movies.join(ratings_movies_exist, movies.id==ratings_movies_exist.movieid, how='left_semi')

In [405]:
# Preview the first five rows of the filtered movies and print their schema
movies.show(5)
movies.printSchema()

+----+--------------------+
|  id|      original_title|
+----+--------------------+
| 949|                Heat|
| 710|           GoldenEye|
|1408|    Cutthroat Island|
| 524|              Casino|
|4584|Sense and Sensibi...|
+----+--------------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- original_title: string (nullable = true)



In [406]:
# Per-movie statistics: number of ratings and mean rating
# (first step towards the "top rated movies" ranking)
# NOTE(review): this import rebinds the Python built-ins `min` and `max` to the Spark
# column functions for the rest of the session
from pyspark.sql.functions import  min, max, count, avg
rating_stats = [count('rating').alias('count'), avg('rating').alias('avg')]
ratings_with_avg = ratings.groupBy('movieid').agg(*rating_stats)

In [407]:
# Preview the first five rows of the per-movie rating statistics
ratings_with_avg.show(5)

+-------+-----+------------------+
|movieid|count|               avg|
+-------+-----+------------------+
|   1580|  190| 3.663157894736842|
|   2659|    3|               4.0|
|   3794|    5|               3.4|
|   3175|   65|3.5076923076923077|
|    471|   49| 3.877551020408163|
+-------+-----+------------------+
only showing top 5 rows



In [408]:
# Attach a title to every rated movie; the left join keeps movies without a
# matching title (their `title` is null)
# NOTE(review): the titles shown for the highest-rated ids look implausible —
# presumably ratings.movieid and movies.id come from different id spaces and need
# a mapping table; confirm against the data source
movies_rating = (
    ratings_with_avg
    .join(movies, ratings_with_avg.movieid == movies.id, how='left')
    .select('movieid', movies['original_title'].alias('title'), 'count', 'avg')
)

movies_rating.show(5)

+-------+--------------------+-----+------------------+
|movieid|               title|count|               avg|
+-------+--------------------+-----+------------------+
|    148|The Secret Life o...|    1|               4.0|
|    463|                null|    7|3.4285714285714284|
|    471|             Bandyta|   49| 3.877551020408163|
|    496|Borat: Cultural L...|    3|2.6666666666666665|
|    833|          Umberto D.|    4|             2.625|
+-------+--------------------+-----+------------------+
only showing top 5 rows



In [409]:
# Rank the movies that have at least 150 ratings by mean rating, best first
movies_rating_top = (
    movies_rating
    .where(movies_rating['count'] >= 150)
    .orderBy('avg', ascending=False)
)
movies_rating_top.show(10)

+-------+--------------------+-----+-----------------+
|movieid|               title|count|              avg|
+-------+--------------------+-----+-----------------+
|    858|Sleepless in Seattle|  200|           4.4875|
|    318|The Million Dolla...|  311|4.487138263665595|
|     50|                null|  201|4.370646766169155|
|    527|  Once Were Warriors|  244| 4.30327868852459|
|    608|     Men in Black II|  224|4.256696428571429|
|    296|Terminator 3: Ris...|  324|4.256172839506172|
|   2858|                null|  220|4.236363636363636|
|   1196|                null|  234|4.232905982905983|
|    260|        The 39 Steps|  291|4.221649484536083|
|   1197|                null|  163|4.208588957055214|
+-------+--------------------+-----+-----------------+
only showing top 10 rows



In [410]:
# Keep only the (user, movie, rating) triples and report how many there are
ratings = ratings.select('userid', 'movieid', 'rating')
ratings.show(5)
print(ratings.count())

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows

100004


### ALS

In [411]:
from pyspark.mllib.recommendation import ALS
import math

In [412]:
# Hold out roughly 20% of the ratings for evaluation; the fixed seed makes the
# split reproducible across runs
ratings_train,ratings_test = ratings.randomSplit([0.8,0.2], seed=42)

In [413]:
# Preview the first five rows of the training and test splits
ratings_train.show(5)
ratings_test.show(5)

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|     1|   1263|   2.0|
|     1|   1339|   3.5|
|     1|   1371|   2.5|
|     1|   1953|   4.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 5 rows



In [414]:
# Row counts of the training and test splits
print(ratings_train.count())
print(ratings_test.count())

79862
20142


In [415]:
# Train the ALS model on the training split
iterations = 10
rank = 8
# A fixed seed makes the trained model, and every metric derived from it,
# reproducible across runs
model = ALS.train(ratings_train, rank, iterations=iterations, seed=42)

In [416]:
# Ask the model for a predicted rating for every (user, movie) pair of the test split
test_pairs = ratings_test.select('userid','movieid').rdd
predictions = model.predictAll(test_pairs)

In [417]:
# Peek at the first five predictions
predictions.take(5)

[Rating(user=558, product=1084, rating=4.624983411825044),
 Rating(user=96, product=1084, rating=3.668198081724305),
 Rating(user=472, product=1084, rating=4.538569484953629),
 Rating(user=436, product=1084, rating=4.1819722428804225),
 Rating(user=242, product=1084, rating=4.808922054204748)]

In [418]:
# Unpack each prediction into a plain (user, movie, rating) tuple and build a
# DataFrame whose column names line up with the ratings frames
predictions_converted = predictions.map(lambda pred: (pred.user, pred.product, pred.rating))
prediction_columns = ['userid','movieid','rating_pred']
predictions_df = predictions_converted.toDF(prediction_columns)

In [419]:
# Preview the predictions DataFrame
predictions_df.show()

+------+-------+------------------+
|userid|movieid|       rating_pred|
+------+-------+------------------+
|   558|   1084| 4.624983411825044|
|    96|   1084| 3.668198081724305|
|   472|   1084| 4.538569484953629|
|   436|   1084|4.1819722428804225|
|   242|   1084| 4.808922054204748|
|   311|   1084| 2.354858485898106|
|   195|   1084| 3.450496481990385|
|   605|   1084|3.6388471511325866|
|   387|   1084| 4.136729381469211|
|   551|   1084|2.1343503361412974|
|   499|   1084|4.0360097538426984|
|    65|   1084| 4.085830030580282|
|    99|   1084|3.7008005984499848|
|    67|   1084| 4.926155257333931|
|    97|   1084|3.9782619485581643|
|   468|   6400| 1.428061931337811|
|    34|   3702| 4.000833278512062|
|   220|   3702|3.4328240058791426|
|   564|   3702| 4.416042666040399|
|   139|   3702| 4.867633976334919|
+------+-------+------------------+
only showing top 20 rows



In [420]:
# Put the actual and the predicted rating side by side for every test pair that
# received a prediction
result = ratings_test.join(predictions_df, on=['userid','movieid'], how='inner')
result.show()

+------+-------+------+------------------+
|userid|movieid|rating|       rating_pred|
+------+-------+------+------------------+
|   558|   1084|   5.0| 4.624983411825044|
|    96|   1084|   3.0| 3.668198081724305|
|   472|   1084|   4.0| 4.538569484953629|
|   436|   1084|   4.0|4.1819722428804225|
|   242|   1084|   5.0| 4.808922054204748|
|   311|   1084|   2.0| 2.354858485898106|
|   195|   1084|   4.0| 3.450496481990385|
|   605|   1084|   4.0|3.6388471511325866|
|   387|   1084|   5.0| 4.136729381469211|
|   551|   1084|   4.0|2.1343503361412974|
|   499|   1084|   5.0|4.0360097538426984|
|    65|   1084|   5.0| 4.085830030580282|
|    99|   1084|   4.0|3.7008005984499848|
|    67|   1084|   4.0| 4.926155257333931|
|    97|   1084|   2.5|3.9782619485581643|
|   468|   6400|   3.5| 1.428061931337811|
|    34|   3702|   5.0| 4.000833278512062|
|   220|   3702|   3.0|3.4328240058791426|
|   564|   3702|   3.0| 4.416042666040399|
|   139|   3702|   3.0| 4.867633976334919|
+------+---

In [421]:
# Absolute error of each prediction
abs_error = F.abs(F.col('rating') - F.col('rating_pred'))
result = result.withColumn('delta', abs_error)
result.show()

+------+-------+------+------------------+-------------------+
|userid|movieid|rating|       rating_pred|              delta|
+------+-------+------+------------------+-------------------+
|   558|   1084|   5.0| 4.624983411825044| 0.3750165881749563|
|    96|   1084|   3.0| 3.668198081724305|  0.668198081724305|
|   472|   1084|   4.0| 4.538569484953629| 0.5385694849536291|
|   436|   1084|   4.0|4.1819722428804225|0.18197224288042246|
|   242|   1084|   5.0| 4.808922054204748|0.19107794579525184|
|   311|   1084|   2.0| 2.354858485898106|0.35485848589810587|
|   195|   1084|   4.0| 3.450496481990385| 0.5495035180096148|
|   605|   1084|   4.0|3.6388471511325866| 0.3611528488674134|
|   387|   1084|   5.0| 4.136729381469211| 0.8632706185307892|
|   551|   1084|   4.0|2.1343503361412974| 1.8656496638587026|
|   499|   1084|   5.0|4.0360097538426984| 0.9639902461573016|
|    65|   1084|   5.0| 4.085830030580282| 0.9141699694197181|
|    99|   1084|   4.0|3.7008005984499848|0.29919940155

In [422]:
# Mean absolute error over the evaluated test pairs
result.agg(F.avg('delta')).show()

+------------------+
|        avg(delta)|
+------------------+
|0.8317105338008836|
+------------------+



### ALS USER

In [423]:
# Training ratings of the user we want recommendations for
user_id = 320
data_for_pred = ratings_train.filter(ratings_train.userid == user_id)
data_for_pred.show()

+------+-------+------+
|userid|movieid|rating|
+------+-------+------+
|   320|    296|   4.5|
|   320|    318|   4.0|
|   320|    527|   4.0|
|   320|    541|   5.0|
|   320|    904|   4.5|
|   320|    908|   4.0|
|   320|    912|   5.0|
|   320|   1172|   5.0|
|   320|   1206|   4.5|
|   320|   1704|   4.0|
|   320|   2010|   4.0|
|   320|   2324|   3.5|
|   320|   2571|   4.5|
|   320|   2858|   3.0|
|   320|   3147|   3.5|
|   320|   3578|   3.5|
|   320|   4226|   4.0|
|   320|   4643|   3.5|
|   320|   4886|   3.5|
|   320|   4973|   3.5|
+------+-------+------+
only showing top 20 rows



In [424]:
# Ids of the movies this user rated in the training split (as an RDD)
movie_id = data_for_pred.rdd.map(lambda row: row.movieid)
movie_id.take(20)

[296,
 318,
 527,
 541,
 904,
 908,
 912,
 1172,
 1206,
 1704,
 2010,
 2324,
 2571,
 2858,
 3147,
 3578,
 4226,
 4643,
 4886,
 4973]

In [425]:
# Materialise every movie id the user rated in the training split as a Python list.
# Recomputed from data_for_pred and fully collected (instead of movie_id.take(200)) so
# that no id is silently dropped for users with more than 200 ratings and the cell can
# be re-run without failing on an already-converted list.
movie_id = data_for_pred.rdd.map(lambda row: row[1]).collect()

In [426]:
# Candidate (user, movie) pairs: every known movie the user has not rated yet
user_movies = (
    movies.rdd
    .map(lambda row: row[0])
    .filter(lambda candidate: candidate not in movie_id)
    .map(lambda candidate: (user_id, candidate))
)
user_movies.take(10)

[(320, 949),
 (320, 710),
 (320, 1408),
 (320, 524),
 (320, 4584),
 (320, 5),
 (320, 8012),
 (320, 451),
 (320, 902),
 (320, 63)]

In [427]:
# Score the candidate (user, movie) pairs with the trained model and peek at ten results
predictions_user = model.predictAll(user_movies)
predictions_user.take(10)

[Rating(user=320, product=7942, rating=2.049725852074794),
 Rating(user=320, product=5618, rating=4.476136537358549),
 Rating(user=320, product=1894, rating=1.3724223121207628),
 Rating(user=320, product=8906, rating=2.0235389425803625),
 Rating(user=320, product=140, rating=2.5355357082408965),
 Rating(user=320, product=33838, rating=2.1512940468125317),
 Rating(user=320, product=204, rating=2.084065118851246),
 Rating(user=320, product=956, rating=2.8257597816777187),
 Rating(user=320, product=4992, rating=0.569832751674737),
 Rating(user=320, product=2334, rating=3.561926202632327)]

In [428]:
# Turn the user's predictions into a DataFrame ordered by predicted rating, highest first
predictions_user = (
    predictions_user
    .toDF(['userid','movieid','rating_pred'])
    .orderBy('rating_pred', ascending=False)
)
predictions_user.show()

+------+-------+------------------+
|userid|movieid|       rating_pred|
+------+-------+------------------+
|   320|   2186| 5.727415016428609|
|   320|   2067| 5.573484296309421|
|   320|   2973| 5.555050284792588|
|   320|   2132| 5.554109862720418|
|   320|  25771|5.5331995482565866|
|   320|   2071| 5.498725113970246|
|   320|  94466| 5.488018753726225|
|   320|  85510| 5.421726568017368|
|   320|   3598| 5.375793936783239|
|   320|    680| 5.349823112761724|
|   320|   4326| 5.340106653221984|
|   320|    905|5.3283424977805645|
|   320|  84844| 5.291465138241649|
|   320|    363| 5.213062120012488|
|   320|   3011| 5.126686736831805|
|   320|   3686|  5.11990975565433|
|   320|   6978| 5.099135272321616|
|   320|   2731| 5.083483278314562|
|   320|   3989| 5.080804485843196|
|   320|    599| 5.075761328274296|
+------+-------+------------------+
only showing top 20 rows

