## Spark
The research paper introducing Resilient Distributed Datasets (RDDs), the abstraction underlying Spark, is [here](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf).

## Start PySpark

In [1]:
from pyspark.sql import functions as F

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1569250248755_0002,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

## Load the data
Our data is a series of [parquet files](https://acadgild.com/blog/parquet-file-format-hadoop#targetText=Parquet%20File%20Format%20Hadoop&targetText=Parquet%2C%20an%20open%20source%20file,terms%20of%20storage%20and%20performance), hosted on AWS as a [sample large data set](https://s3.amazonaws.com/amazon-reviews-pds/readme.html). We'll look at the books data set, though others are available. Spark provides helper methods for reading and manipulating Parquet files.

In [2]:
input_bucket = 's3://amazon-reviews-pds'
input_path = '/parquet/product_category=Books/*.parquet'
df = spark.read.parquet(input_bucket + input_path)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|year|
+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+
|         US|   15444933|R1WWG70WK9VUCH|1848192576|     835940987|Standing Qigong f...|          5|            9|         10|   N|                Y|Informative AND i...|After attending a...| 2015-05-02|2015|
|         US|   20595117|R1EQ3POS0RIOD5|145162445X|     574044348|A Universe from N...|          4|            4|          7|   N|                N|Between 'Nothing'...

## Exploratory Data Analysis - Vanilla PySpark

When reading Parquet files, Spark automatically creates DataFrames that can be used by Spark SQL. However, it can be useful to do some operations in vanilla PySpark, so DataFrames also provide access to the underlying RDDs.

In [12]:
rdd = df.rdd.map(list)

In [13]:
rdd.take(1)

[['US', '15444933', 'R1WWG70WK9VUCH', '1848192576', '835940987', 'Standing Qigong for Health and Martial Arts - Zhan Zhuang', 5, 9, 10, 'N', 'Y', 'Informative AND interesting!', "After attending a few Qigong classes, I wanted to have a book to read and re-read the instructions so I could practice at home.  I also wanted to gain more of an understanding of the purpose and benefit of the movements in order to practice them with a more focused purpose.<br /><br />The book exceeded my expectations.  The explanations are very clear and are paired with photos showing the correct form.  The book itself is more than just the Qigong, it's a very interesting read.  I read the whole book in two days and will read it again. I rarely read books twice!  The book has provided the information and additional instruction that I was looking for. I even use the breathing exercise to de-stress in traffic and fall asleep at night.  It really works!  I bought the book for my sister also and she's started pra

### Data cleaning and exploration

In [16]:
# What if we want only the review column?

review_column_id = 12
words = rdd.map(lambda row: row[review_column_id])


In [17]:
words.take(5)

["After attending a few Qigong classes, I wanted to have a book to read and re-read the instructions so I could practice at home.  I also wanted to gain more of an understanding of the purpose and benefit of the movements in order to practice them with a more focused purpose.<br /><br />The book exceeded my expectations.  The explanations are very clear and are paired with photos showing the correct form.  The book itself is more than just the Qigong, it's a very interesting read.  I read the whole book in two days and will read it again. I rarely read books twice!  The book has provided the information and additional instruction that I was looking for. I even use the breathing exercise to de-stress in traffic and fall asleep at night.  It really works!  I bought the book for my sister also and she's started practicing Standing Qigong and loves it.", "Krauss traces the remarkable transformation in cosmological understanding which has come with the Einstein- Hubble era. He explains how 

In [14]:
# Many of the lines above contain extra formatting 
# or characters that we don't want to include in 
# our word count. Here's one way we can remove them
text_to_remove = [
    "<br />",
    "&#34;",
    "\\\\",
]

In [15]:
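def clean_tokenize_and_split(s):
    # flatMap iterates over the return value, so a missing review must come
    # back as a list; a bare string would be split into single characters
    if not s:
        return ["<NULL>"]
    # Remove unwanted markup and escape sequences
    for blob in text_to_remove:
        s = s.replace(blob, "")

    return s.split(" ")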

In [None]:
counts = words.flatMap(lambda line: clean_tokenize_and_split(line)) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

In [18]:
counts.take(5)

[('', 64567505), ('like', 5751664), ('coins', 9438), ('it.', 2569090), ('falling', 98941)]

In [19]:
sorted_counts = counts.sortBy(lambda a: -a[1])
sorted_counts.take(40)

[('the', 115835153), ('and', 74723703), ('of', 66665319), ('to', 65989760), ('', 64567505), ('a', 60859609), ('I', 41977853), ('is', 40851310), ('in', 36804630), ('that', 27917422), ('this', 24689246), ('for', 23326197), ('book', 23261508), ('it', 20948044), ('with', 17987221), ('was', 17030788), ('as', 15705347), ('are', 13861889), ('on', 13570515), ('you', 13551482), ('The', 13124487), ('have', 12477752), ('not', 11801839), ('but', 11790180), ('be', 11104859), ('his', 9917198), ('an', 8890318), ('from', 8677100), ('her', 8601030), ('my', 8579786), ('about', 8303782), ('by', 8294626), ('This', 7936805), ('read', 7833063), ('has', 7627995), ('at', 7499965), ('or', 7442488), ('he', 7430108), ('one', 7373912), ('all', 7272781)]
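
The counts above treat `The` and `the`, or `it.` and `it`, as different words, and the empty string ranks fifth because splitting on a single space keeps empty tokens. Below is a minimal sketch of a stricter tokenizer, written in plain Python so it can be tried without a cluster. The name `normalize_and_split`, the regular expression, and the sample reviews are assumptions for illustration, not part of the notebook.

```python
import re
from collections import Counter

# Hypothetical helper (not in the original notebook): lowercases the text,
# replaces HTML line breaks and the &#34; entity with spaces, and keeps
# only runs of letters, digits, and apostrophes as tokens.
TOKEN_RE = re.compile(r"[a-z0-9']+")

def normalize_and_split(text):
    if not text:
        return []
    text = text.replace("<br />", " ").replace("&#34;", " ")
    return TOKEN_RE.findall(text.lower())

# Local stand-in for flatMap + map + reduceByKey on a small made-up sample
sample_reviews = [
    "The book is great.<br /><br />I like it.",
    "I like the  book",
    None,
]
word_counts = Counter(
    token for review in sample_reviews for token in normalize_and_split(review)
)
print(word_counts.most_common(3))
```

In the notebook, a function like this could be passed to `flatMap` in place of `clean_tokenize_and_split`.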

## Exploratory Data Analysis - Spark SQL
Spark SQL can be a useful way to explore data sets, especially if you want to communicate the results to a broader audience. It provides a SQL-like interface on top of RDDs.

In [3]:
# Let's see what our rows look like. Note that the table 
# schema is inferred automatically from the parquet files
df.head()

Row(marketplace='US', customer_id='15444933', review_id='R1WWG70WK9VUCH', product_id='1848192576', product_parent='835940987', product_title='Standing Qigong for Health and Martial Arts - Zhan Zhuang', star_rating=5, helpful_votes=9, total_votes=10, vine='N', verified_purchase='Y', review_headline='Informative AND interesting!', review_body="After attending a few Qigong classes, I wanted to have a book to read and re-read the instructions so I could practice at home.  I also wanted to gain more of an understanding of the purpose and benefit of the movements in order to practice them with a more focused purpose.<br /><br />The book exceeded my expectations.  The explanations are very clear and are paired with photos showing the correct form.  The book itself is more than just the Qigong, it's a very interesting read.  I read the whole book in two days and will read it again. I rarely read books twice!  The book has provided the information and additional instruction that I was looking f

In [None]:
# Our initial data set is very large. We can start to think about how we
# would want to filter the data set. Many of the ratings that products
# receive are low, so we can likely remove those from consideration

query = """SELECT count(*) as c
FROM ratings
WHERE star_rating < 4.0
Order by c desc"""

# this line registers a "table" with spark sql
df.createOrReplaceTempView("ratings")

low_ratings = spark.sql(query)
low_ratings.show()

In [4]:
# When we make a recommendation, we want to suggest highly rated content.
# What if we want to only suggest products where the average rating is
# above 4.8 and there are more than 20 reviews?

query = """SELECT product_title, AVG(star_rating), count(*) as c
FROM ratings
GROUP BY product_title
HAVING AVG(star_rating) > 4.8 and count(*) > 20
Order by c desc"""

highly_rated = spark.sql(query)
highly_rated.show()

+--------------------+-----------------+----+
|       product_title| avg(star_rating)|   c|
+--------------------+-----------------+----+
|Jesus Calling: En...|4.860709592641261|5327|
|The Legend of Zel...|4.842859988050189|5021|
|  Humans of New York|4.823193444251304|4027|
|Oh, the Places Yo...|4.860881174899866|3745|
|Being Mortal: Med...|4.818432453721938|2539|
|A Higher Call: An...|4.840779853777417|2462|
|Fearless: The Und...|4.839542760372566|2362|
|Enchanted Forest:...|4.815551537070524|2212|
|The Liberty Amend...|4.803883495145631|2060|
|Diary of a Wimpy ...|4.839126117179742|2014|
|With the Old Bree...|4.895336787564767|1930|
|Rush Revere and t...| 4.88272921108742|1876|
|Goodnight, Goodni...| 4.90027397260274|1825|
|The Walking Dead:...|4.816969696969697|1650|
|   Room on the Broom|4.876988984088127|1634|
|Harry Potter And ...|4.817396668723011|1621|
|Dr. Seuss's Begin...|4.848258706467662|1608|
|Little Blue Truck...|4.903486924034869|1606|
|The Jesus Storybo...|4.8028696194

In [5]:
highly_rated.count()

19867

In [6]:
# Product titles are not a reliable key: the same product is likely
# listed under several different titles. Product IDs, however, are
# unique, so let's work with those moving forward

query = """SELECT product_id, AVG(star_rating), count(*) as c
FROM ratings
GROUP BY product_id
HAVING AVG(star_rating) > 4.8 and count(*) > 20
Order by c desc"""

highly_rated_ids = spark.sql(query)
highly_rated_ids.show()

+----------+------------------+----+
|product_id|  avg(star_rating)|   c|
+----------+------------------+----+
|1616550414| 4.843502377179081|5048|
|1591451884| 4.862152848165416|4933|
|1250038820| 4.823119083208769|4014|
|0679805273| 4.861286919831223|3792|
|0375869026| 4.812842852533075|3099|
|0805095152| 4.816022544283414|2484|
|1780674880| 4.815529622980251|2228|
|141971189X|             4.843|2000|
|1451606273| 4.805413687436159|1958|
|1476755884|  4.88272921108742|1876|
|0811877825|  4.90027397260274|1825|
|0425252868| 4.847946725860155|1802|
|0740748475|4.8270238788584745|1717|
|0375851569| 4.847528290649196|1679|
|1607060760| 4.816969696969697|1650|
|0439136350| 4.820273631840796|1608|
|0547248288| 4.903486924034869|1606|
|0061906220| 4.929122807017544|1425|
|0375413405| 4.822730521801287|1399|
|0988667908| 4.873107426099495|1387|
+----------+------------------+----+
only showing top 20 rows
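
To make the `GROUP BY` / `HAVING` logic of the query above concrete, here is a minimal plain-Python sketch of the same computation on a tiny made-up data set. The function name `highly_rated_products` and the sample rows are assumptions for illustration; Spark SQL runs the equivalent logic in a distributed fashion.

```python
from collections import defaultdict

def highly_rated_products(rows, min_avg=4.8, min_count=20):
    """Group (product_id, star_rating) rows, then apply the HAVING filter."""
    totals = defaultdict(lambda: [0, 0])  # product_id -> [rating sum, count]
    for product_id, star_rating in rows:
        totals[product_id][0] += star_rating
        totals[product_id][1] += 1

    result = [
        (product_id, rating_sum / count, count)
        for product_id, (rating_sum, count) in totals.items()
        # HAVING AVG(star_rating) > 4.8 and count(*) > 20
        if rating_sum / count > min_avg and count > min_count
    ]
    # ORDER BY c DESC
    return sorted(result, key=lambda item: -item[2])

# Made-up ratings: A passes, B has too few reviews, C averages exactly 4.8
sample_rows = (
    [("A", 5)] * 30
    + [("B", 5)] * 10
    + [("C", 5)] * 20 + [("C", 4)] * 5
)
print(highly_rated_products(sample_rows))
```

Both comparisons are strict, so a product with an average of exactly 4.8 or exactly 20 reviews is excluded.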

In [7]:
highly_rated_ids.count()

19965

In [8]:
# transform the data from a dataframe to a normal python list
ids = highly_rated_ids.rdd.map(lambda r: r.product_id).collect()


In [9]:
len(ids)

19965

In [10]:
type(ids)

<class 'list'>

In [11]:
# broadcast_ids = sc.broadcast(ids)

## Filter dataset by ids

In [20]:
# sample data cleaning
# With a broadcast variable, membership is tested against its .value:
# filtered_rdd = rdd.filter(lambda x: x[3] in broadcast_ids.value)
filtered_rdd = rdd.filter(lambda x: x[3] in ids)

In [21]:
filtered_rdd.take(5)

[['US', '44704475', 'R36X05VUTXCEJQ', '0307730697', '62706984', 'Fearless: The Undaunted Courage and Ultimate Sacrifice of Navy SEAL Team SIX Operator Adam Brown', 4, 0, 0, 'N', 'Y', 'American courage', "For those who think life is hard and they can't do what they'd like to do or don't believe that they'll ever acccomplish anything, this book is for them. Even without a military background, everyone can appreciate the tenacity and intestinal fortitude that is exhibited. It would seem that a young man with his early failures would never achieve much of anything, but his belief in God and his tremendous will to succeed overcame almost insurmountable obstacles to reach his goal.", datetime.date(2012, 6, 29), 2012], ['US', '17099985', 'R2VZUPBM5AGRKA', '0393089622', '311014023', 'Target Tokyo: Jimmy Doolittle and the Raid That Avenged Pearl Harbor', 5, 1, 1, 'N', 'Y', 'Well written piece documenting a significant in histort', "One of the absolute best documentaries, with the accompanying p
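
Testing `x[3] in ids` against a Python list scans up to all 19,965 entries for every row. Below is a minimal sketch of a set-based predicate in plain Python. `make_id_filter` is a hypothetical helper and the sample rows are made up for illustration.

```python
def make_id_filter(allowed_ids, id_column=3):
    """Return a predicate that keeps rows whose ID column is in allowed_ids."""
    # A frozenset gives constant-time membership checks on average,
    # unlike a list, which is scanned element by element.
    allowed = frozenset(allowed_ids)

    def keep(row):
        return row[id_column] in allowed

    return keep

# Made-up rows using the notebook's column order (product_id is column 3)
sample_rows = [
    ["US", "1", "R1", "0307730697", "62706984"],
    ["US", "2", "R2", "0000000000", "11111111"],
]
keep_row = make_id_filter(["0307730697", "0393089622"])
kept = [row for row in sample_rows if keep_row(row)]
print(len(kept))
```

With Spark, a predicate like this could be passed as `rdd.filter(keep_row)`. For larger ID sets, the set could instead be shipped to the executors with `sc.broadcast(...)` and read through `.value`.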

## Recommendation

We're going to take what we've built so far and build a simple recommender. We'll start with a [collaborative filtering](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html) based approach, though you should feel free to try others.

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

import re


In [33]:
# What's going on here?
# Product and User IDs are not always numeric - sometimes they include letters too
# ALS expects numeric indices for products and users
# We make the following simplification, though it is far from perfect
# What are the problems with this? What should we do instead?

re.sub('[^0-9]','','abcd12352342342342343')[-8:]
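
One problem with this simplification is that distinct IDs can collapse to the same number once letters are dropped and only the trailing digits are kept. Below is a minimal sketch of such a collision, and of one possible alternative: assigning every distinct ID its own dense integer index. `truncate_id` and `build_index` are hypothetical names, and the sample IDs are made up.

```python
import re

def truncate_id(raw_id, digits=7):
    # Same idea as the notebook: drop non-digits, keep the trailing digits
    return int(re.sub("[^0-9]", "", raw_id)[-digits:])

def build_index(raw_ids):
    # Assign each distinct ID a unique integer in first-seen order
    index = {}
    for raw_id in raw_ids:
        if raw_id not in index:
            index[raw_id] = len(index)
    return index

# Made-up product IDs that share the same trailing digits
sample_ids = ["145162445X", "B145162445", "9145162445"]

truncated = [truncate_id(i) for i in sample_ids]
index = build_index(sample_ids)

print(truncated)                       # all three map to the same integer
print([index[i] for i in sample_ids])  # each ID gets its own index
```

Spark ML's `StringIndexer` offers a comparable string-to-index mapping for DataFrame columns.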

In [24]:
customer_id = 1
product_id = 3
rating_id = 6

ratingsRDD = filtered_rdd.map(lambda p: Row(
    userId=int(re.sub('[^0-9]', '', p[customer_id])[-7:]),
    productID=int(re.sub('[^0-9]', '', p[product_id])[-7:]),
    rating=float(p[rating_id])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])



In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="productID", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 product recommendations for each user
# userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each product
# movieRecs = model.recommendForAllItems(10)
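
As a reference for the metric the evaluator reports, here is a minimal plain-Python sketch of root-mean-square error. It also shows why NaN predictions need to be dropped, which is what `coldStartStrategy="drop"` does in the cell above. The function name `root_mean_square_error` and the sample numbers are made up for illustration.

```python
import math

def root_mean_square_error(pairs, drop_nan=True):
    """Compute RMSE over (rating, prediction) pairs."""
    if drop_nan:
        # Mirror the 'drop' strategy: ignore rows without a usable prediction
        pairs = [(r, p) for r, p in pairs if not math.isnan(p)]
    squared_errors = [(r - p) ** 2 for r, p in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up (rating, prediction) pairs; NaN stands for a user or product
# that was not seen during training
sample = [(5.0, 4.0), (4.0, 4.0), (3.0, 5.0), (5.0, float("nan"))]

print(root_mean_square_error(sample))                  # NaN row ignored
print(root_mean_square_error(sample, drop_nan=False))  # a single NaN makes the result NaN
```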

In [None]:
# userRecs.head()

In [None]:
# movieRecs.head()

In [None]:
# Generate top 3 book recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 3)


# Generate top 10 user recommendations for a specified set of 3 books
# books = ratings.select(als.getItemCol()).distinct().limit(3)
# booksSubSetRecs = model.recommendForItemSubset(books, 10)

In [None]:
print(userSubsetRecs.take(2))

## Spark Streaming
See some good examples [here](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/) and [here](https://github.com/jleetutorial/python-spark-streaming).

In [None]:
from pyspark.streaming import StreamingContext

sc = spark.sparkContext
ssc = StreamingContext(sc, 1)



print(sc.appName)

In [None]:
import json
stream = ssc.textFileStream('s3n://pg-streaming-example/*')
# Each element is one line of text; map returns a new DStream
parsed = stream.map(lambda line: json.loads(line))
parsed.pprint()

In [None]:
ssc.start()

## Saving and loading
See the example code in the [docs](https://spark.apache.org/docs/2.3.0/mllib-collaborative-filtering.html).

## Discussion Questions
- What steps are necessary to productionize the model?
- Model predictions are very slow. How can we improve the performance of predictions?
- Does it make sense to precompute the predictions for a given user?
- What do we do with new users?
- Why use Spark rather than Kinesis or Cloud Dataflow?