In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
from operator import add
from pyspark import SparkContext,SQLContext
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS

In [3]:
conf = SparkConf().setAppName("book_recommendation-server")
sc = SparkContext(conf=conf)

In [11]:
books_raw_RDD = sc.textFile('main/BX-CSV-Dump/BX-Books.csv')

In [16]:
books_raw_RDD.take(3)

['"ISBN";"Book-Title";"Book-Author";"Year-Of-Publication";"Publisher";"Image-URL-S";"Image-URL-M";"Image-URL-L"',
 '"0195153448";"Classical Mythology";"Mark P. O. Morford";"2002";"Oxford University Press";"http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg";"http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg";"http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg"',
 '"0002005018";"Clara Callan";"Richard Bruce Wright";"2001";"HarperFlamingo Canada";"http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg";"http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg";"http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg"']

In [17]:
books_raw_data_header = books_raw_RDD.take(1)

In [28]:
books_RDD = books_raw_RDD.filter(lambda line: line != books_raw_data_header)\
                            .map(lambda line: line.split(';'))\
                            .map(lambda tokens: (abs(hash(tokens[0][1:-1])) % (10 ** 8), tokens[1][1:-1], tokens[2][1:-1], tokens[3][1:-1], tokens[4][1:-1], tokens[5][1:-1]))\
                            .cache()

In [29]:
books_RDD.take(1)

[(58397390,
  'Book-Title',
  'Book-Author',
  'Year-Of-Publication',
  'Publisher',
  'Image-URL-S')]

In [30]:
books_titles_RDD = books_RDD.map(lambda x : (int(x[0]), x[1], x[2], x[3], x[4], x[5])).cache()

In [31]:
books_titles_RDD.take(1)

[(58397390,
  'Book-Title',
  'Book-Author',
  'Year-Of-Publication',
  'Publisher',
  'Image-URL-S')]

In [32]:
ratings_raw_RDD = sc.textFile('main/BX-CSV-Dump/BX-Book-Ratings.csv')

In [33]:
ratings_raw_data_header = ratings_raw_RDD.take(1)[0]

In [34]:
ratings_raw_data_header

'"User-ID";"ISBN";"Book-Rating"'

In [35]:
ratings_RDD = ratings_raw_RDD.filter(lambda line : line != ratings_raw_data_header) \
                                .map(lambda line : line.split(';'))\
                                .map(lambda tokens: (int(tokens[0][1:-1]), hash(tokens[1][1:-1]) % 10 ** 8, int(tokens[2][1:-1])))

In [36]:
ratings_RDD.take(10)

[(276725, 68701106, 0),
 (276726, 46426205, 5),
 (276727, 90758053, 0),
 (276729, 44987036, 3),
 (276729, 78989836, 6),
 (276733, 30122732, 0),
 (276736, 64479825, 8),
 (276737, 43822043, 6),
 (276744, 83371955, 7),
 (276745, 78660650, 10)]

In [37]:
book_ID_with_ratings_RDD = ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()

In [39]:
def get_counts_and_average(ID_and_ratings_tuple):
    n = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (n, float(sum(x for x in ID_and_ratings_tuple[1]))/ n)

In [40]:
book_ID_with_avg_ratings_RDD = book_ID_with_ratings_RDD.map(get_counts_and_average)

In [41]:
book_ID_with_avg_ratings_RDD.take(10)

[(68701106, (60, 2.933333333333333)),
 (44987036, (1, 3.0)),
 (78989836, (1, 6.0)),
 (30122732, (3, 3.6666666666666665)),
 (78660650, (2, 5.0)),
 (82154322, (134, 2.529850746268657)),
 (65782350, (137, 2.153284671532847)),
 (98900142, (7, 3.2857142857142856)),
 (80851704, (66, 3.6363636363636362)),
 (88265094, (22, 1.8181818181818181))]

In [42]:
book_ratings_count_RDD = book_ID_with_avg_ratings_RDD.map(lambda x : (x[0], x[1][0]))

In [43]:
book_ratings_count_RDD.take(2)

[(68701106, 60), (44987036, 1)]

In [44]:
ratings_RDD.take(2)

[(276725, 68701106, 0), (276726, 46426205, 5)]

In [45]:
rank = 16
seed = 5
iterations = 10
regularization_parameter = 0.1

In [46]:
model = ALS.train(ratings_RDD, rank=rank, seed=seed, iterations=iterations, lambda_=regularization_parameter )

In [47]:
user_unrated_books_RDD = ratings_RDD.filter(lambda x : x[0] != 276725).map(lambda x : (276725, x[1])).distinct()

In [48]:
user_unrated_books_RDD.take(9)

[(276725, 46426205),
 (276725, 90758053),
 (276725, 64479825),
 (276725, 43822043),
 (276725, 83371955),
 (276725, 52414159),
 (276725, 82551089),
 (276725, 53423433),
 (276725, 64841065)]

In [49]:
predicted_RDD = model.predictAll(user_unrated_books_RDD)

In [50]:
len(predicted_RDD.take(2))

2

In [51]:
predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))

In [52]:
predicted_rating_RDD.takeOrdered(5, key=lambda x: -x[1])

[(65259168, 0.0),
 (81442256, 0.0),
 (15026744, 0.0),
 (52640784, 0.0),
 (475832, 0.0)]

In [53]:
sqlContext=SQLContext(sc)

In [65]:
ratings_df=sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/books_recommend")\
  .option("dbtable","main_rating").option("user","root").option("password","password").load()

In [66]:
ratings_df.show()

+---+-------+----------+------+
| id|user_id|      isbn|rating|
+---+-------+----------+------+
|  1| 276725|034545104X|     0|
|  2| 276726|0155061224|     5|
|  3| 276727|0446520802|     0|
|  4| 276729|052165615X|     3|
|  5| 276729|0521795028|     6|
|  6| 276733|2080674722|     0|
|  7| 276736|3257224281|     8|
|  8| 276737|0600570967|     6|
|  9| 276744|038550120X|     7|
| 10| 276745| 342310538|    10|
| 11| 276746|0425115801|     0|
| 12| 276746|0449006522|     0|
| 13| 276746|0553561618|     0|
| 14| 276746|055356451X|     0|
| 15| 276746|0786013990|     0|
| 16| 276746|0786014512|     0|
| 17| 276747|0060517794|     9|
| 18| 276747|0451192001|     0|
| 19| 276747|0609801279|     0|
| 20| 276747|0671537458|     9|
+---+-------+----------+------+
only showing top 20 rows



In [67]:
ratings_raw_rdd = ratings_df.rdd

In [81]:
ratings_rdd  = ratings_raw_rdd.map(lambda x: (x.user_id, hash(x.isbn) % 10 ** 8, x.rating))

In [82]:
ratings_rdd.count()

2299560

In [97]:
model = ALS.train(ratings_rdd, rank=rank, seed=seed, iterations=iterations, lambda_=regularization_parameter )

In [96]:
model.recommendProducts(300,10)

[Rating(user=300, product=89045893, rating=11.57677750938257),
 Rating(user=300, product=70170468, rating=10.574515524477663),
 Rating(user=300, product=56959835, rating=10.50381284428478),
 Rating(user=300, product=74798910, rating=10.45654691773706),
 Rating(user=300, product=92609290, rating=10.187099639969338),
 Rating(user=300, product=51433389, rating=10.187099639969338),
 Rating(user=300, product=90393308, rating=10.187099639969338),
 Rating(user=300, product=61144460, rating=10.187099639969338),
 Rating(user=300, product=47539547, rating=10.187099639969338),
 Rating(user=300, product=41840529, rating=10.187099639969338)]