## Spark/pyspark for "big" data 

This notebook demonstrates working with gzipped (compressed) JSON-L (line-delimited JavaScript Object Notation) files on a single computer using Python, pyspark, and Apache Spark.

#### Specs
 - Python version: 3.8
 - Spark version: 3.1.2
 - Java version: OpenJDK 11.0.11+9 
 - MacBook Pro, 2.8 GHz Intel Core i7
 - Number of cores: 4
 - Memory: 16 GB
 - OS: macOS Mojave (10.14.6)
 
#### Setup
 - 26 GB of Tweets (gzipped JSON-L format)
 - 1,000 files
 - Files stored locally
 - Spark running in "local" mode (i.e., on a single machine)

#### Spark/Java system settings

Relevant lines from `.bash_profile` in my home directory. (These values will differ, depending on your setup.)

```
export SPARK_HOME=/opt/spark
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.0.11+9/Contents/Home
export PATH=$SPARK_HOME/bin:$PATH


```
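
Before starting the notebook, it can help to confirm that these variables are visible to Python. The sketch below is not part of the original setup: `check_spark_env` is a hypothetical helper that reports whether each expected variable is set and points at an existing directory.

```python
import os

def check_spark_env(env=None, names=('SPARK_HOME', 'JAVA_HOME')):
    """Map each variable name to 'ok', 'missing', or 'not a directory'."""
    # Default to the real process environment; a dict can be passed for testing
    env = os.environ if env is None else env
    report = {}
    for name in names:
        value = env.get(name)
        if not value:
            report[name] = 'missing'
        elif not os.path.isdir(value):
            report[name] = 'not a directory'
        else:
            report[name] = 'ok'
    return report

print(check_spark_env())
```

If a variable shows up as missing inside Jupyter but is set in your shell, the notebook server was probably started from an environment that never read `.bash_profile`.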

#### A "naive" Python approach 

 - Loop over a list of gzipped JSON-L files
 - Increment a counter
 - Returns the number of documents across all files 
 - Avoids reading all files into memory at once
 - Takes a _long_ time to run

In [1]:
import gzip
import glob
import json

def count_tweets(files):
    i = 0
    for file in files:
        with gzip.open(file, mode='rt') as f:
            for line in f:
                i += 1
    return i
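
To try the function without the full dataset, a small gzipped JSON-L file can be written by hand. The file name and the two-field records below are made up for illustration, and `count_tweets` is repeated so that the block runs on its own.

```python
import gzip
import json
import os
import tempfile

def count_tweets(files):
    # Same logic as above: stream each file line by line,
    # so no file is ever held in memory in full
    i = 0
    for file in files:
        with gzip.open(file, mode='rt') as f:
            for line in f:
                i += 1
    return i

# Write three made-up records, one JSON document per line
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, 'tiny.json.gz')
records = [{'id_str': str(n), 'lang': 'en'} for n in range(3)]
with gzip.open(path, mode='wt', encoding='utf-8') as f:
    for record in records:
        f.write(json.dumps(record) + '\n')

tiny_count = count_tweets([path])
print(tiny_count)
```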

 - Get a list of files we want to read

In [2]:
files = glob.glob('/Users/dsmith/Documents/code/workshops/gwlibraries-workshops/spark-gwcoders/coronavirus/*.json.gz')

 - Takes 11 minutes to process 1,000 files

In [3]:
%time count_tweets(files)

CPU times: user 11min 5s, sys: 14.6 s, total: 11min 20s
Wall time: 11min 25s


26284777

What we might want is something like this:

| | text | max(retweet_count) |
|---|---|---|
| 0 | RT @NicholsUprising: Sicily has figured out th... | 211408 |
| 1 | RT @_theghettomonk: Not enough praise has gone... | 209404 |
| 2 | RT @_theghettomonk: Not enough praise has gone... | 209248 |
| 3 | RT @MMarieSophie: HIS NAME IS STEPHEN WAMUKOTA | 201466 |
| 4 | RT @MMarieSophie: HIS NAME IS STEPHEN WAMUKOTA... | 201304 |
| ... | ... | ... |
| 95 | RT @RachelPatzerPhD: My spouse is a physician ... | 59764 |
| 96 | RT @LeoSaldanaP: #quedateEnTuCasa Soy neumólog... | 59723 |
| 97 | RT @anmmoo: [ฝากรีนะคะ] พ่อเราและเพื่อนๆเขา ทำ... | 59677 |
| 98 | RT @DrPeckPNP: This is how far back we have to... | 59458 |


### Leveraging Spark

#### Preliminaries

 - `findspark` connects our notebook to the pyspark library

In [4]:
import findspark
findspark.init()

Importing some necessary tools
- A `SparkSession` instance provides our interface with the Spark runtime
- Spark comes with a number of functions for working with data modeled on SQL

In [5]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

 - Create a session 
 - You might encounter an error if you haven't installed Spark/Java correctly

In [6]:
spark = SparkSession.builder.appName('GWCoders-Demo').getOrCreate()

#### Understanding Spark's data model

 - Works well with structured data (JSON, CSV, etc.)
 - Strongly typed: each field needs a well-defined (Scala) data type
 - Can infer the **schema** instead of our having to define it explicitly

##### A scalable approach
 - If your dataset uses the same fields consistently, read a small sample first
 - Reuse the schema when reading the entire dataset
 - `read.json`: use with a single file or a whole directory
 - Add the prefix `file://` to direct Spark to use your local filesystem


In [7]:
sample = spark.read.json('file://' + files[0])

##### DataFrames in Spark
 - "Lazily" evaluated: data only read into memory at the time of computation
 - The DataFrame provides a "frame" with type information that is filled as needed
 - We can use the `show` method to view the first N rows
 - A Spark DataFrame supports nested data types
 - Use the `printSchema` method to view the schema hierarchy
 - Use the `schema` property to access the schema directly

In [8]:
sample.show()

+------------+-----------+--------------------+------------------+--------------------+-----------------+--------------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+--------------------+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+-----------+-------------+---------+--------------------+--------------------+--------------------+-------------+---------+--------------------+---------------------+
|contributors|coordinates|          created_at|display_text_range|            entities|extended_entities|      extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|   

In [9]:
sample.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: bo

In [10]:
# We'll re-use this later on
schema = sample.schema

#### Spark DataFrame methods
 - Modeled on SQL
 - Use the `select` method to create a subset of one or more columns
 - Use the `where` or `filter` methods to create a subset of rows matching certain conditions
 - The `pyspark.sql.functions` [module](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html) provides access to Spark's DataFrame functions for manipulating data 

In [11]:
sample.count()

73816

In [12]:
# The id_str is the field in a Tweet document that uniquely identifies the Tweet
sample.select('id_str').distinct().count()

73809

In [13]:
# This result is the same as the above
# Here we create a new DataFrame with a single column containing our distinct count
sample.select(F.countDistinct('id_str')).show()

+----------------------+
|count(DISTINCT id_str)|
+----------------------+
|                 73809|
+----------------------+



In [14]:
# We can filter our sample to just those Tweets that are re-tweets
sample.where(F.col('retweeted_status').isNotNull()).select(F.countDistinct('id_str')).show()

+----------------------+
|count(DISTINCT id_str)|
+----------------------+
|                 60991|
+----------------------+



##### Grouping and aggregating over fields
 - The `groupBy` method works like the `groupby` method in pandas
 - Select the relevant columns, group, then apply an aggregation
 - Need to call `show` to display the result

In [15]:
sample.select('lang', 'id_str').groupBy('lang').agg(F.countDistinct('id_str')).show()

+----+-------------+
|lang|count(id_str)|
+----+-------------+
|  en|        41003|
|  ne|           18|
|  vi|            4|
|  ps|            1|
|  ro|            7|
|  sl|           10|
| und|         1988|
|  ur|          150|
|  lv|            5|
|  pl|           43|
|  pt|          860|
|  tl|         1522|
|  in|         2433|
|  ko|           64|
|  uk|            7|
|  cs|            4|
|  mr|           77|
|  sr|            2|
|  tr|          292|
|  de|          245|
+----+-------------+
only showing top 20 rows



##### Working with nested data
 - Much harder to do in pandas
 - Use **dot notation** to drill down into a hierarchical column
 - The `explode` function: unroll an array into a new column, creating 1 row per element 
 - Chain `select` statements to extract subfields from array elements
 - Create intermediate DataFrames for group by/aggregate operations
 - "Lazy" evaluation: no computation is performed until `show` is called

In [16]:
# The hashtags field is an array nested under the entities field
sample.select('entities.hashtags').show()

+--------------------+
|            hashtags|
+--------------------+
|[{[24, 30], Putin...|
|                  []|
|                  []|
|[{[16, 28], coron...|
|                  []|
|    [{[71, 74], US}]|
|                  []|
| [{[18, 24], China}]|
|                  []|
|[{[33, 41], COVID...|
|[{[54, 66], coron...|
|[{[99, 111], Coro...|
|[{[64, 72], COVID...|
|[{[77, 85], COVID...|
|                  []|
|                  []|
|[{[19, 23], ANF},...|
|[{[68, 76], COVID...|
|                  []|
|[{[63, 71], COVID...|
+--------------------+
only showing top 20 rows



In [17]:
# It consists of multiple subfields
# The text subfield contains the hashtag text
sample.select(F.explode('entities.hashtags'), 'id_str').show()

+--------------------+-------------------+
|                 col|             id_str|
+--------------------+-------------------+
|   {[24, 30], Putin}|1245558553961611264|
|   {[37, 43], China}|1245558553961611264|
|{[68, 80], Corona...|1245558553961611264|
|{[16, 28], corona...|1245558554133696513|
|      {[71, 74], US}|1245558554259423232|
|   {[18, 24], China}|1245558554330718210|
| {[33, 41], COVID19}|1245558554641096706|
|{[54, 66], corona...|1245558554670620677|
|{[99, 111], Coron...|1245558554800451584|
| {[64, 72], COVID19}|1245558554712399872|
| {[77, 85], COVID19}|1245558554834206722|
|     {[19, 23], ANF}|1245558555207483393|
|{[103, 111], COVI...|1245558555207483393|
| {[68, 76], COVID19}|1245558555274547200|
| {[63, 71], COVID19}|1245558555173949440|
|{[85, 103], emerg...|1245558555173949440|
|{[66, 78], corona...|1245558555387875331|
|{[105, 113], COVI...|1245558555387875331|
|{[114, 122], Flor...|1245558555387875331|
|{[60, 72], corona...|1245558555576500226|
+----------

In [18]:
# the take method makes more explicit how the data is structured in each row
sample.select(F.explode('entities.hashtags'), 'id_str').take(1)

[Row(col=Row(indices=[24, 30], text='Putin'), id_str='1245558553961611264')]

In [19]:
# We can refer to the _exploded_ hashtags field by its default name, col
sample.select(F.explode('entities.hashtags'),'id_str')\
        .select('id_str', 'col.text').show()

+-------------------+-----------------+
|             id_str|             text|
+-------------------+-----------------+
|1245558553961611264|            Putin|
|1245558553961611264|            China|
|1245558553961611264|      CoronaVirus|
|1245558554133696513|      coronavirus|
|1245558554259423232|               US|
|1245558554330718210|            China|
|1245558554641096706|          COVID19|
|1245558554670620677|      coronavirus|
|1245558554800451584|      Coronavirus|
|1245558554712399872|          COVID19|
|1245558554834206722|          COVID19|
|1245558555207483393|              ANF|
|1245558555207483393|          COVID19|
|1245558555274547200|          COVID19|
|1245558555173949440|          COVID19|
|1245558555173949440|emergencyservices|
|1245558555387875331|      coronavirus|
|1245558555387875331|          COVID19|
|1245558555387875331|          Florida|
|1245558555576500226|      coronavirus|
+-------------------+-----------------+
only showing top 20 rows



In [20]:
hashtags = sample.select(F.explode('entities.hashtags'), 'id_str')\
        .select('id_str', 'col.text')

In [21]:
# Running the groupBy method on our intermediate DataFrame
hashtags.groupBy('text').count().sort(F.desc('count')).show()

+--------------+-----+
|          text|count|
+--------------+-----+
|       COVID19|18416|
|   coronavirus|10557|
|   Coronavirus| 3462|
|       Covid19| 2208|
|       covid19| 1696|
|         China| 1529|
|     COVIDー19| 1488|
|   CoronaVirus| 1235|
|         WATCH|  546|
|       Ecuador|  491|
|        Taiwan|  383|
| AprilFoolsDay|  355|
|      ผนงรจตกม|  340|
|         Putin|  335|
|     Guayaquil|  250|
|      lockdown|  249|
|TablighiJamaat|  237|
|         Trump|  229|
|     Venezuela|  209|
|         Wuhan|  208|
+--------------+-----+
only showing top 20 rows



 - We can use nested fields in other kinds of aggregations, too.

In [22]:
# None of the Tweets in our sample have been retweeted
sample.select(F.max('retweet_count')).show()

+------------------+
|max(retweet_count)|
+------------------+
|                 0|
+------------------+



In [23]:
# The retweeted_status field contains metadata about the Tweet that was retweeted
# We can use that field to find the most commonly retweeted Tweets in our set
sample.select('retweeted_status.retweet_count', 'text').groupby('text').max('retweet_count')\
        .sort(F.desc('max(retweet_count)')).show()

+--------------------+------------------+
|                text|max(retweet_count)|
+--------------------+------------------+
|RT @NeoWokio: He ...|            195289|
|RT @maxbrooksauth...|            195268|
|RT @siravariety: ...|            179633|
|RT @bts_love_myse...|             97036|
|RT @BarackObama: ...|             93892|
|RT @yagiirlalyssa...|             84481|
|RT @protectheflam...|             74491|
|RT @donni__69: Pa...|             72101|
|RT @Craig_A_Spenc...|             69442|
|RT @AnaCabrera: J...|             64530|
|RT @DrPeckPNP: Th...|             58176|
|RT @AnnaPodolancz...|             57740|
|RT @_xlifeofmo: G...|             53820|
|RT @mbahndi: masa...|             52074|
|RT @DarylDust: 92...|             49885|
|RT @PalliThordars...|             49648|
|RT @BeardedGenius...|             47228|
|RT @OhGreatChoice...|             46129|
|RT @lainiemarsh: ...|             45279|
|RT @SoSplendidPM:...|             45172|
+--------------------+------------

#### Spark SQL
 - DataFrame functions can be expressed as [direct SQL expressions](https://spark.apache.org/docs/latest/api/sql/index.html)
 - Create a temp view/table 
 - Run SQL statements against the temp view
 - Handy for complex queries

In [24]:
# Defines a SQL statement as a Python multi-line string
lang_counts = '''
select lang, count(distinct id_str) as tweet_count
from tweets
group by lang
order by tweet_count desc
'''

In [25]:
# Instantiates a temporary view (referenced in our SQL expression)
sample.createOrReplaceTempView('tweets')

In [26]:
# Use the spark.sql method to execute the query
spark.sql(lang_counts).show()

+----+-----------+
|lang|tweet_count|
+----+-----------+
|  en|      41003|
|  es|      14468|
|  th|       3855|
|  in|       2433|
| und|       1988|
|  ja|       1828|
|  hi|       1620|
|  tl|       1522|
|  fr|       1481|
|  pt|        860|
|  ta|        410|
|  it|        308|
|  tr|        292|
|  de|        245|
|  ar|        222|
|  zh|        181|
|  ca|        172|
|  ur|        150|
|  ru|        104|
|  mr|         77|
+----+-----------+
only showing top 20 rows



Below is a SQL query that will create a table of edges between Twitter users who have authored Tweets in our dataset and Twitter users mentioned by those authors.

In [27]:
# Using a nested query to explode the mentions field, which is an array, each element of which
# corresponds to a mentioned user
mentions_sql = '''
select mentions.screen_name as mentioned_screen_name,
mentions.id_str as mentioned_user_id,
user_screen_name,
user_id
from (select explode(entities.user_mentions) as mentions,
    user.id_str as user_id,
    user.screen_name as user_screen_name
    from tweets)
'''

In [28]:
sample.createOrReplaceTempView('tweets')

In [29]:
spark.sql(mentions_sql).show()

+---------------------+-------------------+----------------+-------------------+
|mentioned_screen_name|  mentioned_user_id|user_screen_name|            user_id|
+---------------------+-------------------+----------------+-------------------+
|           marcorubio|           15745368| StevenL34059537|1185910077569392643|
|        SusanSarandon|          618300542|     Ana21704204|1217287094990598146|
|          briebriejoy|         1332630685|     Ana21704204|1217287094990598146|
|           schweizok2| 957154283002179584|        _Steirer|1016428260186521601|
|           rameshlaus|          102268884|      im_ravi___|         3155297557|
|               DFRLab| 722117494899679232|       diplowonk|          300589741|
|           r_osadchuk|         2822354336|       diplowonk|          300589741|
|         GordonGChang|          151930383|     Sam00070683|1051479796658491392|
|         himantabiswa|          131188226| tamimchoudhury9|         2550160466|
|          tcsnoticias|     

#### Querying the full dataset
 - From an in-memory sample to a full dataset on disk
 - Query optimization: write/show only at the end
 - Grouping/joining data takes longer than other kinds of transformations
 - Re-use the schema wherever possible
 - The Spark [Jobs UI](http://localhost:4040/jobs/) can help you monitor/troubleshoot


In [30]:
# Reading in our full dataset
# Note the call to the schema method, using the variable defined earlier
df = spark.read.schema(schema).json('file:///Users/dsmith/Documents/code/workshops/gwlibraries-workshops/spark-gwcoders/coronavirus/')

In [31]:
# Create a new temp view in order to apply our SQL
df.createOrReplaceTempView('tweets')
results = spark.sql(mentions_sql)

In [33]:
# Took 6.9 minutes on my machine
results.write.csv('file:///Users/dsmith/Documents/code/workshops/gwlibraries-workshops/spark-gwcoders/covid-mentions')

In [39]:
# Creating a DataFrame of the top 100 most retweeted Tweets across the whole dataset
# http://localhost:4040/jobs/
retweets = df.select('retweeted_status.retweet_count', 'text').groupby('text').max('retweet_count')\
        .sort(F.desc('max(retweet_count)'))

In [40]:
retweets.limit(100).toPandas()

| | text | max(retweet_count) |
|---|---|---|
| 0 | RT @NicholsUprising: Sicily has figured out th... | 211408 |
| 1 | RT @_theghettomonk: Not enough praise has gone... | 209404 |
| 2 | RT @_theghettomonk: Not enough praise has gone... | 209248 |
| 3 | RT @MMarieSophie: HIS NAME IS STEPHEN WAMUKOTA | 201466 |
| 4 | RT @MMarieSophie: HIS NAME IS STEPHEN WAMUKOTA... | 201304 |
| ... | ... | ... |
| 95 | RT @RachelPatzerPhD: My spouse is a physician ... | 59764 |
| 96 | RT @LeoSaldanaP: #quedateEnTuCasa Soy neumólog... | 59723 |
| 97 | RT @anmmoo: [ฝากรีนะคะ] พ่อเราและเพื่อนๆเขา ทำ... | 59677 |
| 98 | RT @DrPeckPNP: This is how far back we have to... | 59458 |


#### Take-aways & considerations
 - Spark is useful for
   - Datasets too large to fit into memory
   - Deeply nested data structures
   - Distributed datasets
 - Requires Java (Spark itself is written in Scala, but pre-built Spark downloads bundle the Scala libraries they need)
 - Interfaces exist for Python (PySpark) and [R](https://spark.apache.org/docs/latest/sparkr.html)
 - Type & organization of the data matter (e.g., many small files vs. a few large files)
 - Many [configuration](https://spark.apache.org/docs/latest/configuration.html) options 
 - Groupby aggregations & joins "cost" more than other transformations

#### Further resources

##### Installation guides consulted

   - https://kevinvecmanis.io/python/pyspark/install/2019/05/31/Installing-Apache-Spark.html
   - https://medium.com/swlh/pyspark-on-macos-installation-and-use-31f84ca61400
   - https://www.datacamp.com/community/tutorials/installation-of-pyspark

Note that some guides recommend installing Scala separately. If you do that, make sure that the version of Scala you are using is compatible with the version of Spark you are using. More information on the [Apache Spark downloads](https://spark.apache.org/downloads.html) page.

##### Books & tutorials on Spark/pyspark

   - [Spark: The Definitive Guide](https://learning.oreilly.com/library/view/spark-the-definitive/9781491912201/)
   - [Data Analytics with Spark Using Python](https://learning.oreilly.com/library/view/data-analytics-with/9780134844855/)
   - [Learning Spark](https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/)
   - [Best Practices for Spark Performance Optimization](https://developer.ibm.com/blogs/spark-performance-optimization-guidelines/)
   