In [74]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating

In [75]:
# Cluster sizing knobs. The load cell multiplies all three together to pick
# the number of partitions for the ratings data.
# NOTE(review): REP_FACTOR is presumably "partitions per core" -- confirm.
CORES_PER_NODE, NUM_WORKERS, REP_FACTOR = 2, 3, 4

In [76]:
# Load the headerless ratings CSV. Its columns are (fromUserId, toUserId, rating)
# and arrive as strings named C0, C1, C2.
# NOTE(review): sample outputs in this notebook include a rating of 10, so the
# scale is presumably 1-10 rather than 0-9 -- confirm against the dataset.
# Repartition so the work spreads across every core in the cluster.
ratings_raw_DF = (
    sqlCtx.read
    .format("com.databricks.spark.csv")
    .options(header="false")
    .load("s3n://insight-spark-after-dark/ratings.csv.gz")
    .repartition(CORES_PER_NODE * NUM_WORKERS * REP_FACTOR)
)

In [77]:
# Expose the raw (untyped) ratings to Spark SQL under the name "ratings_tbl".
# Any SQL cell that reads the raw ratings must use exactly this name.
ratings_raw_DF.registerTempTable("ratings_tbl")

In [78]:
# Sanity check: total number of raw rating rows loaded.
ratings_raw_DF.count()

17359346

In [79]:
# Peek at a few raw rows; every column is still a string at this point.
ratings_raw_DF.head(5)

[Row(C0=u'1', C1=u'720', C2=u'6'),
 Row(C0=u'1', C1=u'13895', C2=u'5'),
 Row(C0=u'1', C1=u'22691', C2=u'1'),
 Row(C0=u'1', C1=u'37184', C2=u'9'),
 Row(C0=u'1', C1=u'49038', C2=u'9')]

In [80]:
# Cast the raw string columns (C0, C1, C2) to ints, giving the typed schema
# (from_user_id, to_user_id, rating).
# The FROM clause must name the temp table registered for ratings_raw_DF
# ("ratings_tbl"). Querying any other name fails on a fresh kernel.
# NOTE(review): CAST presumably yields NULL for non-integer text -- confirm the
# source file contains no header or malformed rows.
ratings_DF = sqlCtx.sql("""
SELECT
    CAST(C0 as int) AS from_user_id,
    CAST(C1 as int) AS to_user_id,
    CAST(C2 as int) AS rating
FROM
    ratings_tbl
""")

In [81]:
# Turn each typed Row into an mllib Rating(user, product, rating). The
# from-user is the "user" and the rated to-user is the "product".
ratings_RDD = ratings_DF.rdd.map(
    lambda row: Rating(user=row.from_user_id,
                       product=row.to_user_id,
                       rating=row.rating)
)

ratings_RDD.take(5)

[Rating(user=1, product=12638, rating=4.0),
 Rating(user=1, product=22319, rating=10.0),
 Rating(user=1, product=35707, rating=1.0),
 Rating(user=1, product=48470, rating=8.0),
 Rating(user=1, product=58839, rating=7.0)]

In [82]:
# Separate ratings into training (80%) and held-out test (20%) sets.
# A fixed seed makes the split, and therefore the MAE reported at the end,
# reproducible on Restart & Run All. Without it, every run scores a different
# test set.
split_ratings_RDD = ratings_RDD.randomSplit([0.8, 0.2], seed=42)
training_ratings_RDD, test_ratings_RDD = split_ratings_RDD

In [83]:
# Train a matrix-factorisation model with ALS on the training split.
# Hyperparameters are passed by keyword so each value's role is explicit. These
# are the same values the positional form bound: rank=1, iterations=5,
# lambda_=0.01, blocks=10.
# NOTE(review): rank=1 is an unusually small latent dimension -- confirm intended.
# A fixed seed makes the random factor initialisation reproducible.
model = ALS.train(training_ratings_RDD,
                  rank=1,
                  iterations=5,
                  lambda_=0.01,
                  blocks=10,
                  seed=42)

In [84]:
# Drop the known rating and keep only the (from, to) pair. These are the pairs
# the model will be asked to score.
test_from_to_RDD = test_ratings_RDD.map(lambda rating: (rating.user, rating.product))

In [85]:
# Score every held-out (from, to) pair with the trained model. Each result is a
# Rating whose `rating` field holds the model's prediction.
actual_predictions_RDD = model.predictAll(test_from_to_RDD)

actual_predictions_RDD.take(num=5)

[Rating(user=61292, product=108150, rating=10.277404629792272),
 Rating(user=33408, product=108150, rating=6.406982963487508),
 Rating(user=107380, product=108150, rating=10.062431847175276),
 Rating(user=129733, product=108150, rating=-4.845537426028841),
 Rating(user=85632, product=28730, rating=10.242428392288048)]

In [86]:
# Key both sides by (from, to) so they can be joined:
#   test_predictions_RDD   -> ((from, to), known rating from the held-out test set)
#   actual_predictions_RDD -> ((from, to), rating predicted by the model)
# The keyed predictions are derived from model.predictAll(...) rather than from
# the current value of actual_predictions_RDD. This keeps the cell idempotent:
# re-running it must not re-key already-keyed 2-tuples, where r[2] would raise
# IndexError.
actual_predictions_RDD = model.predictAll(test_from_to_RDD).map(lambda r: ((r[0], r[1]), r[2]))
test_predictions_RDD = test_ratings_RDD.map(lambda r: ((r[0], r[1]), r[2]))

In [87]:
# Inner-join on the (from, to) key. Each element becomes
# ((from, to), (known_rating, predicted_rating)). Keys present on only one side
# are dropped.
test_to_actual_ratings_RDD = test_predictions_RDD.join(actual_predictions_RDD)

test_to_actual_ratings_RDD.take(num=10)

[((8964, 187978), (1, 0.9634531397584283)),
 ((126106, 204378), (10, 0.041438381336472485)),
 ((125464, 192666), (1, 2.2574677964252032)),
 ((60642, 183146), (5, 4.823080857943978)),
 ((17237, 127791), (8, 8.4853084650872)),
 ((27972, 16204), (1, 3.2294395784073515)),
 ((105140, 175428), (1, 1.433956705499071)),
 ((63229, 149421), (5, 4.881046296534805)),
 ((68551, 165937), (3, 4.237967778511717)),
 ((59257, 117047), (1, 4.833109587359445))]

In [88]:
# Evaluate the model with Mean Absolute Error (MAE) between known test ratings
# and model predictions.
# NOTE(review): only pairs that survived the inner join contribute to the
# average, i.e. pairs predictAll returned a prediction for.
def _absolute_rating_error(keyed_pair):
    """Return |known - predicted| for a ((from, to), (known, predicted)) element."""
    known_rating, predicted_rating = keyed_pair[1]
    return abs(known_rating - predicted_rating)

mean_absolute_rating_error = test_to_actual_ratings_RDD.map(_absolute_rating_error).mean()

print(mean_absolute_rating_error)

2.12934055857
