### Data Processing Pipeline: Spark on an EMR Cluster, Using Livy as the REST Interface to Spark

We will work with the book ratings dataset from the Book-Crossing community. It contains ratings from over 100k book lovers. More details on this dataset can be found on the [`Book-Crossing Dataset`](http://www2.informatik.uni-freiburg.de/~cziegler/BX/) page.


Place the two source datasets in an S3 bucket (`s3://<bucket-name>/<folder>`):

-  BX-Book-Ratings.csv
-  BX-Books.csv

In total, the data covers 340,215 books and 1,149,780 book ratings.

In [1]:
%%info

In [1]:
s3_bucket = 's3://ai-in-aws/'
output_prefix = 'object2vec/bookratings'


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1549126534611_0009,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


#### Read data from S3

In [2]:
from pyspark.sql.window import Window 
from pyspark.sql import functions as F


In [3]:
# Replace the path with your own location: s3://<bucket-name>/<folder>
ratings = (spark.read
           .option("header", "true")
           .option("quote", "\"")
           .option("delimiter", ";")
           .csv("s3://ai-in-aws/awsglue-datasets/BX-Book-Ratings.csv"))


In [14]:
ratings.count()


1149780
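The reader options above tell Spark that the Book-Crossing files use `;` as the field separator, wrap values in double quotes, and start with a header row. Because `inferSchema` is not set, every column comes back as a string. As a rough local illustration (plain Python `csv`, not Spark), the sketch below parses a few hypothetical rows modeled on the `ratings.show(10)` output; the exact quoting of the raw file is an assumption.

```python
import csv
import io

# Hypothetical sample in the assumed Book-Crossing layout:
# ';'-delimited, '"'-quoted, with a header row
SAMPLE = (
    '"User-ID";"ISBN";"Book-Rating"\n'
    '"276725";"034545104X";"0"\n'
    '"276726";"0155061224";"5"\n'
)


def read_semicolon_csv(text):
    """Parse ';'-delimited, '"'-quoted text with a header row into dicts.

    Like the Spark reader without inferSchema, every value stays a string.
    """
    reader = csv.DictReader(io.StringIO(text), delimiter=";", quotechar='"')
    return [dict(row) for row in reader]


rows = read_semicolon_csv(SAMPLE)
print(rows[0]["ISBN"], rows[0]["Book-Rating"])
```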

In [4]:
# Normalize ISBNs (trim spaces, lower-case) so they match the books file when joining
ratings = ratings.withColumn("ISBN", F.lower(F.trim(F.col("ISBN"))))


In [5]:
ratings.show(10)


+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104x|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615x|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120x|          7|
| 276745| 342310538|         10|
+-------+----------+-----------+
only showing top 10 rows
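Both files must be normalized the same way, otherwise the later ISBN joins silently drop matches (an ISBN-10 check digit can be the letter `X`, which may appear in either case). A minimal plain-Python equivalent of `F.lower(F.trim(...))`, assuming Spark's `trim` strips only the space character; `normalize_isbn` is a hypothetical helper name:

```python
def normalize_isbn(raw: str) -> str:
    """Trim surrounding spaces, then lower-case, mirroring F.lower(F.trim(col))."""
    return raw.strip(" ").lower()


# The same (hypothetical) ISBN written two different ways in two source rows
a = normalize_isbn("034545104X")
b = normalize_isbn(" 034545104x ")
print(a == b, a)
```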

#### Address the long tail problem in the dataset

In [6]:
def value_counts(df, colName):
    return (df.groupby(colName).count()
              .orderBy('count', ascending=False))
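`value_counts` is a group-and-count ordered by frequency. A plain-Python analogue using `collections.Counter`, run on a few rows taken from the ratings output above (this `value_counts` is a local re-implementation, not the Spark helper). Note that Spark's `orderBy` gives no guaranteed order among tied counts, whereas `Counter.most_common` keeps first-seen order for ties.

```python
from collections import Counter


def value_counts(records, key):
    """Count rows per distinct value of `key`, most frequent first."""
    return Counter(r[key] for r in records).most_common()


sample_ratings = [
    {"User-ID": "276729", "ISBN": "052165615x"},
    {"User-ID": "276729", "ISBN": "0521795028"},
    {"User-ID": "276725", "ISBN": "034545104x"},
]
print(value_counts(sample_ratings, "User-ID"))
```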


In [7]:
# Number of ratings per user
# Inspect the distribution, then keep users who have rated at least 13 books
users = value_counts(ratings, 'User-ID')
# approxQuantile(col, probabilities, relativeError); relativeError=0.01 trades exactness for speed
users.approxQuantile('count', [0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 1.0], 0.01)


[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 4.0, 14.0, 40.0, 40.0, 141.0, 141.0, 13602.0, 13602.0]
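For each probability `p`, `approxQuantile` returns a value whose rank is within `relativeError * N` of `p * N` (with `relativeError=0` it computes exact quantiles). Here roughly 90% of users have 14 or fewer ratings, so a cutoff of 13 keeps roughly the most active 10% of users (10,468 of them pass the filter further down). The sketch below computes exact quantiles with the simple nearest-rank rule, which is not Spark's algorithm, on hypothetical long-tailed counts, and reports the share of users a cutoff of 13 would keep:

```python
import math


def nearest_rank_quantiles(values, probs):
    """Exact quantiles using the nearest-rank rule (not Spark's algorithm)."""
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for p in probs:
        rank = max(1, math.ceil(p * n))  # 1-based rank; p=0 maps to the minimum
        result.append(ordered[rank - 1])
    return result


# Hypothetical long-tailed ratings-per-user counts
counts = [1, 1, 1, 1, 1, 1, 2, 3, 14, 140]
print(nearest_rank_quantiles(counts, [0.0, 0.5, 0.9, 1.0]))

# Share of users that a cutoff of at least 13 ratings would keep
kept_share = sum(c >= 13 for c in counts) / len(counts)
print(kept_share)
```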

In [8]:
# Number of ratings per book
# Keep books that have been rated by at least 6 users

books = value_counts(ratings, 'ISBN')
books.approxQuantile('count', [0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 1.0], 0.01)


[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 6.0, 9.0, 12.0, 16.0, 26.0, 2502.0, 2502.0]

In [9]:
# Filter ratings by selecting books that have been rated by at least 6 users and users who have rated at least 13 books
fil_users = users.filter(F.col("count") >= 13)
fil_books = books.filter(F.col("count") >= 6)


In [10]:
#Number of books meeting the threshold; 34780
fil_books.count()


34780

In [11]:
#Number of users meeting the threshold
fil_users.count()


10468

#### Obtain book title information

In [12]:
# Read the books CSV to load book titles (same reader options as the ratings file)
books_csv = (spark.read
             .option("header", "true")
             .option("quote", "\"")
             .option("delimiter", ";")
             .csv("s3://ai-in-aws/awsglue-datasets/BX-Books.csv"))


In [13]:
#Explore the first few records of the books_csv dataframe
books_csv.show(5)


+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

In [15]:
#Clean up the ISBN column
books_csv = books_csv.withColumn("ISBN", F.lower(F.trim(F.col("ISBN"))))
#Drop columns that are not of interest
books_csv = books_csv.select('ISBN', 'Book-Title')


In [16]:
books_csv.show(5)


+----------+--------------------+
|      ISBN|          Book-Title|
+----------+--------------------+
|0195153448| Classical Mythology|
|0002005018|        Clara Callan|
|0060973129|Decision in Normandy|
|0374157065|Flu: The Story of...|
|0393045218|The Mummies of Ur...|
+----------+--------------------+
only showing top 5 rows

In [17]:
# Attach titles to the 34,780 eligible books; the inner join drops ISBNs missing from BX-Books.csv
fil_books = fil_books.join(books_csv, on=['ISBN'], how='inner')\
                    .select(F.col("ISBN"),
                       F.col("count"),
                       F.col("Book-Title")
                       )


In [18]:
# Number of eligible books that have a title
fil_books.count()

# 32,978 of the 34,780 eligible books have titles


32978

#### Select relevant book ratings

In [19]:
# Keep ratings whose user and book both meet the thresholds, and attach book titles
fil_ratings = ratings.join(fil_users, on=['User-ID'], how='inner').join(fil_books, on=['ISBN'], how='inner')\
                .select(F.col("ISBN"),
                       F.col("User-ID").alias("UserID"),
                       F.col("Book-Rating").alias("BookRating"),
                       F.col("Book-Title").alias("BookTitle")
                     )
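The two inner joins act as filters: a rating survives only if its user is in `fil_users` and its ISBN is in `fil_books`, which after the title join holds only titled books. Because each of those tables has one row per key, the joins never duplicate ratings. A plain-Python sketch of the same logic; the titles and the `filter_ratings` helper are hypothetical:

```python
def filter_ratings(ratings, eligible_users, titled_books):
    """Keep (isbn, user_id, rating, title) for ratings whose user and book both qualify.

    eligible_users: set of user IDs, standing in for fil_users
    titled_books:   dict of ISBN -> title, standing in for fil_books after the title join
    A row is dropped when either key has no match, as with an inner join.
    """
    kept = []
    for user_id, isbn, rating in ratings:
        if user_id in eligible_users and isbn in titled_books:
            kept.append((isbn, user_id, rating, titled_books[isbn]))
    return kept


sample_ratings = [
    ("131046", "0060160772", "0"),
    ("275922", "0060160772", "10"),
    ("276725", "034545104x", "0"),  # user not eligible in this example
]
eligible_users = {"131046", "275922"}
titled_books = {"0060160772": "Title A"}  # hypothetical title

result = filter_ratings(sample_ratings, eligible_users, titled_books)
print(len(result))
```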


In [20]:
fil_ratings.show()
# fil_ratings.count() gives 520,330 ratings


+----------+------+----------+--------------------+
|      ISBN|UserID|BookRating|           BookTitle|
+----------+------+----------+--------------------+
|0060160772|131046|         0|Peace, Love and H...|
|0060160772|136205|         0|Peace, Love and H...|
|0060160772|170227|         0|Peace, Love and H...|
|0060160772|242445|         0|Peace, Love and H...|
|0060160772| 36836|         0|Peace, Love and H...|
|0060160772|230522|         0|Peace, Love and H...|
|0060160772|275922|        10|Peace, Love and H...|
|0060160772|128835|        10|Peace, Love and H...|
|0060160772|124363|         0|Peace, Love and H...|
|0060176059|142524|         8|Tales of Burning ...|
|0060176059| 36836|         0|Tales of Burning ...|
|0060176059|265205|         5|Tales of Burning ...|
|0060176059|108005|         0|Tales of Burning ...|
|0060176059| 35857|         0|Tales of Burning ...|
|0060176059|107324|         7|Tales of Burning ...|
|0060557818|234359|         7|Neverwhere : A Novel|
|0060557818|

In [21]:
# Number of distinct books left after filtering
fil_ratings.select('ISBN').distinct().count()


32745

#### Create integer indexes for users and books to develop a recommender system

In [22]:
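# Create integer indexes for books and users

# Determine unique users and books from the filtered ratings
uniq_users = value_counts(fil_ratings, 'UserID')
uniq_books = value_counts(fil_ratings, 'ISBN')

# row_number() starts at 1, so subtract 1 to get 0-based indexes.
# UserID and ISBN are strings (no schema inference), so the ordering is lexicographic.
# A window without partitionBy moves all rows to one partition; acceptable at this scale.
w1 = Window.orderBy("UserID")
uniq_users = uniq_users.withColumn("user_ind", F.row_number().over(w1)-1)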

w2 = Window.orderBy("ISBN") 
uniq_books = uniq_books.withColumn("book_ind", F.row_number().over(w2)-1)
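`F.row_number().over(Window.orderBy(...)) - 1` assigns each distinct ID its 0-based position in sorted order. Since the IDs are strings, the order is lexicographic, which is consistent with the `upd_fil_ratings.show(5)` output further down, where UserID `36836` gets a larger `user_ind` than `242445`. A plain-Python sketch of the same mapping (`build_index` is a hypothetical helper):

```python
def build_index(ids):
    """Map each distinct ID to its 0-based position in sorted order.

    Equivalent in spirit to F.row_number().over(Window.orderBy(col)) - 1 over a
    table with one row per ID. String IDs sort lexicographically, not numerically.
    """
    return {id_: i for i, id_ in enumerate(sorted(set(ids)))}


user_index = build_index(["36836", "131046", "36836", "242445"])
print(user_index)
```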


In [23]:
#Check the indexes created
row1 = uniq_books.agg({"book_ind": "max"}).collect()[0]
print(row1)


Row(max(book_ind)=32744)

In [24]:
row2 = uniq_users.agg({"user_ind": "max"}).collect()[0]
print(row2)


Row(max(user_ind)=10298)
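The max-index checks imply 32,745 books and 10,299 users only if the indexes run from 0 to n-1 without gaps. `row_number()` guarantees this, but it can be confirmed cheaply; in Spark, for example, by comparing the max plus one with `select("book_ind").distinct().count()`. The same check in plain Python (`is_contiguous` is a hypothetical helper):

```python
def is_contiguous(indexes):
    """Return True if the distinct indexes are exactly 0, 1, ..., n-1."""
    distinct = set(indexes)
    return distinct == set(range(len(distinct)))


print(is_contiguous([0, 2, 1, 2]))
print(is_contiguous([0, 2]))
```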

In [25]:
# Create filtered ratings containing user and book indexes, along with rating
upd_fil_ratings = fil_ratings.join(uniq_users, on=['UserID'], how='inner').join(uniq_books, on=['ISBN'], how='inner')\
                .select(F.col("ISBN"),
                       F.col("UserID"),
                       F.col("BookRating"),
                       F.col("BookTitle"), 
                       F.col("book_ind"),
                       F.col("user_ind"))


In [26]:
upd_fil_ratings.count()
#520,330 ratings


520330

In [None]:
# Original: books - 340,215; users - 105,283; ratings - 1,149,780
# Filtered: books - 32,745; users - 10,299; ratings - 520,330
# (indexes are 0-based, so the max book_ind is 32,744 and the max user_ind is 10,298)
# Some eligible books drop out if none of the users who rated them has rated at least 13 books, and vice versa.


In [30]:
#Inspect the new filtered ratings dataset
upd_fil_ratings.show(5)


+----------+------+----------+--------------------+--------+--------+
|      ISBN|UserID|BookRating|           BookTitle|book_ind|user_ind|
+----------+------+----------+--------------------+--------+--------+
|0060160772|131046|         0|Peace, Love and H...|     280|    1203|
|0060160772|136205|         0|Peace, Love and H...|     280|    1424|
|0060160772|170227|         0|Peace, Love and H...|     280|    2779|
|0060160772|242445|         0|Peace, Love and H...|     280|    5751|
|0060160772| 36836|         0|Peace, Love and H...|     280|    7672|
+----------+------+----------+--------------------+--------+--------+
only showing top 5 rows

In [31]:
print(upd_fil_ratings.agg({"user_ind": "max"}).collect()[0])


Row(max(user_ind)=10298)

In [33]:
print(upd_fil_ratings.agg({"book_ind": "max"}).collect()[0])


Row(max(book_ind)=32744)
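Before saving, it can help to quantify how much the long-tail filtering removed and how sparse the resulting user-by-book matrix is, since that sparsity is what a recommender trained on this data has to cope with. A small worked calculation using only counts reported in this notebook (user and book totals are the max index plus one):

```python
# Counts reported in this notebook (before and after filtering)
total = {"users": 105_283, "books": 340_215, "ratings": 1_149_780}
# Indexes are 0-based, so the counts are max index + 1
kept = {"users": 10_298 + 1, "books": 32_744 + 1, "ratings": 520_330}

for key in total:
    share = kept[key] / total[key]
    print(f"{key}: kept {kept[key]:,} of {total[key]:,} ({share:.1%})")

# Fraction of the users x books matrix that holds an observed rating
density = kept["ratings"] / (kept["users"] * kept["books"])
print(f"density: {density:.4%}")
```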

#### Save the final book ratings to an S3 bucket

In [34]:
output_loc = s3_bucket + output_prefix
upd_fil_ratings.write.parquet(output_loc+"/bookratings.parquet", mode='overwrite')
