In [None]:
from pyspark import SparkContext
from pyspark import SparkConf
from operator import add
from pyspark import SparkContext,SQLContext
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS

In [8]:
conf = SparkConf().setAppName("book_recommendation-server")
sc = SparkContext(conf=conf)

In [18]:
books_raw_RDD = sc.textFile('main/BX-CSV-Dump/BX-Books.csv')

In [21]:
type(books_raw_RDD)

pyspark.rdd.RDD

In [229]:
books_raw_data_header

'"ISBN";"Book-Title";"Book-Author";"Year-Of-Publication";"Publisher";"Image-URL-S";"Image-URL-M";"Image-URL-L"'

In [230]:
books_RDD = books_raw_RDD.filter(lambda line: line != ratings_raw_data_header)\
                            .map(lambda line: line.split(';'))\
                            .map(lambda tokens: (abs(hash(tokens[0][1:-1])) % (10 ** 8), tokens[1][1:-1], tokens[2][1:-1], tokens[3][1:-1], tokens[4][1:-1], tokens[5][1:-1]))\
                            .cache()

In [231]:
books_RDD.take(1)

[(58397390,
  'Book-Title',
  'Book-Author',
  'Year-Of-Publication',
  'Publisher',
  'Image-URL-S')]

In [232]:
books_titles_RDD = books_RDD.map(lambda x : (int(x[0]), x[1], x[2], x[3], x[4], x[5])).cache()

In [233]:
books_titles_RDD.take(1)

[(58397390,
  'Book-Title',
  'Book-Author',
  'Year-Of-Publication',
  'Publisher',
  'Image-URL-S')]

In [234]:
ratings_raw_RDD = sc.textFile('main/BX-CSV-Dump/BX-Book-Ratings.csv')

In [235]:
ratings_raw_data_header = ratings_raw_RDD.take(1)[0]

In [236]:
ratings_raw_data_header

'"User-ID";"ISBN";"Book-Rating"'

In [237]:
ratings_RDD = ratings_raw_RDD.filter(lambda line : line != ratings_raw_data_header) \
                                .map(lambda line : line.split(';'))\
                                .map(lambda tokens: (int(tokens[0][1:-1]), hash(tokens[1][1:-1]) % 10 ** 8, int(tokens[2][1:-1])))

In [238]:
ratings_RDD.take(10)

[(276725, 68701106, 0),
 (276726, 46426205, 5),
 (276727, 90758053, 0),
 (276729, 44987036, 3),
 (276729, 78989836, 6),
 (276733, 30122732, 0),
 (276736, 64479825, 8),
 (276737, 43822043, 6),
 (276744, 83371955, 7),
 (276745, 78660650, 10)]

In [239]:
book_ID_with_ratings_RDD = ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()

In [240]:
for x in book_ID_with_ratings_RDD.take(2)[0][1]:
    print(x)

0
5
0
5
9
0
0
0
0
9
0
0
0
8
8
0
0
5
9
6
5
0
7
5
7
0
0
0
0
0
0
0
0
0
0
0
5
0
0
4
0
0
6
9
10
3
0
0
0
3
3
9
8
6
5
3
7
0
0
7


In [241]:
def get_counts_and_average(ID_and_ratings_tuple):
    n = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (n, float(sum(x for x in ID_and_ratings_tuple[1]))/ n)

In [242]:
book_ID_with_avg_ratings_RDD = book_ID_with_ratings_RDD.map(get_counts_and_average)

In [243]:
book_ID_with_avg_ratings_RDD.take(10)

[(68701106, (60, 2.933333333333333)),
 (44987036, (1, 3.0)),
 (78989836, (1, 6.0)),
 (30122732, (3, 3.6666666666666665)),
 (78660650, (2, 5.0)),
 (82154322, (134, 2.529850746268657)),
 (65782350, (137, 2.153284671532847)),
 (98900142, (7, 3.2857142857142856)),
 (80851704, (66, 3.6363636363636362)),
 (88265094, (22, 1.8181818181818181))]

