In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, mean, udf, lit, current_timestamp, unix_timestamp, array_contains
#from pyspark.sql.types import IntegerType, FloatType

import pandas as pd
import math
import mlflow
import os

In [2]:
# instantiate the Spark session

from pyspark.conf import SparkConf

sparkConf = SparkConf()
sparkConf.setAppName("My app").set("spark.jars", "/home/avani/UMass/Fall 2022/CS 532/Project/postgresql-42.5.0.jar")
sparkConf.set("spark.dynamicAllocation.enabled", "true")
sparkConf.set("spark.executor.cores", 8)
sparkConf.set("spark.dynamicAllocation.minExecutors","1")
sparkConf.set("spark.dynamicAllocation.maxExecutors","5000")
sparkConf.set("spark.executor.memory", "32g")
sparkConf.set("spark.ui.port","4050")
sparkConf.set("spark.memory.fraction", 0.7)

spark = SparkSession.builder.master('local[*]').config(conf=sparkConf).getOrCreate()

22/12/03 04:17:47 WARN Utils: Your hostname, avani-HP resolves to a loopback address: 127.0.1.1; using 192.168.0.9 instead (on interface wlo1)
22/12/03 04:17:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/12/03 04:17:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
#tables_list = ['movies', 'ratings', 'tags', 'links', 'genome_tags', 'genome_scores']

tables_list = ['movies', 'ratings', 'tags', 'links']

dataframeList = {} #defaultdict(None)

for table in tables_list:

    if table == 'ratings':
        dataframeList[table] = spark.read.format("jdbc"). \
                                options(
                                 url = 'jdbc:postgresql://localhost:5432/movielens_dataset', # using jdbc:postgresql://<host>:<port>/<database>
                                 dbtable = table + "_new",
                                 user = 'postgres',
                                 password = 'postgres',
                                 driver = 'org.postgresql.Driver',
                                 fetchSize = 1000,
                                 partitionColumn = "userId",
                                 lowerBound = 1,
                                 upperBound = 283228,
                                 numPartitions = 32).\
                                load()
    else:
        dataframeList[table] = spark.read.format("jdbc"). \
                                options(
                                 url = 'jdbc:postgresql://localhost:5432/movielens_dataset', # using jdbc:postgresql://<host>:<port>/<database>
                                 dbtable = table + "_new",
                                 user = 'postgres',
                                 password = 'postgres',
                                 driver = 'org.postgresql.Driver').\
                                load()

In [4]:
for key, value in dataframeList.items():
    print(key + " table")
    print(value.printSchema())

movies table
root
 |-- movieid: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

None
ratings table
root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- rating: decimal(38,18) (nullable = true)
 |-- timestamp: integer (nullable = true)

None
tags table
root
 |-- userid: integer (nullable = true)
 |-- movieid: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

None
links table
root
 |-- movieid: integer (nullable = true)
 |-- imdbid: integer (nullable = true)
 |-- tmdbid: integer (nullable = true)

None


In [5]:
movies_df = dataframeList['movies']
ratings_df = dataframeList['ratings']
links_df = dataframeList['links']
tags_df = dataframeList['tags']

### Show the dataframes and make the lifetime of dataframes sames as spark session

In [6]:
movies_df.show(20)
movies_df.createOrReplaceTempView("movies_df")

[Stage 0:>                                                          (0 + 1) / 1]

+-------+--------------------+--------------------+
|movieid|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

                                                                                

In [7]:
ratings_df.show(20)
ratings_df.createOrReplaceTempView("ratings_df")

