In [3]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=7360083370b74787f95621e9e1ce79827f994c4d71daa45c2e7420ebd0581a4a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

#Load Data

In [4]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


In [113]:
spark = SparkSession.builder.appName('movielens').getOrCreate()

df_ratings = spark.read.csv('ratings.csv', inferSchema = True, header = True)
df_ratings = df_ratings.dropDuplicates()
df_ratings.show()
df_ratings.describe().show()


+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows

+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId| 

In [None]:
spark2 = SparkSession.builder.appName("movielens").getOrCreate()

df_movies = spark2.read.csv('movies_metadata.csv', inferSchema = True, header = True)
df_movies = df_movies.dropDuplicates()
df_movies.show()
df_movies.describe().show()

#Regression Models

In [115]:
(train,test) = df_ratings.randomSplit([0.8,0.2], seed=42)


als = ALS(maxIter=5, regParam=0.1, userCol='userId', itemCol='movieId',ratingCol='rating', coldStartStrategy='drop')

model = als.fit(train)

pred = model.transform(test)

pred.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|     1|    858|   5.0|1425941523| 4.1971498|
|     1|   2762|   4.5|1425941300| 3.9475875|
|     1|   2959|   4.0|1425941601|  4.572944|
|     1|  54503|   3.5|1425941313| 4.2623706|
|     1|  81834|   5.0|1425942133|   4.06699|
|     1|  96821|   5.0|1425941382|  4.579473|
|     3|    480|   3.0|1048076925| 2.6197872|
|     3|    527|   4.0|1048076900|  3.507081|
|     3|   1270|   3.0|1048076976| 2.8426297|
| 52224|  51662|   3.5|1292347002| 3.3230581|
| 52224|  60069|   4.5|1292346644| 3.6704571|
| 52224|  60546|   4.5|1292346916| 3.2129042|
| 52224|  79132|   2.5|1292346348|  3.781142|
|205600|  99114|   4.0|1486187642| 4.0379176|
|205601|     11|   3.0| 846046750| 3.3675423|
|205601|    289|   4.0| 846047705| 3.1375399|
|205601|    319|   4.0| 846047486| 3.3660438|
|205601|    337|   3.0| 846046807| 3.4401486|
|205601|    339|   4.0| 846046572|

In [13]:
eval = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

rmse = eval.evaluate(pred)

print(f'RMSE: {rmse}')

RMSE: 1.95949835


#Prediction of the top movies recommended for the User 1

Here is the prediction of the best rated movie by the User 1.


In [147]:
def user_i_recommendation(i,test):
  user_i = test.filter(test['userId'] == 1).select(['movieId','userId'])
  model_i = model.transform(user_i).orderBy('prediction',ascending=False)
  model_i.show(1)
  first_row = model_i.first()
  movieId_recommended = first_row.movieId
  return movieId_recommended


movieId_recommended = user_i_recommendation(1,test)


+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|  96821|     1|  4.579473|
+-------+------+----------+
only showing top 1 row



In [144]:
i = '1'
movie_recommended = df_movies.filter(df_movies['id'] == str(movieId_recommended)).select(['original_title'])
print('Best movie recommended to the User ' + i + ' :')
movie_recommended.show()

print('Official rating of the User ' + i +  ' for this movie :')
movie_rating1 = df_ratings.filter(df_ratings['movieId'] == str(movieId_recommended))
movie_rating = movie_rating1.filter(movie_rating1['userId'] == i).select(['rating'])
movie_rating.show()


Best movie recommended to the User 1 :
+------------------+
|    original_title|
+------------------+
|Cesare deve morire|
+------------------+

Official rating of the user1 for this movie :
+------+
|rating|
+------+
|   5.0|
+------+



#Top N Movies Similar

In [245]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans


data = df_movies.select('budget', 'vote_count','genres','title')
data = data.dropna()
# create the tokenizer object
tokenizer = Tokenizer(inputCol='genres', outputCol='words')

# transform the data
words_data = tokenizer.transform(data)

# create the count vectorizer object
cv = CountVectorizer(inputCol='words', outputCol='raw_features')

# fit the count vectorizer to the data
cv_model = cv.fit(words_data)

# transform the data
featurized_data = cv_model.transform(words_data)

# create the idf object
idf = IDF(inputCol="raw_features", outputCol="features")

# fit the idf to the data
idf_model = idf.fit(featurized_data)

# transform the data
rescaled_data = idf_model.transform(featurized_data)

# create the assembler object
assembler = VectorAssembler(inputCols=["features"], outputCol="features_norm")

# transform the data
output = assembler.transform(rescaled_data)

# build the kmeans model

kmeans = KMeans().setK(5).setSeed(1)

model = kmeans.fit(output)

# Make predictions
predictions = model.transform(output)



In [246]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Columns - production_companies , production_countries , genres have Json array values
schema = ArrayType(StructType([
        StructField('id', IntegerType(), nullable=False), 
        StructField('name', StringType(), nullable=False)]))


#values based on Json keys will  produce list
convertUDF = udf(lambda s: ','.join(map(str, s)),StringType())

#Json parsing
predictions=predictions.withColumn("genres_value",convertUDF(from_json(predictions.genres,schema).getField("name")))



Let's see for example the cluster 2 and see if movies in this cluster are related.

In [247]:
predictions.filter(predictions['prediction'] == 2).select(['genres_value','title']).show(3,False)

+---------------------------+------------+
|genres_value               |title       |
+---------------------------+------------+
|Action,Crime,Drama,Thriller|Heat        |
|Action,Adventure,Thriller  |Sudden Death|
|Adventure,Action,Thriller  |GoldenEye   |
+---------------------------+------------+
only showing top 3 rows



Let's try to get top 3 movies similar to the movie : Toy Story

In [248]:
movie_name = 'Toy Story'

suggestions = predictions.select("title", "prediction")
cluster_suggested = suggestions.filter(suggestions.title == movie_name).select("prediction").collect()[0][0]

movies_suggested = predictions.filter(predictions.prediction == cluster_suggested)

# show the similar movies
movies_suggested.selectExpr("*","row_number() over (ORDER BY 1) as row_num").select(['genres_value','title']).filter("row_num > 1").show(3, truncate=False, vertical=True)


-RECORD 0-------------------------------------
 genres_value | Adventure,Fantasy,Family      
 title        | Jumanji                       
-RECORD 1-------------------------------------
 genres_value | Action,Adventure,Drama,Family 
 title        | Tom and Huck                  
-RECORD 2-------------------------------------
 genres_value | Family,Animation,Adventure    
 title        | Balto                         
only showing top 3 rows

