### Importing libraries

In [56]:
import findspark
findspark.init()

In [57]:
import pyspark
from pyspark.sql import SparkSession

In [58]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


### reading data

In [59]:
lines = spark.read.text("C:/Users/shiva/Desktop/ALS Training/SampleALS.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2])))


### creating dataframe

In [60]:
ratings = spark.createDataFrame(ratingsRDD)


In [61]:
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|      2|   3.0|
|     0|      3|   1.0|
|     0|      5|   2.0|
|     0|      9|   4.0|
|     0|     11|   1.0|
|     0|     12|   2.0|
|     0|     15|   1.0|
|     0|     17|   1.0|
|     0|     19|   1.0|
|     0|     21|   1.0|
|     0|     23|   1.0|
|     0|     26|   3.0|
|     0|     27|   1.0|
|     0|     28|   1.0|
|     0|     29|   1.0|
|     0|     30|   1.0|
|     0|     31|   1.0|
|     0|     34|   1.0|
|     0|     37|   1.0|
|     0|     41|   2.0|
+------+-------+------+
only showing top 20 rows



In [82]:
ratings.count()

1501

In [88]:
ratings.dtypes

[('userId', 'bigint'), ('movieId', 'bigint'), ('rating', 'double')]

### spliting the dataset 

In [62]:
(training, test) = ratings.randomSplit([0.8, 0.2])


### training the dataset & tuning parameters

In [65]:
ranks = [5,10,15]
regParams = [0.001, 0.01, 0.1]
alphas = [10, 20, 40]

for rank in ranks:
    for regParam in regParams:
            for alpha in alphas:
                als = ALS(maxIter=5,
                          rank = rank,
                          regParam=regParam,
                          alpha = alpha,
                          userCol="userId", 
                          itemCol="movieId", 
                          ratingCol="rating",
                          coldStartStrategy="drop")
                model = als.fit(training)
                predictions = model.transform(test)
                evaluator = RegressionEvaluator(metricName="rmse",
                                                labelCol="rating",
                                                predictionCol="prediction")
                rmse = evaluator.evaluate(predictions)
                print("rank: "+str(rank)+
                      ", regParam: "+str(regParam)+
                      ", alpha: "+str(alpha)+
                      ", RMSE: "+str(rmse))

# Evaluate the model by computing the RMSE on the test data


rank: 5, regParam: 0.001, alpha: 10, RMSE: 1.6750438968203014
rank: 5, regParam: 0.001, alpha: 20, RMSE: 1.6750438968203014
rank: 5, regParam: 0.001, alpha: 40, RMSE: 1.6750438968203014
rank: 5, regParam: 0.01, alpha: 10, RMSE: 1.4766701312695494
rank: 5, regParam: 0.01, alpha: 20, RMSE: 1.4766701312695494
rank: 5, regParam: 0.01, alpha: 40, RMSE: 1.4766701312695494
rank: 5, regParam: 0.1, alpha: 10, RMSE: 1.1692416503785417
rank: 5, regParam: 0.1, alpha: 20, RMSE: 1.1692416503785417
rank: 5, regParam: 0.1, alpha: 40, RMSE: 1.1692416503785417
rank: 10, regParam: 0.001, alpha: 10, RMSE: 2.9584184742057063
rank: 10, regParam: 0.001, alpha: 20, RMSE: 2.9584184742057063
rank: 10, regParam: 0.001, alpha: 40, RMSE: 2.9584184742057063
rank: 10, regParam: 0.01, alpha: 10, RMSE: 1.7276876965351917
rank: 10, regParam: 0.01, alpha: 20, RMSE: 1.7276876965351917
rank: 10, regParam: 0.01, alpha: 40, RMSE: 1.7276876965351917
rank: 10, regParam: 0.1, alpha: 10, RMSE: 1.1258976834667058
rank: 10, regPa

####### from the rmse values we can see

- change in alpha does not affect the error   /// remove alpha 
- increase in regParam decrease the error
- larger rank with larger regparam decrease the error


In [81]:
als = ALS(maxIter=5,rank = 15,
          regParam=0.2,userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Best RMSE: "+str(rmse))

Best RMSE: 1.127190625157611


### recomending

In [84]:
userRecs = model.recommendForAllUsers(3)  #### recomending 3 movies to users
userRecs.show()


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[{49, 3.0045063},...|
|    26|[{22, 4.1243496},...|
|    27|[{23, 2.5396621},...|
|    12|[{46, 3.941747}, ...|
|    22|[{75, 3.9501064},...|
|     1|[{22, 2.7746298},...|
|    13|[{52, 2.3649578},...|
|     6|[{25, 2.8013906},...|
|    16|[{90, 3.699558}, ...|
|     3|[{30, 3.3703048},...|
|    20|[{22, 3.3006942},...|
|     5|[{46, 3.5123692},...|
|    19|[{90, 2.8197718},...|
|    15|[{46, 3.336104}, ...|
|    17|[{46, 4.1951275},...|
|     9|[{49, 3.232447}, ...|
|     4|[{52, 3.0084107},...|
|     8|[{52, 4.0076814},...|
|    23|[{46, 4.254199}, ...|
|     7|[{52, 2.823106}, ...|
+------+--------------------+
only showing top 20 rows



In [100]:
#Results = spark.createDataFrame(userRecs)
#userRecs.write.csv("ALS Sample results")
userRecs.toPandas().to_excel('mycsv.csv')