In [1]:
import os
import sys

# 1. Mengeset variabel yang menyimpan lokasi di mana Spark diinstal
spark_path = "H:/spark"

# 2. Menentukan environment variable SPARK_HOME
os.environ['SPARK_HOME'] = spark_path

# 3. Simpan lokasi winutils.exe sebagai environment variable HADOOP_HOME
os.environ['HADOOP_HOME'] = spark_path

# 4. Lokasi Python yang dijalankan --> punya Anaconda
#    Apabila Python yang diinstall hanya Anaconda, maka tidak perlu menjalankan baris ini.
os.environ['PYSPARK_PYTHON'] = sys.executable

# 5. Konfigurasi path library PySpark
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

# 6. Import library Spark
#    Dua library yang WAJIB di-import adalah **SparkContext** dan **SparkConf**.
from pyspark import SparkContext
from pyspark import SparkConf

# Setting konfigurasi (opsional)
conf = SparkConf()
conf.set("spark.executor.memory", "2g")
conf.set("spark.cores.max", "4")

sc = SparkContext("local", conf=conf)
#    Apabila berhasil, maka ketika sc di-print akan mengeluarkan nilai <pyspark.context.SparkContext object>
print (sc)

<pyspark.context.SparkContext object at 0x000001ED0C38A828>


In [2]:
path_small_data_ratings = "H:/spark/tugas-6-bigdata/ml-latest-small/ratings.csv"

In [4]:
small_data_ratings_raw = sc.textFile(path_small_data_ratings)
small_data_ratings_raw_header = small_data_ratings_raw.take(1)[0]

