# Welcome to the Colab Spark Tutorial.

We will be using Spark a few times in this course, and the Colab environment provides the compute we need (for 12 hours at a time), along with this wonderful web-based notebook.

Today we will configure PySpark and explore Spark SQL alongside the Spark DataFrame API.

Source material includes [1]

Sections:

1.   Configuring your Colab instance
2.   Using PySpark

First, we need to configure the _Colab_ instance.

In [None]:
!lsb_release -a

No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 18.04.6 LTS
Release:	18.04
Codename:	bionic


In [None]:
!apt-get update

Ign:1 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7

In [None]:
# Install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
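# get Spark
# (dlcdn.apache.org only mirrors current releases; if this download fails,
# older versions such as 3.2.2 are kept at https://archive.apache.org/dist/spark/)
VERSION='3.2.2'
!wget https://dlcdn.apache.org/spark/spark-$VERSION/spark-$VERSION-bin-hadoop3.2.tgz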

--2022-08-13 11:03:33--  https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301112604 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.2-bin-hadoop3.2.tgz.1’


2022-08-13 11:03:35 (164 MB/s) - ‘spark-3.2.2-bin-hadoop3.2.tgz.1’ saved [301112604/301112604]



In [None]:
# decompress spark
!tar xf spark-$VERSION-bin-hadoop3.2.tgz

# install python package to help with system paths
!pip install -q findspark

In [None]:
# Let Colab know where the java and spark folders are

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{VERSION}-bin-hadoop3.2"
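Before calling `findspark.init()`, it can help to check that both variables point at real directories, because a wrong path otherwise tends to surface later as a harder-to-read error. The helper below is a hypothetical sketch (`missing_paths` is our own name, not part of findspark or Spark) that only uses the standard library.

```python
import os


def missing_paths(var_names=("JAVA_HOME", "SPARK_HOME")):
    """Return the env var names that are unset or don't point to a directory.

    `missing_paths` is a hypothetical helper for this tutorial, not a findspark API.
    """
    # os.path.isdir("") is False, so an unset variable is reported as missing too
    return [name for name in var_names
            if not os.path.isdir(os.environ.get(name, ""))]
```

In the notebook you could run `assert not missing_paths(), missing_paths()` right after setting the two variables.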

In [None]:
# add pyspark to sys.path using findspark
import findspark
findspark.init()

# Exploring Spark with Pandas


Starting from pandas examples, convert each analysis to PySpark. This is useful if you discover that your data has grown too large for your tooling.

The purpose of this notebook is to familiarise yourself with the PySpark API. You are welcome to use the R equivalent if you wish, as long as you obtain the correct results. We use Python in this notebook because it is widely used in data science and has a very large community.



#### First, let's create our Spark session

In [None]:
from pyspark.sql import SparkSession
import pandas as pd 
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

### Overview


* Joining two dataframes/data sets
* Simple aggregations
* Persisting

#### JOIN: Pandas

We won't use pandas joins again in this notebook, but observe how the join works.

See what happens if you change from the default inner join to an outer join.

In [None]:
customer_raw = [(1, 'bob', 3462543658686),
                (2, 'rob', 9087567565439),
                (3, 'tim', 5436586999467),
                (4, 'tom', 8349756853250)]

customer_cols = ['id', 'name', 'credit_card_number']

orders_raw = [(1, 'ketchup', 'bob', 1.20),
              (2, 'rutabaga', 'bob', 3.35),
              (3, 'fake vegan meat', 'rob', 13.99),
              (4, 'cheesey poofs', 'tim', 3.99),
              (5, 'ice cream', 'tim', 4.95),
              (6, 'protein powder', 'tom', 49.95)]

orders_cols = ['id', 'product_name', 'customer', 'price']

In [None]:
customer_df = pd.DataFrame(customer_raw, columns=customer_cols)
orders_df = pd.DataFrame(orders_raw, columns=orders_cols)

customer_df

joined_df = pd.merge(customer_df, orders_df, how='inner', left_on='name', right_on='customer')
joined_df

## For self study: what happens if (4, 'tom', 8349756853250) in customer_raw becomes (4, 'tod', 8349756853250)?
## How do the results change?
## More realistically: what if some customers have not made any orders, but we still need them in the result set?

|   | id_x | name | credit_card_number | id_y | product_name | customer | price |
|---|---|---|---|---|---|---|---|
| 0 | 1 | bob | 3462543658686 | 1 | ketchup | bob | 1.2 |
| 1 | 1 | bob | 3462543658686 | 2 | rutabaga | bob | 3.35 |
| 2 | 2 | rob | 9087567565439 | 3 | fake vegan meat | rob | 13.99 |
| 3 | 3 | tim | 5436586999467 | 4 | cheesey poofs | tim | 3.99 |
| 4 | 3 | tim | 5436586999467 | 5 | ice cream | tim | 4.95 |
| 5 | 4 | tom | 8349756853250 | 6 | protein powder | tom | 49.95 |
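To try the outer-join experiment suggested above, here is a minimal pandas sketch. It reuses this notebook's customer and order rows (under new `demo_` names so the real variables are not overwritten) and adds one hypothetical customer, `sue`, who has placed no orders. `how='left'` keeps every customer, and `indicator=True` adds a `_merge` column recording whether each row matched (`both`) or came only from the left table (`left_only`).

```python
import pandas as pd

demo_customer_cols = ['id', 'name', 'credit_card_number']
demo_orders_cols = ['id', 'product_name', 'customer', 'price']

# The notebook's customers plus 'sue', a hypothetical customer with no orders
demo_customers = [(1, 'bob', 3462543658686),
                  (2, 'rob', 9087567565439),
                  (3, 'tim', 5436586999467),
                  (4, 'tom', 8349756853250),
                  (5, 'sue', 1234567890123)]

demo_orders = [(1, 'ketchup', 'bob', 1.20),
               (2, 'rutabaga', 'bob', 3.35),
               (3, 'fake vegan meat', 'rob', 13.99),
               (4, 'cheesey poofs', 'tim', 3.99),
               (5, 'ice cream', 'tim', 4.95),
               (6, 'protein powder', 'tom', 49.95)]

demo_customer_df = pd.DataFrame(demo_customers, columns=demo_customer_cols)
demo_orders_df = pd.DataFrame(demo_orders, columns=demo_orders_cols)

# how='left' keeps every customer; indicator=True adds a '_merge' column that
# is 'both' for matched rows and 'left_only' for customers with no orders
left_df = pd.merge(demo_customer_df, demo_orders_df, how='left',
                   left_on='name', right_on='customer', indicator=True)
print(left_df[['name', 'product_name', 'price', '_merge']])
```

With the default `how='inner'`, `sue` disappears from the result; `how='outer'` would additionally keep any orders whose `customer` has no match in the customer table.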


#### JOIN: Spark

In [None]:
customersDF = spark.createDataFrame(customer_raw, customer_cols)

ordersDF = spark.createDataFrame(orders_raw, orders_cols)

# Show tables
customersDF.show()
ordersDF.show()

+---+----+------------------+
| id|name|credit_card_number|
+---+----+------------------+
|  1| bob|     3462543658686|
|  2| rob|     9087567565439|
|  3| tim|     5436586999467|
|  4| tom|     8349756853250|
+---+----+------------------+

+---+---------------+--------+-----+
| id|   product_name|customer|price|
+---+---------------+--------+-----+
|  1|        ketchup|     bob|  1.2|
|  2|       rutabaga|     bob| 3.35|
|  3|fake vegan meat|     rob|13.99|
|  4|  cheesey poofs|     tim| 3.99|
|  5|      ice cream|     tim| 4.95|
|  6| protein powder|     tom|49.95|
+---+---------------+--------+-----+



In [None]:
joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)
joinedDF.show()

+---+----+------------------+---+---------------+--------+-----+
| id|name|credit_card_number| id|   product_name|customer|price|
+---+----+------------------+---+---------------+--------+-----+
|  1| bob|     3462543658686|  1|        ketchup|     bob|  1.2|
|  1| bob|     3462543658686|  2|       rutabaga|     bob| 3.35|
|  2| rob|     9087567565439|  3|fake vegan meat|     rob|13.99|
|  3| tim|     5436586999467|  4|  cheesey poofs|     tim| 3.99|
|  3| tim|     5436586999467|  5|      ice cream|     tim| 4.95|
|  4| tom|     8349756853250|  6| protein powder|     tom|49.95|
+---+----+------------------+---+---------------+--------+-----+
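Notice that the Spark result keeps two `id` columns, one from each side, because the join condition compares `name` with `customer` rather than merging on a shared column (pandas renamed them `id_x` and `id_y` instead). Spark chooses the physical join strategy itself (for example a broadcast hash join or a sort-merge join), so the sketch below is only a single-machine illustration of what an equi-join computes, written as a simple hash join in plain Python. `hash_join` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict


def hash_join(left, right, left_key, right_key, how="inner"):
    """Equi-join two lists of tuples on left[left_key] == right[right_key].

    Hypothetical teaching helper: how="inner" keeps matching pairs only;
    how="left" also keeps unmatched left rows, padding the right side with None.
    """
    # Build phase: index the right-hand rows by their join key
    index = defaultdict(list)
    for row in right:
        index[row[right_key]].append(row)
    right_width = len(right[0]) if right else 0

    # Probe phase: look up each left-hand row's key in the index
    joined = []
    for row in left:
        matches = index.get(row[left_key], [])
        for match in matches:
            joined.append(row + match)  # both 'id' fields are kept, as in Spark
        if not matches and how == "left":
            joined.append(row + (None,) * right_width)
    return joined


customers = [(1, 'bob', 3462543658686), (2, 'rob', 9087567565439),
             (3, 'tim', 5436586999467), (4, 'tom', 8349756853250)]
orders = [(1, 'ketchup', 'bob', 1.20), (2, 'rutabaga', 'bob', 3.35),
          (3, 'fake vegan meat', 'rob', 13.99), (4, 'cheesey poofs', 'tim', 3.99),
          (5, 'ice cream', 'tim', 4.95), (6, 'protein powder', 'tom', 49.95)]

# name is field 1 of a customer row; customer is field 2 of an order row
inner = hash_join(customers, orders, left_key=1, right_key=2)
for row in inner:
    print(row)
```

In Spark itself, a left join is requested through the third argument of `join`, e.g. `customersDF.join(ordersDF, customersDF.name == ordersDF.customer, 'left')`.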



## Simple Aggregations

Now let's explore simple aggregations. You will be using these often when doing exploratory work in big data. Remember, the intention here is that you grow familiar with the way the API works, and how to translate inquiries into that API.

> _How much did each person spend?_

In [None]:
joined_df.groupby('name').agg({"price": ["sum"]}) 

| name | price (sum) |
|---|---|
| bob | 4.55 |
| rob | 13.99 |
| tim | 8.94 |
| tom | 49.95 |
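The dictionary form `agg({"price": ["sum"]})` produces two levels of column headers (`price` over `sum`), which is why the pandas result has a nested header. pandas' named aggregation gives a single flat, named column instead, closer to Spark's `.alias('total')`. A minimal sketch using the prices from the join above (rebuilt as a small hypothetical `demo_joined` frame):

```python
import pandas as pd

# The joined order rows from above, reduced to the two columns needed here
demo_joined = pd.DataFrame({
    'name': ['bob', 'bob', 'rob', 'tim', 'tim', 'tom'],
    'price': [1.20, 3.35, 13.99, 3.99, 4.95, 49.95],
})

# Named aggregation: new_column=(input_column, aggregation_function).
# This gives a single flat 'total' column, much like
# f.sum('price').alias('total') in the Spark version.
totals = demo_joined.groupby('name').agg(total=('price', 'sum'))
print(totals)
```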


In [None]:
import pyspark.sql.functions as f

joinedDF.groupby('name').agg(f.sum('price').alias('total')).show()

+----+-----------------+
|name|            total|
+----+-----------------+
| bob|             4.55|
| rob|            13.99|
| tim|8.940000000000001|
| tom|            49.95|
+----+-----------------+
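Why does Spark print `8.940000000000001` for `tim` when pandas showed `8.94`? Prices are stored as binary floating-point doubles, which cannot represent 3.99 or 4.95 exactly, so their sum lands on a double that is not the one closest to 8.94. pandas rounds what it displays (6 decimal places by default, via the `display.precision` option), while Spark's `show()` prints the full value. The plain-Python sketch below reproduces the effect.

```python
from decimal import Decimal

a, b = 3.99, 4.95                # tim's two order prices as binary doubles
total = a + b                    # not the double closest to 8.94
print(repr(total))               # full value, like Spark's show()
print(round(total, 2))           # rounded to cents for display

# Decimal built from the short string forms adds exactly in base 10
exact = Decimal(str(a)) + Decimal(str(b))
print(exact)
```

In Spark you could round explicitly with `f.round(f.sum('price'), 2)`, or store money in a `DecimalType` column.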



Let's use bigger data
  * NYC crash data

In [None]:
# save to the local filesystem so we only download the data once
! curl -o rows.csv https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  391M    0  391M    0     0  4238k      0 --:--:--  0:01:34 --:--:-- 4814k


In [None]:
import pandas as pd
nyc_df = pd.read_csv('rows.csv')
nyc_df.head()



| | CRASH DATE | CRASH TIME | BOROUGH | ZIP CODE | LATITUDE | LONGITUDE | LOCATION | ON STREET NAME | CROSS STREET NAME | OFF STREET NAME | ... | CONTRIBUTING FACTOR VEHICLE 2 | CONTRIBUTING FACTOR VEHICLE 3 | CONTRIBUTING FACTOR VEHICLE 4 | CONTRIBUTING FACTOR VEHICLE 5 | COLLISION_ID | VEHICLE TYPE CODE 1 | VEHICLE TYPE CODE 2 | VEHICLE TYPE CODE 3 | VEHICLE TYPE CODE 4 | VEHICLE TYPE CODE 5 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 07/20/2022 | 1:25 | | | 40.835808 | -73.89083 | (40.835808, -73.89083) | BOSTON ROAD | | | ... | Unspecified | | | | 4547589 | Sedan | Sedan | | | |
| 1 | 07/21/2022 | 5:20 | | | | | | FDR DRIVE | | | ... | Unspecified | | | | 4548075 | Sedan | Sedan | | | |
| 2 | 04/14/2021 | 5:32 | | | | | | BRONX WHITESTONE BRIDGE | | | ... | Unspecified | | | | 4407480 | Sedan | Sedan | | | |
| 3 | 04/13/2021 | 21:35 | BROOKLYN | 11217.0 | 40.68358 | -73.97617 | (40.68358, -73.97617) | | | 620 ATLANTIC AVENUE | ... | | | | | 4407147 | Sedan | | | | |
| 4 | 04/15/2021 | 16:15 | | | | | | HUTCHINSON RIVER PARKWAY | | | ... | | | | | 4407665 | Station Wagon/Sport Utility Vehicle | | | | |


In [None]:
# number of rows

print(len(nyc_df))

# this is quite large, so we will work with a sample while we experiment (in pandas, at least)

1917681


We'll take a random 20% sample of the original data.

In [None]:
nyc_small = nyc_df.sample(frac=0.2, replace=False, random_state=1)

In [None]:
# we are also going to limit the columns to those we are going to work with

nyc_small = nyc_small[['CRASH DATE', 'CONTRIBUTING FACTOR VEHICLE 1', 
                       'BOROUGH', 'VEHICLE TYPE CODE 1', 
                       'NUMBER OF PERSONS INJURED']]

In [None]:
nyc_small.head(2)

| | CRASH DATE | CONTRIBUTING FACTOR VEHICLE 1 | BOROUGH | VEHICLE TYPE CODE 1 | NUMBER OF PERSONS INJURED |
|---|---|---|---|---|---|
| 797894 | 09/05/2017 | Driver Inattention/Distraction | QUEENS | Sedan | 0.0 |
| 461101 | 03/12/2019 | Following Too Closely | QUEENS | Station Wagon/Sport Utility Vehicle | 0.0 |


Now let's create the PySpark DataFrame, so that we have two frames with the same content:
  * nyc_small: pandas
  * sdf_small: pyspark

In [None]:
from pyspark.sql import SQLContext


# some columns mix strings with NaN (a float), so Spark can't infer the schema;
# we help it out by replacing NaNs with empty strings and forcing every column
# to be a string. The SparkSession creates the frame directly (SQLContext is
# deprecated since Spark 3.0).
sdf_small = spark.createDataFrame(nyc_small.fillna('').astype('str'))


# Let's check the schema quickly

print(sdf_small.schema)



StructType(List(StructField(CRASH DATE,StringType,true),StructField(CONTRIBUTING FACTOR VEHICLE 1,StringType,true),StructField(BOROUGH,StringType,true),StructField(VEHICLE TYPE CODE 1,StringType,true),StructField(NUMBER OF PERSONS INJURED,StringType,true)))
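Every column is now a `StringType`, including `NUMBER OF PERSONS INJURED`. The small pandas frame below (hypothetical values) shows what `fillna('').astype('str')` does: missing values become empty strings and floats become their text form, such as `'0.0'`. Question 4 therefore relies on Spark implicitly casting these strings back to numbers when it computes a mean; a more explicit option is `f.col('NUMBER OF PERSONS INJURED').cast('double')`, or passing a `schema` to `createDataFrame`.

```python
import numpy as np
import pandas as pd

# Hypothetical rows mimicking the NYC columns: strings mixed with NaN, and floats
demo = pd.DataFrame({
    'BOROUGH': ['QUEENS', np.nan, 'BRONX'],
    'NUMBER OF PERSONS INJURED': [0.0, np.nan, 2.0],
})

# The same preparation that is passed to spark.createDataFrame(...)
prepared = demo.fillna('').astype('str')
print(prepared['BOROUGH'].tolist())
print(prepared['NUMBER OF PERSONS INJURED'].tolist())
```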


In [None]:
sdf_small.show(2)

+----------+-----------------------------+-------+--------------------+-------------------------+
|CRASH DATE|CONTRIBUTING FACTOR VEHICLE 1|BOROUGH| VEHICLE TYPE CODE 1|NUMBER OF PERSONS INJURED|
+----------+-----------------------------+-------+--------------------+-------------------------+
|09/05/2017|         Driver Inattentio...| QUEENS|               Sedan|                      0.0|
|03/12/2019|         Following Too Clo...| QUEENS|Station Wagon/Spo...|                      0.0|
+----------+-----------------------------+-------+--------------------+-------------------------+
only showing top 2 rows



In [None]:
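# Spark SQL identifiers are easier to write without spaces (otherwise they need
# backticks), so rename the columns. Note: this renames nyc_small itself, which
# is why the pandas cells below use the underscored names.
col_map = {'CRASH DATE': 'CRASH_DATE',
           'CONTRIBUTING FACTOR VEHICLE 1': 'CONTRIBUTING_FACTOR_VEHICLE_1',
           'BOROUGH': 'BOROUGH',
           'VEHICLE TYPE CODE 1': 'VEHICLE_TYPE_CODE_1',
           'NUMBER OF PERSONS INJURED': 'NUMBER_OF_PERSONS_INJURED'}

nyc_small.rename(columns=col_map, inplace=True)

sql_small = spark.createDataFrame(nyc_small.fillna('').astype('str'))

# register the frame as a temporary view so spark.sql() can query it as 'data'
# (createOrReplaceTempView replaces the deprecated registerTempTable)
sql_small.createOrReplaceTempView('data')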
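The cell above renames `nyc_small`'s own columns in place, which is why the pandas cells below can use the underscored names. In pandas, assigning a DataFrame to another name (for example `sql_small = nyc_small`) does not copy it: both names refer to the same object, so an in-place rename through either name changes both. The toy example below (hypothetical data) demonstrates the aliasing and how `.copy()` avoids it.

```python
import pandas as pd

nyc_demo = pd.DataFrame({'CRASH DATE': ['09/05/2017'], 'BOROUGH': ['QUEENS']})

alias = nyc_demo                # a second name for the SAME object, not a copy
alias.rename(columns={'CRASH DATE': 'CRASH_DATE'}, inplace=True)
print(list(nyc_demo.columns))   # the rename is visible through both names

independent = nyc_demo.copy()   # .copy() creates a separate DataFrame
independent.rename(columns={'CRASH_DATE': 'crash_date'}, inplace=True)
print(list(nyc_demo.columns))   # unaffected by renaming the copy
```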



In [None]:
sql_small.show(2)

+----------+-----------------------------+-------+--------------------+-------------------------+
|CRASH_DATE|CONTRIBUTING_FACTOR_VEHICLE_1|BOROUGH| VEHICLE_TYPE_CODE_1|NUMBER_OF_PERSONS_INJURED|
+----------+-----------------------------+-------+--------------------+-------------------------+
|09/05/2017|         Driver Inattentio...| QUEENS|               Sedan|                      0.0|
|03/12/2019|         Following Too Clo...| QUEENS|Station Wagon/Spo...|                      0.0|
+----------+-----------------------------+-------+--------------------+-------------------------+
only showing top 2 rows



# Questions

Answer the following questions by porting the pandas code to the Spark DataFrame API and to Spark SQL.



# Question 1


> _On what day do most crashes occur?_

In [None]:
# Pandas
nyc_small.groupby('CRASH_DATE')['CRASH_DATE'].count().sort_values(ascending=False).head(5)

CRASH_DATE
01/21/2014    241
11/15/2018    214
12/15/2017    210
05/19/2017    206
03/06/2015    188
Name: CRASH_DATE, dtype: int64

In [None]:
# Spark API
sdf_small.groupby('CRASH DATE').count().orderBy('count', ascending=False).show(5)

+----------+-----+
|CRASH DATE|count|
+----------+-----+
|01/21/2014|  241|
|11/15/2018|  214|
|12/15/2017|  210|
|05/19/2017|  206|
|03/06/2015|  188|
+----------+-----+
only showing top 5 rows



In [None]:
# Spark SQL

spark.sql("""
      SELECT CRASH_DATE, count(*) as freq
      FROM data
      GROUP BY CRASH_DATE
      ORDER BY 2 DESC LIMIT 5""").show()

+----------+----+
|CRASH_DATE|freq|
+----------+----+
|01/21/2014| 241|
|11/15/2018| 214|
|12/15/2017| 210|
|05/19/2017| 206|
|03/06/2015| 188|
+----------+----+
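`ORDER BY 2` sorts by the second column of the `SELECT` list (here `freq`), a positional shorthand that Spark SQL and many other SQL dialects accept. If you want to practise the query shape without a Spark session, Python's built-in `sqlite3` module runs the same `GROUP BY ... ORDER BY 2 DESC LIMIT` pattern. SQLite is a different engine, so treat this only as an illustration; the rows are hypothetical.

```python
import sqlite3

# Hypothetical rows standing in for the registered 'data' view
rows = [('01/21/2014',), ('01/21/2014',), ('11/15/2018',),
        ('01/21/2014',), ('11/15/2018',), ('12/15/2017',)]

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE data (CRASH_DATE TEXT)')
conn.executemany('INSERT INTO data VALUES (?)', rows)

# Same shape as the Spark SQL query: ORDER BY 2 refers to the 2nd SELECT column
top = conn.execute("""
    SELECT CRASH_DATE, count(*) AS freq
    FROM data
    GROUP BY CRASH_DATE
    ORDER BY 2 DESC LIMIT 2""").fetchall()
print(top)
conn.close()
```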



# Question 2

> _Where do most crashes occur?_

In [None]:
# Pandas

nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)

BOROUGH
BROOKLYN         83238
QUEENS           71207
MANHATTAN        60261
BRONX            38811
STATEN ISLAND    11121
Name: BOROUGH, dtype: int64

In [None]:
# Spark API

sdf_small.groupby('BOROUGH').count().orderBy('count', ascending=False).show()

+-------------+------+
|      BOROUGH| count|
+-------------+------+
|             |118898|
|     BROOKLYN| 83238|
|       QUEENS| 71207|
|    MANHATTAN| 60261|
|        BRONX| 38811|
|STATEN ISLAND| 11121|
+-------------+------+



In [None]:
# Spark SQL

spark.sql("""
      SELECT BOROUGH, count(*) as freq
      FROM data
      GROUP BY BOROUGH
      ORDER BY 2 DESC""").show()


+-------------+------+
|      BOROUGH|  freq|
+-------------+------+
|             |118898|
|     BROOKLYN| 83238|
|       QUEENS| 71207|
|    MANHATTAN| 60261|
|        BRONX| 38811|
|STATEN ISLAND| 11121|
+-------------+------+



# Question 3

> _What is the most common cause of accidents in 'QUEENS'?_

In [None]:
# Pandas 

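# value_counts gives the frequency of each cause, but this result is not shown
# because a cell only displays its last expression
nyc_small[(nyc_small.BOROUGH == 'QUEENS')]['CONTRIBUTING_FACTOR_VEHICLE_1'].value_counts()

# you can also use a groupby (to avoid the pandas-specific value_counts function)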

nyc_small[(nyc_small.BOROUGH == 'QUEENS')].groupby(
    'CONTRIBUTING_FACTOR_VEHICLE_1'
)['CONTRIBUTING_FACTOR_VEHICLE_1'].count().sort_values(ascending=False).head(5)

CONTRIBUTING_FACTOR_VEHICLE_1
Unspecified                       25864
Driver Inattention/Distraction    14963
Failure to Yield Right-of-Way      6015
Backing Unsafely                   3546
Following Too Closely              2479
Name: CONTRIBUTING_FACTOR_VEHICLE_1, dtype: int64
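The two pandas forms in the cell above count the same thing. On a small hypothetical series, `value_counts()` and a `groupby(...).count()` sorted in descending order give identical counts, and the most common cause can be read off directly with `idxmax()`.

```python
import pandas as pd

# Hypothetical contributing factors for a handful of crashes
causes = pd.Series(['Unspecified', 'Backing Unsafely', 'Unspecified',
                    'Driver Inattention/Distraction', 'Unspecified',
                    'Driver Inattention/Distraction'])

vc = causes.value_counts()                                       # sorted, descending
gb = causes.groupby(causes).count().sort_values(ascending=False)

print(vc.to_dict() == gb.to_dict())   # same counts either way
print(vc.idxmax())                    # the most common cause
```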

In [None]:
# Spark API

(sdf_small.filter(sdf_small.BOROUGH == 'QUEENS')
          .groupBy('CONTRIBUTING FACTOR VEHICLE 1')
          .agg(f.count(f.lit(1)).alias('frequency'))
          .orderBy('frequency', ascending=False)
          .show(10))


+-----------------------------+---------+
|CONTRIBUTING FACTOR VEHICLE 1|frequency|
+-----------------------------+---------+
|                  Unspecified|    25864|
|         Driver Inattentio...|    14963|
|         Failure to Yield ...|     6015|
|             Backing Unsafely|     3546|
|         Following Too Clo...|     2479|
|         Passing or Lane U...|     1816|
|          Passing Too Closely|     1502|
|           Turning Improperly|     1416|
|         Traffic Control D...|     1399|
|              Fatigued/Drowsy|     1217|
+-----------------------------+---------+
only showing top 10 rows



In [None]:
# Spark SQL

spark.sql("""
      SELECT CONTRIBUTING_FACTOR_VEHICLE_1, count(*) as freq
      FROM data
      WHERE BOROUGH = 'QUEENS'
      GROUP BY CONTRIBUTING_FACTOR_VEHICLE_1
      ORDER BY 2 DESC LIMIT 10""").show()

+-----------------------------+-----+
|CONTRIBUTING_FACTOR_VEHICLE_1| freq|
+-----------------------------+-----+
|                  Unspecified|25864|
|         Driver Inattentio...|14963|
|         Failure to Yield ...| 6015|
|             Backing Unsafely| 3546|
|         Following Too Clo...| 2479|
|         Passing or Lane U...| 1816|
|          Passing Too Closely| 1502|
|           Turning Improperly| 1416|
|         Traffic Control D...| 1399|
|              Fatigued/Drowsy| 1217|
+-----------------------------+-----+



# Question 4

> _What is the average number of injuries for each vehicle type in each borough?_


In [None]:
# Pandas 

nyc_small.groupby(['VEHICLE_TYPE_CODE_1', 'BOROUGH'])['NUMBER_OF_PERSONS_INJURED'].mean().sort_values(ascending=False).head(10)

VEHICLE_TYPE_CODE_1  BOROUGH      
MTA B                BROOKLYN         7.0
CHEVY EXPR           QUEENS           3.0
Van Camper           BROOKLYN         3.0
comme                BRONX            2.5
SELF                 MANHATTAN        2.0
WAGON                STATEN ISLAND    2.0
SEMI                 MANHATTAN        2.0
usps                 BROOKLYN         2.0
50cc Scoot           BROOKLYN         2.0
VAN T                BRONX            2.0
Name: NUMBER_OF_PERSONS_INJURED, dtype: float64

In [None]:
# Spark API

# exclude rows without a borough ('' after fillna), which pandas' groupby drops by default
(sdf_small.filter(sdf_small.BOROUGH != '')
          .groupby('VEHICLE TYPE CODE 1', 'BOROUGH')
          .agg(f.mean('NUMBER OF PERSONS INJURED').alias('mean'))
          .orderBy('mean', ascending=False)
          .show(10))

+-------------------+-------------+----+
|VEHICLE TYPE CODE 1|      BOROUGH|mean|
+-------------------+-------------+----+
|              MTA B|     BROOKLYN| 7.0|
|         CHEVY EXPR|       QUEENS| 3.0|
|         Van Camper|     BROOKLYN| 3.0|
|              comme|        BRONX| 2.5|
|              VAN T|        BRONX| 2.0|
|               SEMI|    MANHATTAN| 2.0|
|              MOTOR|     BROOKLYN| 2.0|
|              WAGON|STATEN ISLAND| 2.0|
|         School Bus|        BRONX| 2.0|
|         Cement Tru|       QUEENS| 2.0|
+-------------------+-------------+----+
only showing top 10 rows



In [None]:
# Spark SQL

spark.sql("""
      SELECT VEHICLE_TYPE_CODE_1, BOROUGH, 
      AVG(NUMBER_OF_PERSONS_INJURED) as mean
      FROM data
      WHERE BOROUGH != ''
      GROUP BY VEHICLE_TYPE_CODE_1, BOROUGH
      ORDER BY mean DESC LIMIT 10""").show()

+-------------------+---------+----+
|VEHICLE_TYPE_CODE_1|  BOROUGH|mean|
+-------------------+---------+----+
|              MTA B| BROOKLYN| 7.0|
|         Van Camper| BROOKLYN| 3.0|
|         CHEVY EXPR|   QUEENS| 3.0|
|              comme|    BRONX| 2.5|
|         50cc Scoot| BROOKLYN| 2.0|
|               SELF|MANHATTAN| 2.0|
|              VAN T|    BRONX| 2.0|
|              MOTOR| BROOKLYN| 2.0|
|         Cement Tru|   QUEENS| 2.0|
|         School Bus|    BRONX| 2.0|
+-------------------+---------+----+
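The three Question 4 results agree on the first four rows (up to the order of the `Van Camper` and `CHEVY EXPR` tie at 3.0) but differ further down: many vehicle type and borough pairs tie at a mean of 2.0, and when only `mean` is sorted, each engine may order tied rows differently. Adding the group keys as tie-breakers makes the order deterministic. The pandas sketch below uses a few hypothetical rows; in Spark the same idea is `.orderBy(f.desc('mean'), 'VEHICLE TYPE CODE 1', 'BOROUGH')`, and in SQL `ORDER BY mean DESC, VEHICLE_TYPE_CODE_1, BOROUGH`.

```python
import pandas as pd

# Hypothetical group means with a three-way tie at 2.0
means = pd.DataFrame({
    'VEHICLE_TYPE_CODE_1': ['VAN T', 'SEMI', 'MOTOR', 'MTA B'],
    'BOROUGH': ['BRONX', 'MANHATTAN', 'BROOKLYN', 'BROOKLYN'],
    'mean': [2.0, 2.0, 2.0, 7.0],
})

# Sort by mean (descending), then break ties on the group keys (ascending)
ranked = means.sort_values(['mean', 'VEHICLE_TYPE_CODE_1', 'BOROUGH'],
                           ascending=[False, True, True])
print(ranked)
```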

