In [1]:
import os
import sys

# 1. Lokasi di mana Spark diinstal
spark_path = "E:\spark"

# 2. Menentukan environment variable
os.environ['SPARK_HOME'] = spark_path


# 3. Download winutils dari https://github.com/steveloughran/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true
#   dan letakkan di dalam folder E:\spark\bin\
#   Lokasi winutils.exe
os.environ['HADOOP_HOME'] = spark_path

# 4. Lokasi Python yang dijalankan --> punya Anaconda
#    Apabila Python yang diinstall hanya Anaconda, maka tidak perlu menjalankan baris ini.
os.environ['PYSPARK_PYTHON'] = sys.executable

# 5. Konfigurasi path library PySpark
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf() \
    .setAppName("MovieLensALS") \
    .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

print sc

<pyspark.context.SparkContext object at 0x00000000047C9A90>


In [2]:
#!/usr/bin/env python

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def notHeader(row):
    return "movieId" not in row

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")    
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseRatingCsv(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split(",")  
    print fields
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split(",")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print "File %s does not exist." % ratingsFile
        sys.exit(1)     
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])    
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

In [4]:
myRatings = loadRatings("../personalRatings.txt")
myRatingsRDD = sc.parallelize(myRatings, 1)

In [5]:
myRatings

[(0, 1, 5.0),
 (0, 780, 4.0),
 (0, 590, 5.0),
 (0, 1210, 4.0),
 (0, 648, 5.0),
 (0, 344, 5.0),
 (0, 165, 5.0),
 (0, 153, 5.0),
 (0, 597, 5.0),
 (0, 1580, 5.0),
 (0, 231, 5.0)]

In [6]:
movieLensHomeDir = "./data/"

In [7]:
ratings = sc.textFile(join(movieLensHomeDir, "ratings.csv")).filter(notHeader).map(parseRatingCsv)

In [9]:
movieLensHomeDir = "../"

In [12]:
ratings = sc.textFile(join(movieLensHomeDir, "ratings.csv")).filter(notHeader).map(parseRatingCsv)

In [13]:
ratings.collect()

[(4L, (1, 31, 2.5)),
 (9L, (1, 1029, 3.0)),
 (2L, (1, 1061, 3.0)),
 (5L, (1, 1129, 2.0)),
 (5L, (1, 1172, 4.0)),
 (1L, (1, 1263, 2.0)),
 (7L, (1, 1287, 2.0)),
 (8L, (1, 1293, 2.0)),
 (5L, (1, 1339, 3.5)),
 (1L, (1, 1343, 2.0)),
 (5L, (1, 1371, 2.5)),
 (3L, (1, 1405, 1.0)),
 (1L, (1, 1953, 4.0)),
 (9L, (1, 2105, 4.0)),
 (4L, (1, 2150, 3.0)),
 (8L, (1, 2193, 2.0)),
 (8L, (1, 2294, 2.0)),
 (3L, (1, 2455, 2.5)),
 (0L, (1, 2968, 1.0)),
 (7L, (1, 3671, 3.0)),
 (3L, (2, 10, 4.0)),
 (1L, (2, 17, 5.0)),
 (4L, (2, 39, 5.0)),
 (2L, (2, 47, 4.0)),
 (6L, (2, 50, 4.0)),
 (1L, (2, 52, 3.0)),
 (9L, (2, 62, 3.0)),
 (2L, (2, 110, 4.0)),
 (6L, (2, 144, 3.0)),
 (5L, (2, 150, 5.0)),
 (1L, (2, 153, 4.0)),
 (3L, (2, 161, 3.0)),
 (1L, (2, 165, 3.0)),
 (0L, (2, 168, 3.0)),
 (1L, (2, 185, 3.0)),
 (4L, (2, 186, 3.0)),
 (1L, (2, 208, 3.0)),
 (0L, (2, 222, 5.0)),
 (9L, (2, 223, 1.0)),
 (2L, (2, 225, 3.0)),
 (4L, (2, 235, 3.0)),
 (6L, (2, 248, 3.0)),
 (1L, (2, 253, 4.0)),
 (1L, (2, 261, 4.0)),
 (7L, (2, 265, 5.0)),

In [14]:
movies = dict(sc.textFile(join(movieLensHomeDir, "movies.csv")).filter(notHeader).map(parseMovie).collect())

In [15]:
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

Got 100004 ratings from 671 users on 9066 movies.


In [16]:
# split ratings into train (60%), validation (20%), and test (20%) based on the 
# last digit of the timestamp, add myRatings to train, and cache them

# training, validation, test are all RDDs of (userId, movieId, rating)

numPartitions = 4
training = ratings.filter(lambda x: x[0] < 6) \
  .values() \
  .union(myRatingsRDD) \
  .repartition(numPartitions) \
  .cache()

validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
  .values() \
  .repartition(numPartitions) \
  .cache()

test = ratings.filter(lambda x: x[0] >= 8).values().cache()

numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

Training: 59876, validation: 20285, test: 19854


In [17]:
# train models and evaluate them on the validation set

ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print "RMSE (validation) = %f for the model trained with " % validationRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

testRmse = computeRmse(bestModel, test, numTest)

# evaluate the best model on the test set
print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
  + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

RMSE (validation) = 0.927604 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.926817 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.621969 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.621969 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.923682 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.924697 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.621969 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.621969 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.934485.


In [18]:
# compare the best model with a naive baseline that always returns the mean rating
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print "The best model improves the baseline by %.2f" % (improvement) + "%."

The best model improves the baseline by 11.87%.


In [19]:
# make personalized recommendations

myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')

# clean up
sc.stop()

Movies recommended for you:
 1: Breaking the Waves (1996)
 2: "Adventures of Robin Hood
 3: My Best Friend (Mon meilleur ami) (2006)
 4: Lake of Fire (2006)
 5: Ben X (2007)
 6: Cyclo (Xich lo) (1995)
 7: Rory O'Shea Was Here (Inside I'm Dancing) (2004)
 8: Schindler's List (1993)
 9: Love & Human Remains (1993)
10: Out of Africa (1985)
11: Micmacs (Micmacs  tire-larigot) (2009)
12: "Salton Sea
13: The Jane Austen Book Club (2007)
14: "Grand Day Out with Wallace and Gromit
15: "Killing Fields
16: Amateur (1994)
17: Headhunters (Hodejegerne) (2011)
18: "Wings of Desire (Himmel ber Berlin
19: Law Abiding Citizen (2009)
20: "Shawshank Redemption
21: Coma (1978)
22: "Commitments
23: Rebecca (1940)
24: Persuasion (1995)
25: Salvador (1986)
26: Saw (2004)
27: "Timecrimes (Cronocrmenes
28: "Lion in Winter
29: It's Such a Beautiful Day (2012)
30: "Corporation
31: Captain Phillips (2013)
32: "Amazing Spider-Man
33: "Court Jester
34: Nine Queens (Nueve reinas) (2000)
35: American Beauty (1999)
3