In [244]:
book_ratings_count_RDD = book_ID_with_avg_ratings_RDD.map(lambda x : (x[0], x[1][0]))

In [245]:
book_ratings_count_RDD.take(2)

[(68701106, 60), (44987036, 1)]

In [246]:
ratings_RDD.take(2)

[(276725, 68701106, 0), (276726, 46426205, 5)]

In [247]:
rank = 16
seed = 5
iterations = 10
regularization_parameter = 0.1

In [258]:
model = ALS.train(ratings_RDD, rank=rank, seed=seed, iterations=iterations, lambda_=regularization_parameter )

In [249]:
model.take(1)

AttributeError: 'MatrixFactorizationModel' object has no attribute 'take'

In [272]:
user_unrated_books_RDD = ratings_RDD.filter(lambda x : x[0] != 276725).map(lambda x : (276725, x[1])).distinct()

In [17]:
user_unrated_books_RDD.take(9)

NameError: name 'user_unrated_books_RDD' is not defined

In [274]:
predicted_RDD = model.predictAll(user_unrated_books_RDD)

In [275]:
len(predicted_RDD.take(2))

2

In [276]:
predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))

In [284]:
predicted_rating_RDD.takeOrdered(5, key=lambda x: -x[1])

[(65259168, 0.0),
 (81442256, 0.0),
 (15026744, 0.0),
 (52640784, 0.0),
 (475832, 0.0)]

In [10]:
sqlContext=SQLContext(sc)

In [11]:
mydf001=sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/books_recommend")\
  .option("dbtable","main_book").option("user","root").option("password","password").load()

In [23]:
mydf001.show()

+---+--------------------+--------------------+--------------+--------------------+--------------------+----------+
| id|               title|              author|published_year|           publisher|           image_url|      isbn|
+---+--------------------+--------------------+--------------+--------------------+--------------------+----------+
|  1| Classical Mythology|  Mark P. O. Morford|          2002|Oxford University...|http://images.ama...|0195153448|
|  2|        Clara Callan|Richard Bruce Wright|          2001|HarperFlamingo Ca...|http://images.ama...|0002005018|
|  3|Decision in Normandy|        Carlo D'Este|          1991|     HarperPerennial|http://images.ama...|0060973129|
|  4|Flu: The Story of...|    Gina Bari Kolata|          1999|Farrar Straus Giroux|http://images.ama...|0374157065|
|  5|The Mummies of Ur...|     E. J. W. Barber|          1999|W. W. Norton &amp...|http://images.ama...|0393045218|
|  6|The Kitchen God's...|             Amy Tan|          1991|    Putnam

In [24]:
type(mydf001)

pyspark.sql.dataframe.DataFrame

In [25]:
sqlContext

<pyspark.sql.context.SQLContext at 0x19b9fb279e8>

In [31]:
rd = mydf001.rdd

In [43]:
(rd.take(10)[0])

Row(id=1, title='Classical Mythology', author='Mark P. O. Morford', published_year='2002', publisher='Oxford University Press', image_url='http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg', isbn='0195153448')

In [53]:
rd = mydf001.rdd.map(lambda x: (x.id, x.title, x.published_year, x.isbn))

In [72]:
rd.count()

271379

In [54]:
rd

PythonRDD[30] at RDD at PythonRDD.scala:52

In [73]:
rd.take(5)

[(1, 'Classical Mythology', '2002', '0195153448'),
 (2, 'Clara Callan', '2001', '0002005018'),
 (3, 'Decision in Normandy', '1991', '0060973129'),
 (4,
  'Flu: The Story of the Great Influenza Pandemic of 1918 and the Search for the Virus That Caused It',
  '1999',
  '0374157065'),
 (5, 'The Mummies of Urumchi', '1999', '0393045218')]