In [None]:
# Colab environment setup: a headless Java 8 JDK (Spark 2.4 runs on the JVM),
# the Spark 2.4.5 / Hadoop 2.7 binary distribution, and findspark so the
# notebook can import pyspark from that unpacked distribution.
# apt output is silenced (-qq plus redirect to /dev/null).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark tarball from the Apache archive (quiet mode).
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# Unpack into the current directory and delete the tarball.
# NOTE(review): SPARK_HOME below assumes the working directory is /content — confirm.
!tar xf spark-2.4.5-bin-hadoop2.7.tgz && rm spark-2.4.5-bin-hadoop2.7.tgz
# NOTE(review): findspark is unpinned; pin a version for reproducible installs.
!pip install -q findspark

In [None]:
import os

# Tell findspark/PySpark where the JDK and the unpacked Spark distribution live.
# Both paths must match what the install cell put on disk.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [None]:
import findspark
# Make the Spark distribution importable (findspark locates it, e.g. via SPARK_HOME).
findspark.init()
from pyspark.sql import SparkSession

# Create the one SparkSession the whole notebook uses, on all local cores.
# The application name is set here on purpose: any later
# SparkSession.builder...getOrCreate() returns this already-running session,
# so an appName given there would not rename the running application.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("recnn")
    .getOrCreate()
)
# Low-level RDD entry point, used by the smoke-test cell.
sc = spark.sparkContext

In [None]:
# Smoke test: square a tiny vector on Spark to confirm the context works end to end.
v = [1, 2, 3]
# Parallelize `v` itself (not a second copy of the literal) so the printed input
# and the computed squares can never drift apart.
v_square = sc.parallelize(v).map(lambda x: x ** 2).collect()

print("Vector: {0}, Vector square: {1}".format(v, v_square))

Vector: [1, 2, 3], Vector square: [1, 4, 9]


In [None]:
import google.colab.drive as drive

# Expose Google Drive under /content/drive so the ratings CSV can be read by Spark.
drive.mount('/content/drive')
# Drive folder that holds the movie ratings data (read as filtered.csv below).
dataSetFolder = '/content/drive/My Drive/movies_data'

Mounted at /content/drive


In [None]:
import pandas as pd

# The `spark` session created during setup is reused as-is. Calling
# SparkSession.builder.appName(...).getOrCreate() here would only hand back that
# same running session, so it would not rename the application.

In [None]:
# Load the ratings table: the header row gives column names, inferSchema types them.
data = spark.read.csv("{}/filtered.csv".format(dataSetFolder), header=True, inferSchema=True)

In [None]:
# Peek at one row to check the inferred columns and types.
data.first()

Row(movieId=949, userId=23, rating=3.5)

In [None]:
# Column-wise summary statistics (count, mean, stddev, min, max) as a sanity
# check on the loaded ratings.
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            userId|            rating|
+-------+------------------+------------------+------------------+
|  count|             32166|             32166|             32166|
|   mean|2138.0610271715477| 345.8573649194802|3.5854784555120314|
| stddev|6162.8237020664155|195.05404031751385|1.0469430526673154|
|    min|                 2|                 1|               0.5|
|    max|            134368|               671|               5.0|
+-------+------------------+------------------+------------------+



In [None]:
# Hold out ~20% of the ratings for the final evaluation. A fixed seed makes the
# split — and therefore every downstream metric — reproducible on Restart & Run All.
(training, test) = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# The estimator must exist before the grid: the grid is keyed by als.rank,
# als.maxIter and als.regParam, so building it first raises NameError on a
# fresh kernel.
# coldStartStrategy='drop' drops prediction rows that would be NaN (users/items
# unseen in training), so RMSE evaluation does not turn into NaN.
# nonnegative=True constrains the learned factors to be non-negative.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy='drop', nonnegative=True)

# 3 ranks x 3 iteration counts x 3 regularisation strengths = 27 candidate models.
params = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 13, 14])
    .addGrid(als.maxIter, [18, 19, 20])
    .addGrid(als.regParam, [.17, .18, .19])
    .build()
)