+------+-------+--------------------+---------+
|userid|movieid|              rating|timestamp|
+------+-------+--------------------+---------+
|     1|      1|4.000000000000000000|964982703|
|     1|      3|4.000000000000000000|964981247|
|     1|      6|4.000000000000000000|964982224|
|     1|     47|5.000000000000000000|964983815|
|     1|     50|5.000000000000000000|964982931|
|     1|     70|3.000000000000000000|964982400|
|     1|    101|5.000000000000000000|964980868|
|     1|    110|4.000000000000000000|964982176|
|     1|    151|5.000000000000000000|964984041|
|     1|    157|5.000000000000000000|964984100|
|     1|    163|5.000000000000000000|964983650|
|     1|    216|5.000000000000000000|964981208|
|     1|    223|3.000000000000000000|964980985|
|     1|    231|5.000000000000000000|964981179|
|     1|    235|4.000000000000000000|964980908|
|     1|    260|5.000000000000000000|964981680|
|     1|    296|3.000000000000000000|964982967|
|     1|    316|3.000000000000000000|964

In [8]:
links_df.show(20)
links_df.createOrReplaceTempView("links_df")

+-------+------+------+
|movieid|imdbid|tmdbid|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
|      6|113277|   949|
|      7|114319| 11860|
|      8|112302| 45325|
|      9|114576|  9091|
|     10|113189|   710|
|     11|112346|  9087|
|     12|112896| 12110|
|     13|112453| 21032|
|     14|113987| 10858|
|     15|112760|  1408|
|     16|112641|   524|
|     17|114388|  4584|
|     18|113101|     5|
|     19|112281|  9273|
|     20|113845| 11517|
+-------+------+------+
only showing top 20 rows



In [9]:
tags_df.show(20)
tags_df.createOrReplaceTempView("tags_df")

+------+-------+-----------------+----------+
|userid|movieid|              tag| timestamp|
+------+-------+-----------------+----------+
|     2|  60756|            funny|1445714994|
|     2|  60756|  Highly quotable|1445714996|
|     2|  60756|     will ferrell|1445714992|
|     2|  89774|     Boxing story|1445715207|
|     2|  89774|              MMA|1445715200|
|     2|  89774|        Tom Hardy|1445715205|
|     2| 106782|            drugs|1445715054|
|     2| 106782|Leonardo DiCaprio|1445715051|
|     2| 106782|  Martin Scorsese|1445715056|
|     7|  48516|     way too long|1169687325|
|    18|    431|        Al Pacino|1462138765|
|    18|    431|         gangster|1462138749|
|    18|    431|            mafia|1462138755|
|    18|   1221|        Al Pacino|1461699306|
|    18|   1221|            Mafia|1461699303|
|    18|   5995|        holocaust|1455735472|
|    18|   5995|       true story|1455735479|
|    18|  44665|     twist ending|1456948283|
|    18|  52604|  Anthony Hopkins|

### Registering the dataframes to spark 

In [10]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")



### Analyse the data

In [11]:
minRating_1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
minRating_2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()

print('Minimum number of ratings per user: {}'.format(minRating_1))
print('Minimum number of ratings per movie: {}'.format(minRating_2))

                                                                                

Minimum number of ratings per user: 20
Minimum number of ratings per movie: 1


In [12]:
_rating1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
_total = ratings_df.select('movieId').distinct().count()

print('movies are rated by only one user: {} out of {} '.format(_rating1, _total))

movies are rated by only one user: 3446 out of 9724 


In [13]:
# number of distinct users
num_users = spark.sql("SELECT count (distinct userID) as num_users FROM ratings")
ratings_df.select("userId").distinct().count()

610

In [14]:
# number of movies
num_movies = spark.sql("SELECT count (distinct movieID) as num_movies FROM movies")
print(movies_df.select('movieID').distinct().count())

9742


In [15]:
rated_by_users = ratings_df.select('movieID').distinct().count()
print('Total Number of movies rated by users:', rated_by_users)

Total Number of movies rated by users: 9724


In [16]:
# movie genres
spark.sql("SELECT DISTINCT(genres) FROM movies LIMIT 10").show()

+--------------------+
|              genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
+--------------------+



In [17]:
extract_genres = udf(lambda x: x.split("|"), ArrayType(StringType()))
movies_df_clean = movies_df.select("movieId", "title", extract_genres("genres").alias("genres"))

