In [28]:
import pyspark
import random
import findspark

findspark.init()
sc = pyspark.SparkContext.getOrCreate()

In [29]:
import pandas as pd

from pyspark.sql import SQLContext, SparkSession
sparkSession = SparkSession.builder.config('spark.sql.pivotMaxValues', 600000).getOrCreate()

In [30]:
books = sparkSession.read.csv('./BX-Books.csv', sep=';', header=True)
users = sparkSession.read.csv('./BX-Users.csv', sep=';', header=True)
books_ratings = sparkSession.read.csv('./BX-Book-Ratings.csv', sep=';', header=True)

In [31]:
books_ratings.show(5)

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
+-------+----------+-----------+
only showing top 5 rows



In [32]:
type(books)

pyspark.sql.dataframe.DataFrame

In [33]:
pd.DataFrame(books.take(5))

Unnamed: 0,0,1,2,3,4,5,6,7
0,195153448,Classical Mythology,Mark P. O. Morford,2002,Oxford University Press,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...
1,2005018,Clara Callan,Richard Bruce Wright,2001,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...
2,60973129,Decision in Normandy,Carlo D'Este,1991,HarperPerennial,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...
3,374157065,Flu: The Story of the Great Influenza Pandemic...,Gina Bari Kolata,1999,Farrar Straus Giroux,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...
4,393045218,The Mummies of Urumchi,E. J. W. Barber,1999,W. W. Norton &amp; Company,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...


In [34]:
top_rated_books = books_ratings.groupBy('ISBN').count().orderBy('count', ascending=False)
top_rated_books.show(5)

+----------+-----+
|      ISBN|count|
+----------+-----+
|0971880107| 2502|
|0316666343| 1295|
|0385504209|  883|
|0060928336|  732|
|0312195516|  723|
+----------+-----+
only showing top 5 rows



In [35]:
pd.DataFrame(books.filter('ISBN in ("0971880107","0316666343", "0385504209", "0060928336", "0312195516")').take(5))

Unnamed: 0,0,1,2,3,4,5,6,7
0,971880107,Wild Animus,Rich Shapero,2004,Too Far,http://images.amazon.com/images/P/0971880107.0...,http://images.amazon.com/images/P/0971880107.0...,http://images.amazon.com/images/P/0971880107.0...
1,316666343,The Lovely Bones: A Novel,Alice Sebold,2002,"Little, Brown",http://images.amazon.com/images/P/0316666343.0...,http://images.amazon.com/images/P/0316666343.0...,http://images.amazon.com/images/P/0316666343.0...
2,312195516,The Red Tent (Bestselling Backlist),Anita Diamant,1998,Picador USA,http://images.amazon.com/images/P/0312195516.0...,http://images.amazon.com/images/P/0312195516.0...,http://images.amazon.com/images/P/0312195516.0...
3,385504209,The Da Vinci Code,Dan Brown,2003,Doubleday,http://images.amazon.com/images/P/0385504209.0...,http://images.amazon.com/images/P/0385504209.0...,http://images.amazon.com/images/P/0385504209.0...
4,60928336,Divine Secrets of the Ya-Ya Sisterhood: A Novel,Rebecca Wells,1997,Perennial,http://images.amazon.com/images/P/0060928336.0...,http://images.amazon.com/images/P/0060928336.0...,http://images.amazon.com/images/P/0060928336.0...


In [36]:
from pyspark.sql import functions as F

average_rating = books_ratings.groupBy('ISBN').agg(F.mean('book-rating'), F.count('book-rating')).orderBy('count(book-rating)', ascending=False).show(5)

+----------+------------------+------------------+
|      ISBN|  avg(book-rating)|count(book-rating)|
+----------+------------------+------------------+
|0971880107|1.0195843325339728|              2502|
|0316666343| 4.468725868725869|              1295|
|0385504209| 4.652321630804077|               883|
|0060928336| 3.448087431693989|               732|
|0312195516| 4.334716459197787|               723|
+----------+------------------+------------------+
only showing top 5 rows



In [37]:
user_activity = books_ratings.groupBy('User-ID').agg(F.count('User-ID').alias('user_activity'))
books_ratings = books_ratings.join(user_activity, 'User-ID', 'left').filter('user_activity >= 200')

books_popularity = books_ratings.groupBy('ISBN').agg(F.count('ISBN').alias('books_activity'))
books_ratings = books_ratings.join(books_popularity, 'ISBN', 'left').filter('books_activity >= 100')

In [38]:
books_ratings.show(5)

+----------+-------+-----------+-------------+--------------+
|      ISBN|User-ID|Book-Rating|user_activity|books_activity|
+----------+-------+-----------+-------------+--------------+
|044021145X| 104665|          7|          453|           204|
|044021145X| 230249|          0|          271|           204|
|044021145X|  32773|          0|          745|           204|
|044021145X| 131046|          0|          941|           204|
|044021145X| 148744|         10|         1550|           204|
+----------+-------+-----------+-------------+--------------+
only showing top 5 rows



In [39]:
books_ratings.count()

13793

In [40]:
from pyspark.sql.types import DoubleType

books_ratings = books_ratings.withColumn('Book-Rating', books_ratings["Book-Rating"].cast(DoubleType()))

In [None]:
books_ratings.cache()

In [41]:
ratings_pivot = books_ratings.groupby('User-ID').pivot('ISBN').max('Book-Rating')

In [42]:
ratings_pivot.select('0316666343').show()

+----------+
|0316666343|
+----------+
|       0.0|
|      null|
|      null|
|      10.0|
|      null|
|       8.0|
|      null|
|      null|
|      null|
|      null|
|       0.0|
|      null|
|      null|
|      null|
|      null|
|      null|
|       0.0|
|       0.0|
|      null|
|      null|
+----------+
only showing top 20 rows



In [43]:
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors

rdd = ratings_pivot.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
particular_book_rdd = ratings_pivot.select('0316666343').rdd.map(lambda data: Vectors.dense([float(c) for c in data]))


In [44]:
particular_book_rdd.dtypes

AttributeError: 'PipelinedRDD' object has no attribute 'dtypes'

In [None]:
correlation = Statistics.corr(ratings_pivot.rdd, y=ratings_pivot.select('0316666343').rdd)

In [None]:
pd.DataFrame(correlation)

In [None]:
ratings_pivot.foreach(lambda x: print(x))