In [None]:
# Root-mean-square error between the true `rating` and the model's `prediction`;
# used both for hyper-parameter selection and for the final test score.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [None]:
# Each param map is trained on part of `training` (trainRatio, Spark's default
# 0.75, stated explicitly) and scored by `evaluator` on the rest. The seed pins that
# inner split so the selected hyper-parameters are reproducible.
tv = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=params,
    evaluator=evaluator,
    trainRatio=0.75,
    seed=42,
)

In [None]:
# Run the grid search; `model` is the TrainValidationSplitModel holding the winner.
model = tv.fit(dataset=training)

In [None]:
# Unwrap the tuning result: bestModel is the ALS model fitted with the param map
# that scored best (lowest RMSE, per `evaluator`) on the validation split.
bestModel = model.bestModel

In [None]:
# Score the held-out test ratings with the tuned model and preview 20 rows.
predictions = bestModel.transform(dataset=test)
predictions.show(n=20)

+-------+------+------+----------+
|movieId|userId|rating|prediction|
+-------+------+------+----------+
|    496|   509|   3.0| 1.9897484|
|    833|   247|   3.0| 1.9508841|
|   1088|    15|   2.0| 2.3041434|
|   1088|   358|   3.0| 3.2161005|
|   1088|   547|   5.0| 3.1296427|
|   1088|   564|   2.0| 2.9442132|
|   1088|   387|   4.0| 3.4349847|
|   1088|   514|   3.0| 2.6347096|
|   1088|    97|   2.0| 1.2294201|
|   1088|    95|   5.0| 3.5982609|
|   1088|   370|   3.0| 3.8341415|
|   1088|   344|   3.0| 3.0364914|
|   1580|   137|   5.0| 3.5314248|
|   1580|    53|   3.0|    3.7748|
|   1580|   596|   3.0| 3.6478035|
|   1580|   384|   4.0| 3.5806534|
|   1580|   159|   3.5| 3.3416882|
|   1580|   185|   4.0| 3.7207062|
|   1580|   305|   3.0| 2.4768317|
|   1580|   603|   5.0| 4.1032786|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
# Defensive NaN filter on the prediction column. `subset` takes a list of column
# names — `('prediction')` is only a parenthesised string, not a tuple.
# With coldStartStrategy='drop' on the ALS estimator this should remove nothing.
p = predictions.dropna(subset=['prediction'])

In [None]:
# Preview the NaN-filtered predictions (20 rows).
p.show(n=20)

+-------+------+------+----------+
|movieId|userId|rating|prediction|
+-------+------+------+----------+
|    833|   296|   4.5| 1.8338069|
|   1088|   128|   5.0| 4.3219113|
|   1088|   363|   2.0|  4.529824|
|   1088|   547|   5.0| 3.1482077|
|   1088|   564|   2.0| 3.0628922|
|   1088|   418|   5.0| 3.9763205|
|   1088|   200|   1.0| 2.7869658|
|   1088|   344|   3.0| 2.3649516|
|   1088|   294|   3.5| 3.9629717|
|   1088|   311|   5.0| 3.2683938|
|   1088|   187|   4.0| 3.7908418|
|   1580|   623|   3.5| 3.8105679|
|   1580|   155|   5.0|  2.586903|
|   1580|    34|   3.0|  3.865033|
|   1580|   384|   4.0| 3.8878832|
|   1580|   159|   3.5|  3.765738|
|   1580|   103|   3.0| 3.4589183|
|   1580|   388|   3.0| 3.5019011|
|   1580|   417|   4.0|  3.367466|
|   1580|    93|   3.5| 3.5183764|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
# Final held-out score of the tuned model (same RMSE metric used for tuning).
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = {}".format(rmse))

Root-mean-square error = 0.9258067559908534


In [None]:
# Test-set rows for one example user (userId 11), keeping only the model's input columns.
single_user = test.where(test.userId == 11).select('movieId', 'userId')

In [None]:
# Predicted ratings for the example user's held-out movies, highest first.
# These cover only movies already in that user's test rows — they are not new
# top-N recommendations (bestModel.recommendForUserSubset would produce those).
# bestModel is used directly, matching the test-set prediction cell; the tuning
# wrapper `model` delegates to it anyway.
recommendations = bestModel.transform(single_user)
reccomendations = recommendations  # keep the original (misspelled) name working
recommendations.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    296|    11| 4.4688263|
|    923|    11| 3.7383432|
+-------+------+----------+