In [5]:
small_data_ratings = small_data_ratings_raw.filter(lambda line: line!=small_data_ratings_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [6]:
small_data_ratings_raw_header

'userId,movieId,rating,timestamp'

In [7]:
small_data_ratings_raw

H:/spark/tugas-6-bigdata/ml-latest-small/ratings.csv MapPartitionsRDD[4] at textFile at <unknown>:0

In [8]:
small_data_ratings.take(10)

[('1', '31', '2.5'),
 ('1', '1029', '3.0'),
 ('1', '1061', '3.0'),
 ('1', '1129', '2.0'),
 ('1', '1172', '4.0'),
 ('1', '1263', '2.0'),
 ('1', '1287', '2.0'),
 ('1', '1293', '2.0'),
 ('1', '1339', '3.5'),
 ('1', '1343', '2.0')]

In [9]:
small_data_ratings.count()

100004

In [10]:
path_small_data_movies = "H:/spark/tugas-6-bigdata/ml-latest-small/movies.csv"

In [11]:
small_data_movies_raw = sc.textFile(path_small_data_movies)
small_data_movies_raw_header = small_data_movies_raw.take(1)[0]

In [12]:
small_data_movies = small_data_movies_raw.filter(lambda line: line!=small_data_movies_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [13]:
small_data_movies_raw_header

'movieId,title,genres'

In [14]:
small_data_movies.take(15)

[('1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 ('2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 ('3', 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 ('4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'),
 ('5', 'Father of the Bride Part II (1995)', 'Comedy'),
 ('6', 'Heat (1995)', 'Action|Crime|Thriller'),
 ('7', 'Sabrina (1995)', 'Comedy|Romance'),
 ('8', 'Tom and Huck (1995)', 'Adventure|Children'),
 ('9', 'Sudden Death (1995)', 'Action'),
 ('10', 'GoldenEye (1995)', 'Action|Adventure|Thriller'),
 ('11', '"American President', ' The (1995)"'),
 ('12', 'Dracula: Dead and Loving It (1995)', 'Comedy|Horror'),
 ('13', 'Balto (1995)', 'Adventure|Animation|Children'),
 ('14', 'Nixon (1995)', 'Drama'),
 ('15', 'Cutthroat Island (1995)', 'Action|Adventure|Romance')]

In [15]:
small_data_movies.count()

9125

In [16]:
training_RDD, validation_RDD, test_RDD = small_data_ratings.randomSplit([6, 2, 2], seed=0)

validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [17]:
numRatings = small_data_ratings.distinct().count()
print (numRatings)

100004


In [18]:
numMovies = small_data_movies.distinct().count()
print (numMovies)

9125


In [19]:
from pyspark.mllib.recommendation import ALS
import math

In [20]:
seed = 5
iterations = 5
lambda_ = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02
min_error = float('inf')
best_rank = -1
best_iteration = -1

tmp_rank=0

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=lambda_)
    
    #validation_for_predict_RDD
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    
    print ("For rank %s the RMSE is %s" % (rank, error))
    
    if error < min_error:
        min_error = error
        best_rank = rank
        
    tmp_rank += 1

print ("The best model was trained with rank %s" % best_rank)
print ("Total rank is %s" % tmp_rank)

For rank 4 the RMSE is 0.9467276356834381
For rank 8 the RMSE is 0.948850568384866
For rank 12 the RMSE is 0.9470522227861955
The best model was trained with rank 4
Total rank is 3


In [21]:
predictions.take(10)

[((390, 667), 3.6298103088435294),
 ((48, 44828), 0.35637136257127566),
 ((428, 5618), 4.430292353094585),
 ((450, 5618), 4.567901218327973),
 ((429, 5618), 1.2551725562710176),
 ((664, 5618), 4.112759026391471),
 ((397, 5618), 4.1376866926941736),
 ((430, 5618), 4.844480482206358),
 ((73, 5618), 4.042735515184425),
 ((406, 5618), 3.865122428612649)]

In [22]:
rates_and_preds.take(10)

[((1, 1061), (3.0, 2.5641919094461265)),
 ((1, 1129), (2.0, 2.44966942915915)),
 ((1, 1371), (2.5, 1.9937138050408498)),
 ((1, 1953), (4.0, 3.382927786980675)),
 ((2, 52), (3.0, 3.465618061823249)),
 ((2, 144), (3.0, 3.307158969229371)),
 ((2, 370), (2.0, 2.8206703133557394)),
 ((2, 382), (3.0, 2.787520973977276)),
 ((2, 550), (3.0, 3.1737969760165683)),
 ((2, 720), (4.0, 3.8117535046759072))]

In [23]:
rates_and_preds.count()

18877

In [24]:
print ("Best Rank is %s" % best_rank)

model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)

predictions_best_rank = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds_best_rank = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error_best_rank = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ("For testing data the RMSE is %s" % (error_best_rank))

Best Rank is 4
For testing data the RMSE is 0.9470522227861955


In [25]:
path_all_data_ratings = "H:/spark/tugas-6-bigdata/ml-latest-all/ratings.csv"

In [26]:
all_data_ratings_raw = sc.textFile(path_all_data_ratings)
all_data_ratings_raw_header = all_data_ratings_raw.take(1)[0]

In [27]:
all_data_ratings = all_data_ratings_raw.filter(lambda line: line!=all_data_ratings_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

In [28]:
training_RDD, test_RDD = all_data_ratings.randomSplit([7, 3], seed=0)

all_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)

In [29]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions_all = all_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds_all = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions_all)
error_all = math.sqrt(rates_and_preds_all.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ("For testing data the RMSE is %s" % (error_all))

For testing data the RMSE is 0.8353251300966041


In [32]:
path_all_data_movies = "H:/spark/tugas-6-bigdata/ml-latest-all/movies.csv"

In [33]:
all_data_movies_raw = sc.textFile(path_all_data_movies)
all_data_movies_raw_header = all_data_movies_raw.take(1)[0]

In [34]:
all_movies_data = all_data_movies_raw.filter(lambda line: line!=all_data_movies_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

In [35]:
complete_movies_titles = all_movies_data.map(lambda x: (int(x[0]),x[1]))

In [36]:
complete_movies_titles.count()

40110

In [37]:
all_movies_data.count()

40110

In [38]:
def counts_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (all_data_ratings.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(counts_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [39]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [(0,260,4), (0,1,3), (0,16,3), (0,25,4), (0,32,4), (0,335,1), (0,379,1), (0,296,3), (0,858,5), (0,50,4)]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ("New user ratings: %s" % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [40]:
complete_data_with_new_ratings_RDD = small_data_ratings.union(new_user_ratings_RDD)

In [41]:
#new trainning ALS
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lambda_)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 7.54 seconds


In [42]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)

new_user_unrated_movies_RDD = (all_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [43]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)

new_user_recommendations_rating_title_and_count_RDD.take(10)

[(138204, ((2.228243072597687, '7 Days in Hell (2015)'), 97)),
 (5874, ((1.605582397858254, 'Friday After Next (2002)'), 444)),
 (48268, ((3.7163477948685513, 'Empire Falls (2005)'), 29)),
 (65230, ((1.469555228369149, 'Marley & Me (2008)'), 1504)),
 (2640, ((2.4898372718269703, 'Superman (1978)'), 16903)),
 (6446, ((2.5502693956504086, '"Comancheros'), 126)),
 (81158, ((2.1625232510538206, 'Restrepo (2010)'), 398)),
 (2134, ((2.262294179484031, 'Weird Science (1985)'), 6604)),
 (2354, ((2.390420884508978, '"Rugrats Movie'), 1479)),
 (1078, ((3.7702241434521655, 'Bananas (1971)'), 3819))]

In [44]:
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [47]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=10).takeOrdered(10, key=lambda x: -x[1])

dom = list(map(str,[str(result_x) for result_x in top_movies]))
# print dom

print ("TOP TEN (10) recommended movies:\n")
p=0
no=1
for i in range(0, len(top_movies)):
    print (no, "%s\n" % dom[p])
    
    no+=1
    p+=1

TOP TEN (10) recommended movies:

1 ('"Vampyros Lesbos (Vampiras', 5.599213491210403, 130)

2 ('Dylan Moran: Monster (2004)', 5.599213491210403, 107)

3 ('"Play House', 5.352672287404715, 32)

4 ('"Goat', 5.352672287404715, 27)

5 ('Land of Silence and Darkness (Land des Schweigens und der Dunkelheit) (1971)', 5.352672287404715, 37)

6 ('Cops (1922)', 5.352672287404715, 41)

7 ("Dead Man's Shoes (2004)", 5.319174228669035, 517)

8 ('Excision (2012)', 5.303345348721313, 98)

9 ('Gates of Heaven (1978)', 4.8174051036647185, 182)

10 ('"Cameraman', 4.8174051036647185, 187)



In [48]:
new_user_recommendations_rating_title_and_count_RDD.count()

9065