In [None]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [None]:
# Register the AWS credentials with Hadoop's s3n filesystem settings.
# NOTE(review): accessKeyId / secretAccessKey are not assigned in the cells
# shown here -- they must already exist in the kernel before this cell runs.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", accessKeyId)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", secretAccessKey)

# Dataset location: bucket root plus dataset folder.
# Point PATH_DATASET at 'ml-latest/' to use the larger dataset.
PATH_BUCKET = 's3n://orvarsbucket/'
PATH_DATASET = 'ml-latest-small/'

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create Spark SQL context.
sql_context = SQLContext(sc)


def load_csv_rdd(filename, schema):
    """Load one CSV file of the dataset and return its rows as an RDD.

    The file at PATH_BUCKET + PATH_DATASET + filename is read through the
    'com.databricks.spark.csv' data source with the header option enabled and
    the given schema applied. The number of loaded rows is printed.

    Args:
        filename: Name of the CSV file inside the dataset folder,
            e.g. 'links.csv'.
        schema: StructType describing the columns of the file.

    Returns:
        The `.rdd` of the loaded DataFrame.
    """
    reader = sql_context.read \
        .format('com.databricks.spark.csv') \
        .options(header='true')
    rdd = reader.load(PATH_BUCKET + PATH_DATASET + filename, schema=schema).rdd
    # count() runs a Spark job over the whole file, so a bad path or bad
    # credentials surface here rather than in a later cell.
    # print(...) with a single argument behaves the same on Python 2 and 3.
    print('Loaded ' + str(rdd.count()) + ' entries from ' + filename + '\n')
    return rdd


# Read links.csv
filename = 'links.csv'
links_schema = StructType([
    StructField("movieId", StringType(), True),
    StructField("imdbId", StringType(), True),
    StructField("tmdbId", StringType(), True),
])
links_rdd = load_csv_rdd(filename, links_schema)

# Read movies.csv
filename = 'movies.csv'
movies_schema = StructType([
    StructField("movieId", StringType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True),
])
movies_rdd = load_csv_rdd(filename, movies_schema)

# Read ratings.csv
filename = 'ratings.csv'
ratings_schema = StructType([
    StructField("userId", StringType(), True),
    StructField("movieId", StringType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True),
])
ratings_rdd = load_csv_rdd(filename, ratings_schema)

# Read tags.csv
filename = 'tags.csv'
tags_schema = StructType([
    StructField("userId", StringType(), True),
    StructField("movieId", StringType(), True),
    StructField("tag", StringType(), True),
    StructField("timestamp", IntegerType(), True),
])
tags_rdd = load_csv_rdd(filename, tags_schema)

In [None]:
# Show the first movie Row to check how the columns were parsed.
# first() matches the style used for `ratings` further down and raises a
# descriptive ValueError (instead of a bare IndexError) if the RDD is empty.
movies_rdd.first()

In [None]:
# Show the first rating Row to check how the columns were parsed.
# first() matches the style used for `ratings` further down and raises a
# descriptive ValueError (instead of a bare IndexError) if the RDD is empty.
ratings_rdd.first()

## Convert ratings to the `Rating` structure expected by ALS

In [None]:
def _row_to_rating(row):
    """Build an MLlib Rating from one row of ratings_rdd.

    userId and movieId are read as strings by the ratings schema, so they are
    cast to int here; the rating value is passed through float().
    """
    user_id = int(row['userId'])
    movie_id = int(row['movieId'])
    score = float(row['rating'])
    return Rating(user_id, movie_id, score)


ratings = ratings_rdd.map(_row_to_rating)

In [None]:
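# Alternative (disabled): build `ratings` from a raw text RDD of CSV lines instead
# of from `ratings_rdd`. `ratings_raw` is not defined in the cells shown here.
# The filter drops line 0 (presumably the header) and keeps lines 1..99999.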
# ratings = ratings_raw.zipWithIndex()\
# .filter(lambda kv: kv[1] > 0 and kv[1] < 100000)\
# .keys()\
# .map(lambda line: line.split(','))\
# .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

In [None]:
# Show the first converted Rating as a quick sanity check of the mapping above.
ratings.first()

# Train Model

In [None]:
# Build the recommendation model using Alternating Least Squares.
rank = 10            # number of latent features in the factorization
numIterations = 10   # number of ALS iterations to run
# Fixed seed: without it ALS starts from different random factors on every
# run, so the model and the error reported later are not repeatable.
ALS_SEED = 42
model = ALS.train(ratings, rank, numIterations, seed=ALS_SEED)


In [None]:
# Inspect five entries of the user-feature RDD held by the trained model.
user_features = model.userFeatures()
user_features.take(5)

# Evaluate Model

In [None]:
# Evaluate the model on the training data: predict every (user, movie) pair
# that appears in `ratings`, then compare with the rating actually given.
def _key_by_user_and_product(record):
    """Re-key a (user, product, rating) record as ((user, product), rating)."""
    return ((record[0], record[1]), record[2])


testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(_key_by_user_and_product)

# After the join each value is the pair (actual rating, predicted rating).
ratesAndPreds = ratings.map(_key_by_user_and_product).join(predictions)
squared_errors = ratesAndPreds.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
MSE = squared_errors.mean()
print("Mean Squared Error = " + str(MSE))