<a href="https://colab.research.google.com/github/endrisbezawit/Movie-RS-based-on-Collaborative-Filtering/blob/main/ALS_for_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 40.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ae075dc3ea952fc1d2bb2df45859f4ca8345aef248100b082a0b40b2a088664b
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


# Setup: imports and Spark session

In [None]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session first, then take the live SparkContext
# *instance* from it. Binding the bare `SparkContext` class to `sc` (as before)
# gives a name that cannot be used for instance calls such as sc.parallelize().
spark = SparkSession.builder.appName('Recommendations').getOrCreate()
sc = spark.sparkContext


In [None]:
# Mount Google Drive into the Colab runtime at /content/drive so that files
# stored on Drive can be read through ordinary file paths.
# (`google.colab` is only importable inside a Colab runtime.)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# path config
# Base folder on the mounted Drive for the input data.
# A more specific candidate location noted by the author:
# /content/drive/MyDrive/ml-latest
# NOTE(review): confirm that the CSV-loading code actually builds its paths
# from `data_path`.
data_path = '/content/drive/MyDrive/'

In [None]:
# Load the two input tables. The first CSV row supplies the column names; no
# schema is inferred, so every column arrives as a string.
# The file names are relative to the current working directory.
movies = spark.read.options(header=True).csv('movies.csv')
ratings = spark.read.options(header=True).csv('ratings.csv')

In [None]:
# NOTE(review): this repeats the Drive mount already performed in an earlier
# cell and runs after the CSV files have been read; it looks removable — confirm.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Drop the timestamp column and give the remaining columns numeric types:
# user and movie ids become integers, the rating becomes a float.
ratings = (
    ratings
    .drop('Timestamp')
    .withColumn('UserID', col('UserID').cast('integer'))
    .withColumn('MovieID', col('MovieID').cast('integer'))
    .withColumn('Rating', col('rating').cast('float'))
)
ratings.limit(10).show()

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
+------+-------+------+



In [None]:
# Sparsity: percentage of the (user x movie) grid that holds no rating.

# Grid dimensions: number of distinct users and of distinct movies.
unique_users = ratings.select("UserID").distinct().count()
unique_movies = ratings.select("MovieID").distinct().count()

# Filled cells (one per rating row) versus all possible cells.
numerator = ratings.select("Rating").count()
denominator = unique_users * unique_movies

# 1 - fill ratio, expressed as a percentage.
sparsity = (1.0 - (numerator * 1.0) / denominator) * 100
print("The ratings data is ", "%.2f" % sparsity + "% empty.")

The ratings data is  98.30% empty.


In [None]:
# Number of ratings per user, most active users first.
UserID_pivot = (
    ratings
    .groupBy("UserID")
    .count()
    .orderBy('count', ascending=False)
)
UserID_pivot.limit(10).show()

# Number of ratings per movie, most-rated movies first.
MovieID_pivot = (
    ratings
    .groupBy("MovieID")
    .count()
    .orderBy('count', ascending=False)
)
MovieID_pivot.limit(10).show()

+------+-----+
|UserID|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
+------+-----+

+-------+-----+
|MovieID|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
+-------+-----+



Implementing the ALS (Alternating Least Squares) algorithm in Spark

In [None]:
# Import the required functions

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Create test and train set (80% / 20%), with a fixed seed so the split is
# repeatable.
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
#  - explicit feedback (implicitPrefs=False) with non-negative factors
#  - coldStartStrategy="drop": rows the model cannot score are dropped from
#    the predictions, so they cannot turn the evaluation metrics into NaN
#  - seed: fixes the random initialisation of the factor matrices; without an
#    explicit seed the fitted model (and therefore the reported RMSE) can
#    differ from one session to the next
als = ALS(userCol="UserID", 
          itemCol="MovieID",
          ratingCol="Rating",
          nonnegative = True,
          implicitPrefs = False,
          coldStartStrategy="drop",
          seed=1234)

In [None]:
# Hyper-parameter grid for ALS: every listed rank is combined with every
# listed regParam. (A further axis, als.maxIter over [5, 50, 100, 200], was
# tried by the author and is currently left out.)
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

