### Title: Build a recommendation system using movielens dataset  
### Dataset: https://grouplens.org/datasets/movielens/

In [1]:
movelines100kURL = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [2]:
import os
import urllib.request
import zipfile

In [3]:
movelines100kPath = os.path.join('data', 'ml-latest-small.zip')
urllib.request.urlretrieve(movelines100kURL, movelines100kPath)

('data/ml-latest-small.zip', <http.client.HTTPMessage at 0x7fd3c99b9438>)

In [4]:
with zipfile.ZipFile(movelines100kPath, "r") as z:
    z.extractall('data')

## Loading data into RDD

In [5]:
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [6]:
movelines100kRatingFile = os.path.join('data', 'ml-latest-small', 'ratings.csv')
movelines100kRatingData = spark.read.text(movelines100kRatingFile).rdd

# Print  lines of Data
header = movelines100kRatingData.take(1)[0]
movelines100kRatingData.take(10)

[Row(value='userId,movieId,rating,timestamp'),
 Row(value='1,31,2.5,1260759144'),
 Row(value='1,1029,3.0,1260759179'),
 Row(value='1,1061,3.0,1260759182'),
 Row(value='1,1129,2.0,1260759185'),
 Row(value='1,1172,4.0,1260759205'),
 Row(value='1,1263,2.0,1260759151'),
 Row(value='1,1287,2.0,1260759187'),
 Row(value='1,1293,2.0,1260759148'),
 Row(value='1,1339,3.5,1260759125')]

In [7]:
# Remove the header and extract user,movie and rating columns

ratingsRDD = movelines100kRatingData.filter(lambda line : line != header)\
                    .map(lambda line: line.value.split(","))\
                    .map(lambda features: Row(userId=int(features[0]), movieId=int(features[1]),
                                     rating=float(features[2])))

In [8]:
dataDf = spark.createDataFrame(ratingsRDD)

## Build a recommendation system

In [9]:
(X_train, X_test) = dataDf.randomSplit([0.8, 0.2])

In [10]:
alsModel = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
alsTrainedModel = alsModel.fit(X_train)

In [11]:
preds = alsTrainedModel.transform(X_test)

In [12]:
preds = preds.na.drop()

In [13]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [14]:
rmse = evaluator.evaluate(preds)
print("Root mean squared error = " + str(rmse))

Root mean squared error = 1.1154485452326366
