<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Objectives" data-toc-modified-id="Objectives-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Objectives</a></span></li><li><span><a href="#Spark:-Getting-Started" data-toc-modified-id="Spark:-Getting-Started-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Spark: Getting Started</a></span><ul class="toc-item"><li><span><a href="#Optional-Step-0:-Prerequisites-&amp;-Installation-for-Databricks-or-Colab" data-toc-modified-id="Optional-Step-0:-Prerequisites-&amp;-Installation-for-Databricks-or-Colab-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Optional Step 0: Prerequisites &amp; Installation for Databricks or Colab</a></span><ul class="toc-item"><li><span><a href="#Databricks-Setup" data-toc-modified-id="Databricks-Setup-2.1.1"><span class="toc-item-num">2.1.1&nbsp;&nbsp;</span>Databricks Setup</a></span></li></ul></li><li><span><a href="#Step-1:-Create-a-SparkSession-with-a-SparkContext" data-toc-modified-id="Step-1:-Create-a-SparkSession-with-a-SparkContext-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Step 1: Create a SparkSession with a SparkContext</a></span></li><li><span><a href="#Step-2:-Download-some-Amazon-reviews-(Toys-&amp;-Games)" data-toc-modified-id="Step-2:-Download-some-Amazon-reviews-(Toys-&amp;-Games)-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Step 2: Download some Amazon reviews (Toys &amp; Games)</a></span><ul class="toc-item"><li><span><a href="#Optional:-For-Databricks-Setup" data-toc-modified-id="Optional:-For-Databricks-Setup-2.3.1"><span class="toc-item-num">2.3.1&nbsp;&nbsp;</span>Optional: For Databricks Setup</a></span></li></ul></li><li><span><a href="#Step-3:-Create-a-Spark-DataFrame" data-toc-modified-id="Step-3:-Create-a-Spark-DataFrame-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Step 3: Create a Spark DataFrame</a></span></li><li><span><a href="#Exploring-the-DataFrame" data-toc-modified-id="Exploring-the-DataFrame-2.5"><span 
class="toc-item-num">2.5&nbsp;&nbsp;</span>Exploring the DataFrame</a></span><ul class="toc-item"><li><span><a href="#Count-the-Words-in-the-First-Row" data-toc-modified-id="Count-the-Words-in-the-First-Row-2.5.1"><span class="toc-item-num">2.5.1&nbsp;&nbsp;</span>Count the Words in the First Row</a></span></li><li><span><a href="#A-Few-More-Basic-Commands" data-toc-modified-id="A-Few-More-Basic-Commands-2.5.2"><span class="toc-item-num">2.5.2&nbsp;&nbsp;</span>A Few More Basic Commands</a></span></li></ul></li><li><span><a href="#Reading-files" data-toc-modified-id="Reading-files-2.6"><span class="toc-item-num">2.6&nbsp;&nbsp;</span>Reading files</a></span></li></ul></li></ul></div>

<a href="https://colab.research.google.com/github/flatiron-school/ds-spark/blob/main/spark-programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Run for Google Colab environment
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 37 kB/s
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 63.4 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=6eb19199995e628b8a25358e9f1fe921890e5e6bdbc6b42bd3590146d5c961d8
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

In [2]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Objectives

- Use `pyspark` to manipulate data

# Spark: Getting Started

## Optional Step 0: Prerequisites & Installation for Databricks or Local Run

> If you run this notebook in Google Colab (by clicking the "*Open in Colab*" button at the top of this notebook), you can skip to [Step 1](#Step-1:-Create-a-SparkSession-with-a-SparkContext).

### Databricks Setup

Follow [these instructions](https://docs.databricks.com/notebooks/notebooks-manage.html#import-a-notebook) to import this notebook into Databricks.

## Step 1: Create a SparkSession with a SparkContext

In [3]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [4]:
spark

In [5]:
sc

## Step 2: Download some Amazon reviews (Toys & Games)

In [6]:
# Get data directly from repo
!wget https://github.com/flatiron-school/ds-spark/releases/download/v1.0/reviews_Toys_and_Games_5.json.gz

--2021-11-05 18:24:18--  https://github.com/flatiron-school/ds-spark/releases/download/v1.0/reviews_Toys_and_Games_5.json.gz
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github-releases.githubusercontent.com/379727666/13773e80-d431-11eb-8290-22c32da354e0?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20211105%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20211105T182418Z&X-Amz-Expires=300&X-Amz-Signature=1b811a692e0aa1b44cfbea537cf364b64ee38773db7e8361a481246f45d7df53&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=379727666&response-content-disposition=attachment%3B%20filename%3Dreviews_Toys_and_Games_5.json.gz&response-content-type=application%2Foctet-stream [following]
--2021-11-05 18:24:18--  https://github-releases.githubusercontent.com/379727666/13773e80-d431-11eb-8290-22c32da354e0?X-Amz-Algorithm=AWS4-HMAC-SHA2

### Optional: For Databricks Setup

Follow [these instructions](https://docs.databricks.com/data/data.html#import-data-1) to import `reviews_Toys_and_Games_5.json` into Databricks.

## Step 3: Create a Spark DataFrame

In [7]:
# This path assumes the file is in the working directory (as it is after the wget above); it will differ on Databricks or in a local setup
df = spark.read.json('reviews_Toys_and_Games_5.json.gz')

In [8]:
df.persist()

DataFrame[asin: string, helpful: array<bigint>, overall: double, reviewText: string, reviewTime: string, reviewerID: string, reviewerName: string, summary: string, unixReviewTime: bigint]

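*`.persist()` asks Spark to keep the DataFrame in memory once it has been computed, so it does not have to be re-read and recomputed for every later action. It is an optimization, not a requirement for working with the DataFrame. Spark is built around in-memory processing, whereas Hadoop MapReduce writes intermediate results to disk (HDD/SSD).*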

*In Colab, "local" memory means the RAM of the Google cloud machine running the notebook, not your own computer.*


*Aiden asked "Maybe .persist() is storing the data in RAM vs storage?" <br>
Nick asked "Local memory meaning RAM, correct?" Norm said "Yes"*


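This last command, `.persist()`, marks the DataFrame to be kept in memory the first time an action computes it. By default, any part that does not fit in memory spills to disk. Later actions can then reuse the stored data instead of re-reading and re-parsing the file. See [this page](https://unraveldata.com/to-cache-or-not-to-cache/). `.persist()` is similar to `.cache()` but more flexible: `.persist()` lets you choose the storage level, while `.cache()` always uses the default one. See [here](https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist).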

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

*Not a pandas dataframe, but a pyspark dataframe.*

In [10]:
df.show(5) # show the first 5 rows (the default is 20)

+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|  reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|0439893577| [0, 0]|    5.0|I like the item p...|01 29, 2014|A1VXOAVRGKGEAK|         Angie|      Magnetic board|    1390953600|
|0439893577| [1, 1]|    4.0|Love the magnet e...|03 28, 2014| A8R62G708TSCM|       Candace|it works pretty g...|    1395964800|
|0439893577| [1, 1]|    5.0|Both sides are ma...|01 28, 2013|A21KH420DK0ICA|capemaychristy|          love this!|    1359331200|
|0439893577| [0, 0]|    5.0|Bought one a few ...| 02 8, 2014| AR29QK6HPFYZ4|          dcrm|   Daughters love it|    1391817600|
|0439893577| [1, 1]|    4.0|I have a stainles...| 05 5, 2014| ACCH8EOML6FN5|          DoyZ|Great to have

In [11]:
pdf = df.limit(5).toPandas()
pdf

|   | asin | helpful | overall | reviewText | reviewTime | reviewerID | reviewerName | summary | unixReviewTime |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0439893577 | [0, 0] | 5.0 | I like the item pricing. My granddaughter want... | 01 29, 2014 | A1VXOAVRGKGEAK | Angie | Magnetic board | 1390953600 |
| 1 | 0439893577 | [1, 1] | 4.0 | Love the magnet easel... great for moving to d... | 03 28, 2014 | A8R62G708TSCM | Candace | it works pretty good for moving to different a... | 1395964800 |
| 2 | 0439893577 | [1, 1] | 5.0 | Both sides are magnetic. A real plus when you... | 01 28, 2013 | A21KH420DK0ICA | capemaychristy | love this! | 1359331200 |
| 3 | 0439893577 | [0, 0] | 5.0 | Bought one a few years ago for my daughter and... | 02 8, 2014 | AR29QK6HPFYZ4 | dcrm | Daughters love it | 1391817600 |
| 4 | 0439893577 | [1, 1] | 4.0 | I have a stainless steel refrigerator therefor... | 05 5, 2014 | ACCH8EOML6FN5 | DoyZ | Great to have so he can play with his alphabet... | 1399248000 |


*Here we keep only the first 5 rows (similar to pandas `.head()`) before converting them to a pandas DataFrame with `.toPandas()`.*

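*Jeff asked, "Also, can you load the entire dataset into a pandas DF, or will that break something?"* `.toPandas()` collects every row onto the driver. It therefore works only if the whole dataset fits in the driver's memory; otherwise it can fail with an out-of-memory error.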

In [12]:
type(pdf)

pandas.core.frame.DataFrame

In [13]:
df.count()

167597

In [14]:
df.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [15]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



`nullable = true` means that the column may contain null values. Likewise, `containsNull = true` means that elements of the `helpful` array may be null.

In [16]:
df.describe().show()

+-------+--------------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|summary|                asin|           overall|          reviewText|reviewTime|          reviewerID|        reviewerName|             summary|      unixReviewTime|
+-------+--------------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|  count|              167597|            167597|              167597|    167597|              167597|              166759|              167597|              167597|
|   mean|2.1290876613696043E9| 4.356307093802395|                null|      null|                null|                 NaN|               600.0|1.3487585072095563E9|
| stddev|2.1305790004603045E9|0.9935012992131987|                null|      null|                null|                 NaN|                null|6.0895808450016156E7|
|   

In [17]:
df.describe('overall').show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            167597|
|   mean| 4.356307093802395|
| stddev|0.9935012992131987|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+
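`describe()` reports the count, mean, standard deviation, min and max of a column. The plain-Python sketch below computes the same statistics with the standard `statistics` module, so no Spark is needed. It uses a short, made-up list of ratings in place of the real `overall` column. Spark's `stddev` is the *sample* standard deviation (denominator n - 1), which is what `statistics.stdev` computes.

```python
import statistics

# Hypothetical ratings standing in for the 'overall' column
ratings = [5.0, 4.0, 5.0, 5.0, 4.0, 3.0, 3.0, 5.0]

summary = {
    "count": len(ratings),
    "mean": statistics.mean(ratings),
    "stddev": statistics.stdev(ratings),  # sample standard deviation (n - 1 denominator)
    "min": min(ratings),
    "max": max(ratings),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```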



In [18]:
reviews_df = df[['asin', 'overall']]  # select two columns; equivalent to df.select('asin', 'overall')

In [19]:
reviews_df.show()

+----------+-------+
|      asin|overall|
+----------+-------+
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    3.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    4.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    3.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    5.0|
|0439893577|    5.0|
|048645195X|    5.0|
|048645195X|    4.0|
|048645195X|    5.0|
+----------+-------+
only showing top 20 rows



*We can also define helper functions, such as this one that displays the first `n` rows as a pandas DataFrame:*

In [20]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [21]:
show(reviews_df)

|   | asin | overall |
|---|---|---|
| 0 | 0439893577 | 5.0 |
| 1 | 0439893577 | 4.0 |
| 2 | 0439893577 | 5.0 |
| 3 | 0439893577 | 5.0 |
| 4 | 0439893577 | 4.0 |


In [23]:
#Code for Alex Marshall's question
reviews_df.describe('overall').limit(5).show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            167597|
|   mean| 4.356307093802395|
| stddev|0.9935012992131987|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [27]:
#Code for Alex Marshall's question
reviews_df.describe('overall').show(5)

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            167597|
|   mean| 4.356307093802395|
| stddev|0.9935012992131987|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [24]:
reviews_df.count()

167597

In [25]:
sorted_review_df = reviews_df.sort('overall')

In [26]:
show(sorted_review_df)

|   | asin | overall |
|---|---|---|
| 0 | 0786955708 | 1.0 |
| 1 | 0976990709 | 1.0 |
| 2 | 0963679600 | 1.0 |
| 3 | 0786955708 | 1.0 |
| 4 | 0974665207 | 1.0 |


In [28]:
# Count the distinct values of 'overall' (the PySpark analogue of pandas .nunique(), not .value_counts())
counts = reviews_df.agg(F.countDistinct('overall'))

In [29]:
# 5 distinct values in the 'overall' column of reviews_df
counts.show()

+--------------+
|count(overall)|
+--------------+
|             5|
+--------------+
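`countDistinct` returns a single number: how many distinct values the column holds, like pandas `.nunique()`. The per-value tallies that pandas `.value_counts()` produces correspond in PySpark to `reviews_df.groupBy('overall').count()`, which is also what the SQL query below computes. Here is a plain-Python sketch of the difference, using made-up ratings and `collections.Counter`:

```python
from collections import Counter

# Hypothetical ratings standing in for the 'overall' column
ratings = [5.0, 4.0, 5.0, 3.0, 5.0, 1.0, 4.0, 2.0]

# Like F.countDistinct('overall') / pandas .nunique(): one number
n_distinct = len(set(ratings))

# Like groupBy('overall').count() / pandas .value_counts(): one count per value
per_value = Counter(ratings)

print(n_distinct)                # 5
print(sorted(per_value.items())) # [(1.0, 1), (2.0, 1), (3.0, 1), (4.0, 2), (5.0, 3)]
```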



In [31]:
# SQL query that counts the reviews for each rating; 'reviews' is the temporary view registered in the next cell
query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""

In [32]:
# Register reviews_df as a temporary view named 'reviews' so SQL queries can refer to it
# (in Colab, hover over the method name to see its docstring)
reviews_df.createOrReplaceTempView('reviews')

*Andrew said, "I think `createOrReplaceTempView` is saying, 'let's take `reviews_df` and call it `reviews`, so now you can run a SQL query on it.'"*
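A temporary view is essentially a name that SQL queries can use to refer to a DataFrame. For a runnable analogy outside Spark, the sketch below uses Python's built-in `sqlite3` module. It loads a few hypothetical `(asin, overall)` rows into an in-memory table named `reviews` and runs the same `GROUP BY` query as above. Only the query text comes from the notebook; the rows are made up.

```python
import sqlite3

# Hypothetical rows standing in for reviews_df (asin, overall)
rows = [
    ("0439893577", 5.0),
    ("0439893577", 4.0),
    ("0439893577", 5.0),
    ("048645195X", 5.0),
    ("048645195X", 4.0),
    ("048645195X", 1.0),
]

query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""

conn = sqlite3.connect(":memory:")
# The table name plays the role of the temp view name 'reviews'
conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL)")
conn.executemany("INSERT INTO reviews VALUES (?, ?)", rows)

result = conn.execute(query).fetchall()
conn.close()
print(result)  # [(1.0, 1), (4.0, 2), (5.0, 3)]
```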

In [33]:
output = spark.sql(query)

In [34]:
show(output)

|   | overall | count(1) |
|---|---|---|
| 0 | 1.0 | 4707 |
| 1 | 2.0 | 6298 |
| 2 | 3.0 | 16357 |
| 3 | 4.0 | 37445 |
| 4 | 5.0 | 102790 |


In [35]:
output.collect()

[Row(overall=1.0, count(1)=4707),
 Row(overall=2.0, count(1)=6298),
 Row(overall=3.0, count(1)=16357),
 Row(overall=4.0, count(1)=37445),
 Row(overall=5.0, count(1)=102790)]

In [38]:
# Sanity check: the per-rating counts should add up to the total number of rows (a difference of 0).
# Collect the small result once instead of calling .collect() inside the loop.
reviews_df.count() - sum(row[1] for row in output.collect())

0

In [37]:
type(reviews_df)

pyspark.sql.dataframe.DataFrame

Convert to RDD!

In [39]:
reviews_df.rdd

MapPartitionsRDD[162] at javaToPython at NativeMethodAccessorImpl.java:0

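*`.rdd` exposes the DataFrame's underlying RDD (Resilient Distributed Dataset) of `Row` objects: the lower-level collection that Spark splits into partitions and distributes across the cluster.*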

In [40]:
type(reviews_df.rdd)

pyspark.rdd.RDD

## Exploring the DataFrame

### Count the Words in the First Row

In [41]:
row_one = df.first()

In [42]:
row_one

Row(asin='0439893577', helpful=[0, 0], overall=5.0, reviewText='I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters.', reviewTime='01 29, 2014', reviewerID='A1VXOAVRGKGEAK', reviewerName='Angie', summary='Magnetic board', unixReviewTime=1390953600)

In [44]:
#Check to make sure it's a PySpark row
type(row_one)

pyspark.sql.types.Row

In [45]:
def word_count(text):
    return len(text.split())

In [46]:
word_count(row_one['reviewText'])

20

*There are 20 words in the review text: "I like the item pricing. My granddaughter wanted to mark on it but I wanted it just for the letters."*
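With no arguments, `str.split()` splits on any run of whitespace, which is how the count of 20 arises. One caveat matters when this function becomes a UDF: PySpark passes null column values to the Python function as `None`, and `None.split()` raises an `AttributeError`. This dataset's `reviewText` has no nulls (its `describe()` count equals the row count), but the sketch below shows a null-safe variant. The name `word_count_safe` is hypothetical. Returning `0` for missing text is a design choice; returning `None` instead would keep the value null in Spark.

```python
def word_count_safe(text):
    """Count whitespace-separated words; return 0 for missing (None) text."""
    if text is None:
        return 0
    return len(text.split())

review = ("I like the item pricing. My granddaughter wanted to mark "
          "on it but I wanted it just for the letters.")
print(word_count_safe(review))      # 20
print(word_count_safe(None))        # 0
print(word_count_safe("  a\tb\n"))  # 2
```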

In [47]:
# 'udf' stands for User Defined Function.
# 'F' is our alias for the pyspark.sql.functions module (hence Norman calling it a 'module'),
# which provides Spark's built-in column functions, including F.udf.
word_count_udf = F.udf(word_count, returnType=IntegerType())

In [48]:
review_text_col = df['reviewText']

In [49]:
counts_df = df.withColumn('wordCount', word_count_udf(review_text_col))

In [50]:
# Remember that our show() helper displays 5 rows by default.

show(counts_df).T

|   | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| asin | 0439893577 | 0439893577 | 0439893577 | 0439893577 | 0439893577 |
| helpful | [0, 0] | [1, 1] | [1, 1] | [0, 0] | [1, 1] |
| overall | 5 | 4 | 5 | 5 | 4 |
| reviewText | I like the item pricing. My granddaughter want... | Love the magnet easel... great for moving to d... | Both sides are magnetic. A real plus when you... | Bought one a few years ago for my daughter and... | I have a stainless steel refrigerator therefor... |
| reviewTime | 01 29, 2014 | 03 28, 2014 | 01 28, 2013 | 02 8, 2014 | 05 5, 2014 |
| reviewerID | A1VXOAVRGKGEAK | A8R62G708TSCM | A21KH420DK0ICA | AR29QK6HPFYZ4 | ACCH8EOML6FN5 |
| reviewerName | Angie | Candace | capemaychristy | dcrm | DoyZ |
| summary | Magnetic board | it works pretty good for moving to different a... | love this! | Daughters love it | Great to have so he can play with his alphabet... |
| unixReviewTime | 1390953600 | 1395964800 | 1359331200 | 1391817600 | 1399248000 |
| wordCount | 20 | 22 | 76 | 31 | 47 |


*Notice the wordCount row (it would have been a column if the table weren't transposed).*

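*We can't pass a Spark column straight to our plain Python `word_count()` function. `review_text_col` is a Spark `Column` expression, not a string, so calling `.split()` on it fails with a `TypeError`, as the next cell shows. We first have to wrap the function in a User Defined Function (UDF).*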

In [51]:
counts_df_not_udf = df.withColumn('wordCount', word_count(review_text_col))
show(counts_df_not_udf).T

TypeError: ignored

In [52]:
word_count_udf = F.udf(word_count, IntegerType())

# Registering our word_count() function so that we
# can use it with SQL! See documentation here:
# https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UDFRegistration.html

df.createOrReplaceTempView('reviews')
spark.udf.register('word_count', word_count_udf)

<function __main__.word_count>

In [53]:
# Now we can use our function in a SQL query!

query = """
SELECT asin, overall, reviewText, word_count(reviewText) AS wordCount
FROM reviews
"""

In [56]:
# 'reviews' now refers to df: the previous cell re-registered the temp view 'reviews' with df (and registered the UDF as 'word_count')
counts_df = spark.sql(query)

In [55]:
show(counts_df)

|   | asin | overall | reviewText | wordCount |
|---|---|---|---|---|
| 0 | 0439893577 | 5.0 | I like the item pricing. My granddaughter want... | 20 |
| 1 | 0439893577 | 4.0 | Love the magnet easel... great for moving to d... | 22 |
| 2 | 0439893577 | 5.0 | Both sides are magnetic. A real plus when you... | 76 |
| 3 | 0439893577 | 5.0 | Bought one a few years ago for my daughter and... | 31 |
| 4 | 0439893577 | 4.0 | I have a stainless steel refrigerator therefor... | 47 |


In [None]:
def count_all_the_things(text):
    return [len(text), len(text.split())]

In [None]:
count_udf = F.udf(count_all_the_things, returnType=ArrayType(IntegerType()))

In [None]:
counts_df = df.withColumn('counts', count_udf(df['reviewText']))

In [None]:
show(counts_df, 1)
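`count_all_the_things` returns a two-element list, `[number of characters, number of words]`. That is why the UDF declares `ArrayType(IntegerType())`: a Python list of ints maps to a Spark array of integers. The snippet below applies the same function in plain Python to the first review's text, without Spark.

```python
def count_all_the_things(text):
    # [character count, whitespace-separated word count]
    return [len(text), len(text.split())]

review = ("I like the item pricing. My granddaughter wanted to mark "
          "on it but I wanted it just for the letters.")
chars, words = count_all_the_things(review)
print(chars, words)
```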

In [None]:
slim_counts_df = (
    df.drop('reviewTime', 'helpful')                         # drop two unused columns
      .withColumn('counts', count_udf(F.col('reviewText')))  # add [characters, words]
      .drop('reviewText')                                    # then drop the raw text
)

In [None]:
show(slim_counts_df, n=1)

In [None]:
aggs = counts_df.groupBy('reviewerID').agg({'overall': 'mean'})
aggs.collect()
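`groupBy('reviewerID').agg({'overall': 'mean'})` averages the ratings within each reviewer's group. `.collect()` then returns one `Row` per reviewer to the driver, which can be a long list. Roughly speaking, Spark computes a distributed mean by keeping a partial sum and count per group in each partition and merging them. The plain-Python sketch below uses the same sum-and-count bookkeeping on a few pairs. The reviewer IDs are borrowed from the sample rows, but the pairings and ratings are hypothetical.

```python
from collections import defaultdict

# Hypothetical (reviewerID, overall) pairs
rows = [
    ("A1VXOAVRGKGEAK", 5.0),
    ("A8R62G708TSCM", 4.0),
    ("A1VXOAVRGKGEAK", 3.0),
    ("A8R62G708TSCM", 5.0),
    ("A21KH420DK0ICA", 5.0),
]

# Accumulate a running [sum, count] per reviewer, then divide
totals = defaultdict(lambda: [0.0, 0])
for reviewer, rating in rows:
    totals[reviewer][0] += rating
    totals[reviewer][1] += 1

mean_by_reviewer = {r: s / n for r, (s, n) in totals.items()}
print(mean_by_reviewer)
```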

### A Few More Basic Commands

Please refer also to the [official programming guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html).

In [None]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [None]:
def multiply(a, b):
    return a * b

In [None]:
distData.reduce(multiply)
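`reduce(multiply)` multiplies the elements together, giving 1 × 2 × 3 × 4 × 5 = 120. Spark reduces each partition separately and then combines the partial results. The function you pass should therefore be commutative and associative, as the Spark programming guide notes. Multiplication qualifies; subtraction does not. The plain-Python sketch below imitates that two-stage process with `functools.reduce`, using a hypothetical split of the data into two partitions.

```python
from functools import reduce

def multiply(a, b):
    return a * b

def subtract(a, b):
    return a - b

data = [1, 2, 3, 4, 5]
partitions = [[1, 2], [3, 4, 5]]  # hypothetical two-partition split

# Multiplication: sequential and per-partition reduction agree
sequential = reduce(multiply, data)
distributed = reduce(multiply, [reduce(multiply, p) for p in partitions])
print(sequential, distributed)  # 120 120

# Subtraction is not associative, so the two strategies disagree
seq_sub = reduce(subtract, data)
part_sub = reduce(subtract, [reduce(subtract, p) for p in partitions])
print(seq_sub, part_sub)  # -13 5
```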

In [None]:
distData.filter(lambda x: x < 4).collect()
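`filter` is a *transformation*: Spark only records it, and nothing runs until an *action* such as `collect()` requests results. Python's built-in `filter` is lazy in a similar way, which makes it a convenient analogy. It is only an analogy, because Spark also distributes the work across partitions. The sketch records each call to the predicate to show when evaluation actually happens.

```python
data = [1, 2, 3, 4, 5]
calls = []

def less_than_four(x):
    calls.append(x)  # record each time the predicate actually runs
    return x < 4

lazy = filter(less_than_four, data)  # like distData.filter(...): nothing evaluated yet
print(calls)   # []

result = list(lazy)                  # like .collect(): forces evaluation
print(result)  # [1, 2, 3]
print(calls)   # [1, 2, 3, 4, 5]
```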

## Reading files

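`sc.textFile()` reads .txt files into an RDD of lines, and `spark.read.json()` reads .json files into a DataFrame (as in Step 3).

`.toJSON()` goes the other way: it converts a DataFrame into an RDD of JSON strings, one per row, which `spark.read.json()` can parse back into a DataFrame.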

In [None]:
dfjson = counts_df.toJSON()

In [None]:
df2 = spark.read.json(dfjson)

In [None]:
df2.printSchema()
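`toJSON()` turns each row into one JSON string, and `spark.read.json()` parses those strings back, inferring the schema again. The plain-Python sketch below performs the same round trip with the standard `json` module on a trimmed, hypothetical version of one row. JSON has a single number type, so the reader decides how to type numbers. This is worth checking with `df2.printSchema()`: Spark typically infers whole numbers as `long` (as it did for `helpful` in the original schema), so `counts` may come back as `array<bigint>` rather than `array<int>`.

```python
import json

# Hypothetical, trimmed version of one row of counts_df
row = {"asin": "0439893577", "overall": 5.0, "helpful": [0, 0], "counts": [100, 20]}

line = json.dumps(row)     # like one element of counts_df.toJSON()
parsed = json.loads(line)  # like spark.read.json() parsing that element

print(line)
print(parsed == row)  # True
print(type(parsed["overall"]).__name__, type(parsed["counts"][0]).__name__)  # float int
```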

In [None]:
counts_df

In [None]:
type(df.toPandas())