movies_df_clean.createOrReplaceTempView("movies_df_clean")

display (spark.sql("SELECT * FROM movies_df_clean limit 5"))

DataFrame[movieId: int, title: string, genres: array<string>]

In [18]:
movies_df_clean.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|[Adventure, Anima...|
|      2|      Jumanji (1995)|[Adventure, Child...|
|      3|Grumpier Old Men ...|   [Comedy, Romance]|
|      4|Waiting to Exhale...|[Comedy, Drama, R...|
|      5|Father of the Bri...|            [Comedy]|
+-------+--------------------+--------------------+
only showing top 5 rows



In [19]:
# All movie categories
genres_result = list(set(movies_df_clean.select('genres').rdd.flatMap(tuple).flatMap(tuple).collect()))
genres_result

[Stage 41:>                                                         (0 + 1) / 1]                                                                                

['IMAX',
 'War',
 'Drama',
 'Action',
 'Crime',
 'Film-Noir',
 'Western',
 'Fantasy',
 'Documentary',
 'Sci-Fi',
 'Musical',
 'Thriller',
 'Comedy',
 '(no genres listed)',
 'Animation',
 'Mystery',
 'Adventure',
 'Children',
 'Horror',
 'Romance']

In [20]:
movie_pdf = movies_df.toPandas()
list_of_movie = list(movie_pdf['title'])

## Recommender

In [21]:
# Data type convert
movie_ratings = ratings_df.drop('timestamp')

movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [22]:
movie_ratings.show(20)
movie_ratings.createOrReplaceTempView("movie_ratings")
display (spark.sql("SELECT * FROM movie_ratings limit 10"))

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



DataFrame[userId: int, movieId: int, rating: float]

In [23]:
movie_rating_sample = movie_ratings.sample(False, 1/500)
movie_rating_sample.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     6|    378|   3.0|
|     6|    667|   3.0|
|    21|  59369|   3.0|
|    23|    334|   4.0|
|    28|   2706|   3.0|
|    28|  33166|   2.0|
|    28|  34405|   4.5|
|    33|      7|   1.0|
|    37|    661|   3.0|
|    42|    415|   2.0|
|    42|   1667|   2.0|
|    42|   2352|   4.0|
|    45|   2762|   5.0|
|    51|    858|   3.5|
|    64|     25|   3.5|
|    66|     18|   4.0|
|    68|   1884|   2.5|
|    68|   5419|   4.5|
|    68|  55232|   3.0|
|    73|  58047|   4.5|
+------+-------+------+
only showing top 20 rows



In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [31]:
# Create ALS model

als = ALS(
         userCol="userId", 
         itemCol="movieId",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [32]:
(trainData, testData) = movie_ratings.randomSplit([0.8,0.2])

In [37]:
paramGrid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .addGrid(als.maxIter, [15]) \
            .build()

[Stage 3484:>              (0 + 8) / 10][Stage 3527:>              (0 + 0) / 10]

In [38]:
evaluator = RegressionEvaluator(
               metricName="rmse", 
               labelCol="rating", 
               predictionCol="prediction")

print ("Num models to be tested: ", len(paramGrid))

Num models to be tested:  16




In [39]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [40]:
cvModel = cv.fit(trainData)

22/12/03 04:21:57 WARN CacheManager: Asked to cache already cached data.
22/12/03 04:21:57 WARN CacheManager: Asked to cache already cached data.


                                                                                

In [41]:
bestALSModel = cvModel.bestModel

testPredictions = bestALSModel.transform(testData)

rmse = evaluator.evaluate(testPredictions)

print(rmse)

                                                                                

0.8711823065385077


In [42]:
print ("Best Model Parameters")
print ("Rank: ", bestALSModel._java_obj.parent().getRank())
print ("MaxIter: ", str(bestALSModel._java_obj.parent().getMaxIter()))
print ("RegParam:",  bestALSModel._java_obj.parent().getRegParam())

Best Model Parameters
Rank:  50
MaxIter:  15
RegParam: 0.15