# Evaluator: RMSE between the true "Rating" column and the model's
# "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction",
)

# The grid holds one entry per candidate parameter combination.
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [None]:
# Build the cross validator: it searches `param_grid` for the ALS estimator
# and scores each candidate with `evaluator`.
# numFolds=2 means the data passed to fit() is split into 2 folds, and each
# parameter combination is trained and evaluated once per fold.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=2,
)

In [None]:
# Fit a single ALS model on `train` using the estimator's current parameters.
# The grid search is disabled (commented out) here; to run it, fit the cross
# validator on `train` instead:
#model = cv.fit(train)
best_model=als.fit(train)

# Grid-search variant, continued: take the winning model from the fitted
# cross validator.
#best_model = model.bestModel

In [None]:
# Report the type of the fitted model and the hyper-parameters behind it.
print(type(best_model))
print("**Best Model**")

# The parameter getters live on the JVM-side parent estimator of the model,
# so fetch that object once and query it for each value.
_parent_estimator = best_model._java_obj.parent()

print("  Rank:", _parent_estimator.getRank())
print("  MaxIter:", _parent_estimator.getMaxIter())
print("  RegParam:", _parent_estimator.getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 10
  MaxIter: 10
  RegParam: 0.1


In [None]:
# Predict ratings for the held-out test rows, score the predictions with
# `evaluator`, and print the resulting value.
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8792346230792731


In [None]:
# Preview the first 20 test rows together with their predicted rating.
test_predictions.show()

+------+-------+------+----------+
|UserID|MovieID|Rating|prediction|
+------+-------+------+----------+
|   580|   1580|   4.0| 3.4834278|
|   580|  44022|   3.5|  3.737207|
|   597|    471|   2.0| 4.2806745|
|   108|   1959|   5.0| 4.2783747|
|   368|   2122|   2.0|  1.794088|
|   436|    471|   3.0|  3.857335|
|   587|   1580|   4.0|  3.865708|
|    27|   1580|   3.0| 3.4556212|
|   606|   1580|   2.5|  3.074989|
|   606|  44022|   4.0| 2.5122876|
|    91|   2122|   4.0|  2.505557|
|   157|   3175|   2.0| 3.5739088|
|   232|   1580|   3.5| 3.5226269|
|   232|  44022|   3.0|    3.3277|
|   246|   1645|   4.0| 4.0916376|
|   599|   2366|   3.0| 2.8905628|
|   111|   1088|   3.0| 3.3892274|
|   111|   3175|   3.5| 2.7912552|
|    47|   1580|   1.5| 2.6379178|
|   140|   1580|   3.0| 3.3501647|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

def somefunc1(value1, threshold=3):
  """Binarise a rating-like value.

  Args:
      value1: the numeric value to classify (a rating or a predicted rating),
          or None.
      threshold: values strictly below this are classed as 0; defaults to 3,
          the cut-off this notebook uses.

  Returns:
      0 if value1 < threshold, otherwise 1. None when value1 is None, so a
      null column value yields a null result instead of raising TypeError
      inside the UDF.
  """
  if value1 is None:
      return None
  return 0 if value1 < threshold else 1


def somefunc2(value1,value2):
  """Return 1 when the two values compare equal, otherwise 0."""
  return 1 if value1 == value2 else 0
# Wrap the two plain-Python helpers as Spark UDFs that return integer columns.
udfsomefunc1 = F.udf(somefunc1, IntegerType())
udfsomefunc2 = F.udf(somefunc2, IntegerType())

# Binarise the true rating, then the predicted rating, then flag the rows on
# which the two binary labels agree ("Truth" = 1) or differ ("Truth" = 0).
ratings_1 = test_predictions.withColumn(
    "Rating_binary",
    udfsomefunc1(F.col("Rating")),
)
ratings_2 = ratings_1.withColumn(
    "predictions_binary",
    udfsomefunc1(F.col("prediction")),
)
ratings_with_high_low = ratings_2.withColumn(
    "Truth",
    udfsomefunc2(F.col("Rating_binary"), F.col("predictions_binary")),
)
ratings_with_high_low.show()


+------+-------+------+----------+-------------+------------------+-----+
|UserID|MovieID|Rating|prediction|Rating_binary|predictions_binary|Truth|
+------+-------+------+----------+-------------+------------------+-----+
|   148|    356|   4.0|  3.462273|            1|                 1|    1|
|   148|   4896|   4.0| 3.6566792|            1|                 1|    1|
|   148|   4993|   3.0| 3.4139667|            1|                 1|    1|
|   148|   7153|   3.0| 3.3469503|            1|                 1|    1|
|   148|   8368|   4.0| 3.6763072|            1|                 1|    1|
|   148|  40629|   5.0| 2.9380376|            1|                 0|    0|
|   148|  50872|   3.0| 3.7552364|            1|                 1|    1|
|   148|  60069|   4.5| 3.8022726|            1|                 1|    1|
|   148|  69757|   3.5|  3.026741|            1|                 1|    1|
|   148|  72998|   4.0|  3.359804|            1|                 1|    1|
|   148|  81847|   4.5|  3.303099|    

In [None]:
# Count the test rows per "Truth" value (1 = binarised rating and binarised
# prediction agree, 0 = they differ), largest group first, and display them.
Perf_values = ratings_with_high_low.groupBy("Truth").count().orderBy('count', ascending=False)
Perf_values.limit(6).show()

+-----+-----+
|Truth|count|
+-----+-----+
|    1|14939|
|    0| 4274|
+-----+-----+



In [None]:
# Bring the small per-"Truth" summary to the driver ONCE, then split it into
# two parallel lists. Collecting once (instead of one toPandas() call per
# column) runs a single Spark job and guarantees that a[i] and b[i] come from
# the same row.
# .tolist() converts the values to plain Python ints.
_perf_pdf = Perf_values.toPandas()
a = _perf_pdf['Truth'].tolist()   # "Truth" labels
b = _perf_pdf['count'].tolist()   # row count for the label at the same index

print(a)
print(b)

[1, 0]
[14939, 4274]


In [None]:
# Accuracy = rows whose binarised prediction matches the binarised rating
# (Truth == 1) divided by all rows.
# Look the count up by its label rather than by position: `b` is ordered by
# count, so b[0] is the Truth == 1 count only while the model is right more
# often than wrong. Keying by label also copes with only one label present.
_count_by_truth = dict(zip(a, b))
_total_rows = sum(b)
accuracy = _count_by_truth.get(1, 0) / _total_rows if _total_rows else float("nan")
print(accuracy*100)

77.75464529225003


In [None]:
from sklearn.metrics import mean_squared_error
from math import sqrt

#rms = sqrt(mean_squared_error('Rating', 'prediction'))

**Recommending Movies**

The final part of the notebook uses the trained model to predict which movies each user is most likely to enjoy, based on that user's own ratings, and recommends those movies to the user.

In [None]:
# Generate the top 10 recommendations for every user, then display the
# result for 10 of those users.
recommendations = best_model.recommendForAllUsers(10)
recommendations.limit(10).show()



+------+--------------------+
|UserID|     recommendations|
+------+--------------------+
|     1|[{8477, 6.027934}...|
|     3|[{70946, 5.251848...|
|     5|[{58301, 4.971614...|
|     6|[{3925, 5.471189}...|
|     9|[{3379, 5.2894783...|
|    12|[{3925, 6.1210833...|
|    13|[{3379, 5.4368963...|
|    15|[{27611, 5.272330...|
|    16|[{3379, 4.665858}...|
|    17|[{3379, 5.2830396...|
+------+--------------------+



In [None]:
# Flatten the per-user recommendation arrays into one row per
# (user, movie, predicted rating).
# The guard makes this cell safe to re-run: once flattened, `recommendations`
# no longer has a "recommendations" column, and exploding it a second time
# would fail.
if "recommendations" in recommendations.columns:
    recommendations = (
        recommendations
        .withColumn("rec_exp", explode("recommendations"))
        .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
    )

# For comparison: the 10 movies that user 50 actually rated highest.
ratings.join(movies, on='MovieID').filter('UserId = 50').sort('Rating', ascending=False).limit(10).show()

In [None]:
# Attach the movie metadata to the recommendations and show the rows for user 50.
recommendations.join(movies, on='MovieID').filter('UserID = 50').show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  96004|    50|4.0556064|Dragon Ball Z: Th...|Action|Adventure|...|
|   7982|    50|3.9035714|Tale of Two Siste...|Drama|Horror|Myst...|
|   3379|    50|3.8740618| On the Beach (1959)|               Drama|
|   3030|    50|3.8706512|      Yojimbo (1961)|    Action|Adventure|
|  27156|    50|3.8205142|Neon Genesis Evan...|Action|Animation|...|
|   7767|    50| 3.812018|Best of Youth, Th...|               Drama|
|    923|    50|3.8014812| Citizen Kane (1941)|       Drama|Mystery|
|   7096|    50|3.7968893|Rivers and Tides ...|         Documentary|
|  92475|    50|3.7956157|All Watched Over ...|         Documentary|
|   6460|    50|3.7923243|Trial, The (Procè...|               Drama|
+-------+------+---------+--------------------+--------------------+



In [None]:
# Held-out (test) ratings that belong to user 20, reduced to the movie id,
# user id and rating columns.
single_user = (
    test
    .where(col('UserID') == 20)
    .select('MovieID', 'UserID', 'Rating')
)

single_user.show(5)

+-------+------+------+
|MovieID|UserID|Rating|
+-------+------+------+
|     13|    20|   4.0|
|    364|    20|   5.0|
|    531|    20|   4.5|
|    551|    20|   5.0|
|    783|    20|   3.5|
+-------+------+------+
only showing top 5 rows



In [None]:
# Score user 20's held-out movies and list the highest predicted ratings first.
# The sort keyword must be spelled `ascending`: DataFrame.sort() silently
# ignores unknown keywords, so the earlier `ascen=True` had no effect and the
# rows came back in the default ascending order (worst predictions first).
reccomendations = best_model.transform(single_user)
reccomendations.sort('prediction', ascending=False).show(10)

+-------+------+------+----------+
|MovieID|UserID|Rating|prediction|
+-------+------+------+----------+
|   1489|    20|   4.0| 2.0797367|
|   4369|    20|   2.0| 2.2084951|
|   5313|    20|   0.5| 2.3399696|
|   4367|    20|   0.5| 2.4314096|
|   1021|    20|   2.0| 2.4782176|
|   2116|    20|   2.5|  2.568874|
|   2167|    20|   2.0| 2.6475766|
|   5388|    20|   4.0|  2.786439|
|   3438|    20|   4.0| 3.0330524|
|   3755|    20|   4.5|  3.038228|
+-------+------+------+----------+
only showing top 10 rows



In [None]:
# Score the test predictions under three regression metrics.
# A dedicated evaluator is used, and the metric is overridden per call through
# the `params` argument of evaluate(). This leaves the shared `evaluator`
# variable untouched, so re-running the earlier RMSE cell still reports RMSE
# rather than whichever metric was configured last here.
_metric_evaluator = RegressionEvaluator(labelCol="Rating", predictionCol="prediction")
_scores = {
    _metric: _metric_evaluator.evaluate(
        test_predictions,
        {_metric_evaluator.metricName: _metric},
    )
    for _metric in ("rmse", "r2", "mae")
}
rmse, r2, mae = _scores["rmse"], _scores["r2"], _scores["mae"]

print("Root-mean-square error = " + str(rmse))
print("r2= " + str(r2))
print("mean absolute error = " + str(mae))

Root-mean-square error = 0.8779523470349659
r2= 0.2779400594715987
mean absolute error = 0.6769390148733815


References  

https://colab.research.google.com/github/asifahmed90/pyspark-ML-in-Colab/blob/master/PySpark_Regression_Analysis.ipynb#scrollTo=lh5NCoc8fsSO


https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html?highlight=crossvalidator 

