In [1]:
import os
import sys

# 1. Mengeset variabel yang menyimpan lokasi di mana Spark diinstal
spark_path = "D:/spark"

# 2. Menentukan environment variable SPARK_HOME
os.environ['SPARK_HOME'] = spark_path

# 3. Simpan lokasi winutils.exe sebagai environment variable HADOOP_HOME
os.environ['HADOOP_HOME'] = spark_path

# 4. Lokasi Python yang dijalankan --> punya Anaconda
#    Apabila Python yang diinstall hanya Anaconda, maka tidak perlu menjalankan baris ini.
os.environ['PYSPARK_PYTHON'] = sys.executable

# 5. Konfigurasi path library PySpark
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

# 6. Import library Spark
#    Dua library yang WAJIB di-import adalah **SparkContext** dan **SparkConf**.
from pyspark import SparkContext
from pyspark import SparkConf

# Setting konfigurasi (opsional)
conf = SparkConf()
conf.set("spark.executor.memory", "2g")
conf.set("spark.cores.max", "4")

sc = SparkContext("local", conf=conf)
#    Apabila berhasil, maka ketika sc di-print akan mengeluarkan nilai <pyspark.context.SparkContext object>
print sc

<pyspark.context.SparkContext object at 0x000000000215E8D0>


In [2]:
#small dataset

In [4]:
path_small_data_ratings = "D:/spark/tugas6/ml-latest-small/ratings.csv"

In [5]:
small_data_ratings_raw = sc.textFile(path_small_data_ratings)
small_data_ratings_raw_header = small_data_ratings_raw.take(1)[0]

In [6]:
small_data_ratings = small_data_ratings_raw.filter(lambda line: line!=small_data_ratings_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [7]:
small_data_ratings_raw_header

u'userId,movieId,rating,timestamp'

In [8]:
small_data_ratings_raw

D:/spark/tugas6/ml-latest-small/ratings.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
small_data_ratings.take(10)

[(u'1', u'31', u'2.5'),
 (u'1', u'1029', u'3.0'),
 (u'1', u'1061', u'3.0'),
 (u'1', u'1129', u'2.0'),
 (u'1', u'1172', u'4.0'),
 (u'1', u'1263', u'2.0'),
 (u'1', u'1287', u'2.0'),
 (u'1', u'1293', u'2.0'),
 (u'1', u'1339', u'3.5'),
 (u'1', u'1343', u'2.0')]

In [10]:
small_data_ratings.count()

100004

In [11]:
path_small_data_movies = "D:/spark/tugas6/ml-latest-small/movies.csv"

In [12]:
small_data_movies_raw = sc.textFile(path_small_data_movies)
small_data_movies_raw_header = small_data_movies_raw.take(1)[0]

In [13]:
small_data_movies = small_data_movies_raw.filter(lambda line: line!=small_data_movies_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [14]:
small_data_movies_raw_header

u'movieId,title,genres'

In [15]:
small_data_movies.take(10)

[(u'1', u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy'),
 (u'2', u'Jumanji (1995)', u'Adventure|Children|Fantasy'),
 (u'3', u'Grumpier Old Men (1995)', u'Comedy|Romance'),
 (u'4', u'Waiting to Exhale (1995)', u'Comedy|Drama|Romance'),
 (u'5', u'Father of the Bride Part II (1995)', u'Comedy'),
 (u'6', u'Heat (1995)', u'Action|Crime|Thriller'),
 (u'7', u'Sabrina (1995)', u'Comedy|Romance'),
 (u'8', u'Tom and Huck (1995)', u'Adventure|Children'),
 (u'9', u'Sudden Death (1995)', u'Action'),
 (u'10', u'GoldenEye (1995)', u'Action|Adventure|Thriller')]

In [16]:
small_data_movies.count()

9125

In [17]:
training_RDD, validation_RDD, test_RDD = small_data_ratings.randomSplit([6, 2, 2], seed=0L)

validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [18]:
numRatings = small_data_ratings.distinct().count()
print numRatings

100004


In [19]:
numMovies = small_data_movies.distinct().count()
print numMovies

9125


In [20]:
from pyspark.mllib.recommendation import ALS
import math

In [21]:
seed = 5L
iterations = 5
lambda_ = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02
min_error = float('inf')
best_rank = -1
best_iteration = -1

tmp_rank=0

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=lambda_)
    
    #validation_for_predict_RDD
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    
    print 'For rank %s the RMSE is %s' % (rank, error)
    
    if error < min_error:
        min_error = error
        best_rank = rank
        
    tmp_rank += 1

print 'The best model was trained with rank %s' % best_rank
print 'Total rank is %s' % tmp_rank

For rank 4 the RMSE is 0.96150916576
For rank 8 the RMSE is 0.959599041459
For rank 12 the RMSE is 0.957360347451
The best model was trained with rank 12
Total rank is 3


In [22]:
predictions.take(10)

[((624, 69069), 2.3666732230837337),
 ((628, 5618), 3.2874442736428686),
 ((81, 5618), 3.7382701151292754),
 ((243, 5618), 4.074323695835438),
 ((430, 5618), 4.428618002537369),
 ((362, 5618), 3.990383362546674),
 ((433, 5618), 4.210760990457246),
 ((297, 5618), 4.046031139647107),
 ((472, 5618), 3.757293429682568),
 ((519, 5618), 3.8912150055491455)]

In [23]:
predictions.count()

18960

In [24]:
rates_and_preds.take(10)

[((63, 2987), (4.5, 3.051393838552299)),
 ((353, 4387), (2.5, 2.7024183231163934)),
 ((472, 2014), (3.0, 2.987208870448867)),
 ((590, 196), (3.0, 2.692934504563828)),
 ((102, 1278), (4.0, 4.250010202147024)),
 ((294, 7444), (4.0, 3.1951936414256457)),
 ((235, 1307), (4.0, 4.3792490616046775)),
 ((48, 27156), (4.0, 3.9230501374488265)),
 ((596, 2596), (4.0, 4.153753127865525)),
 ((153, 3825), (3.0, 2.7480362340396454))]

In [25]:
rates_and_preds.count()

18960

In [27]:
print 'Best Rank is %s' % best_rank

model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)

predictions_best_rank = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds_best_rank = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error_best_rank = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error_best_rank)

Best Rank is 12
For testing data the RMSE is 0.957360347451


In [28]:
#complete dataset

In [29]:
path_all_data_ratings = "D:/spark/tugas6/ml-latest-all/ml-latest-all/ratings.csv"

In [30]:
all_data_ratings_raw = sc.textFile(path_all_data_ratings)
all_data_ratings_raw_header = all_data_ratings_raw.take(1)[0]

In [31]:
all_data_ratings = all_data_ratings_raw.filter(lambda line: line!=all_data_ratings_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

In [34]:
training_RDD, test_RDD = all_data_ratings.randomSplit([7, 3], seed=0L)

all_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)

In [35]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions_all = all_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds_all = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions_all)
error_all = math.sqrt(rates_and_preds_all.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error_all)

For testing data the RMSE is 0.822510589854


In [36]:
path_all_data_movies = "D:/spark/tugas6/ml-latest-all/ml-latest-all/movies.csv"

In [37]:
all_data_movies_raw = sc.textFile(path_all_data_movies)
all_data_movies_raw_header = all_data_movies_raw.take(1)[0]

In [38]:
all_movies_data = all_data_movies_raw.filter(lambda line: line!=all_data_movies_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

In [39]:
complete_movies_titles = all_movies_data.map(lambda x: (int(x[0]),x[1]))

In [52]:
complete_movies_titles.count()

40110

In [40]:
all_movies_data.count()

40110

In [53]:
def counts_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (all_data_ratings.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(counts_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [54]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [(0,260,4), (0,1,3), (0,16,3), (0,25,4), (0,32,4), (0,335,1), (0,379,1), (0,296,3), (0,858,5), (0,50,4)]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [55]:
complete_data_with_new_ratings_RDD = small_data_ratings.union(new_user_ratings_RDD)

In [56]:
#new trainning ALS
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 3.817 seconds


In [57]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)

new_user_unrated_movies_RDD = (all_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [59]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)

new_user_recommendations_rating_title_and_count_RDD.take(10)

[(122892, ((2.595476967155233, u'Avengers: Age of Ultron (2015)'), 4342)),
 (22, ((2.301671824366193, u'Copycat (1995)'), 10935)),
 (8228, ((4.0195123637449175, u'"Maltese Falcon'), 3015)),
 (44, ((2.7412436458672476, u'Mortal Kombat (1995)'), 12519)),
 (66,
  ((1.4265093712753907, u'Lawnmower Man 2: Beyond Cyberspace (1996)'), 2557)),
 (88, ((2.4743486278488405, u'Black Sheep (1996)'), 5725)),
 (110, ((3.2933233553848513, u'Braveheart (1995)'), 63920)),
 (73854, ((2.8523662738829483, u'"Rudolph'), 555)),
 (49280, ((2.411532489664756, u'Bobby (2006)'), 495)),
 (114818, ((1.6096496427137414, u'Stretch (2014)'), 112))]

In [60]:
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [87]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=10).takeOrdered(10, key=lambda x: -x[1])

dom = map(str,[str(result_x) for result_x in top_movies])
# print dom

print ('TOP TEN (10) recommended movies:\n')
p=0
no=1
for i in range(0, len(top_movies)):
    print no, '%s\n' % dom[p]
    
    no+=1
    p+=1

TOP TEN (10) recommended movies:

1 (u'"Play House', 5.290253158302269, 32)

2 (u'"Goat', 5.290253158302269, 27)

3 (u'Land of Silence and Darkness (Land des Schweigens und der Dunkelheit) (1971)', 5.290253158302269, 37)

4 (u'Cops (1922)', 5.290253158302269, 41)

5 (u'Gates of Heaven (1978)', 4.761227928658874, 182)

6 (u'"Cameraman', 4.761227928658874, 187)

7 (u"Zorn's Lemma (1970)", 4.761227928658874, 12)

8 (u'"Last Laugh', 4.761227928658874, 283)

9 (u'Our Hospitality (1923)', 4.761227928658874, 229)

10 (u'"Navigator', 4.761227928658874, 225)



In [89]:
new_user_recommendations_rating_title_and_count_RDD.count()

9056