# Exploring Spark with Pandas


Starting from pandas examples, we convert each analysis to PySpark. This is useful when your data grows too large for your current tooling.

The purpose of this notebook is to familiarise yourself with the PySpark API. You are welcome to use the R version instead, as long as you obtain the correct results. We use Python in this notebook because it is widely used in data science and has a very large community.



#### Firstly, let's get our spark session

In [3]:
!lsb_release -a

No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 22.04.2 LTS
Release:	22.04
Codename:	jammy


In [4]:
!apt-get update

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [498 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:8 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,014 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [

In [5]:
# Install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [6]:
# get spark
VERSION='3.5.0'
!wget https://dlcdn.apache.org/spark/spark-$VERSION/spark-$VERSION-bin-hadoop3.tgz

--2023-09-21 18:14:01--  https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz’


2023-09-21 18:14:07 (68.0 MB/s) - ‘spark-3.5.0-bin-hadoop3.tgz’ saved [400395283/400395283]



In [7]:
# decompress spark
!tar xf spark-$VERSION-bin-hadoop3.tgz

# install python package to help with system paths
!pip install -q findspark

In [8]:
# Let Colab know where the java and spark folders are

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{VERSION}-bin-hadoop3"

In [11]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pandas as pd
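# "local[*]" is a master URL (run locally on all cores), not an application name;
# the application name used here is arbitrary
spark = SparkSession.builder.master("local[*]").appName("pandas-to-spark").getOrCreate()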

### Overview


* Joining two dataframes/data sets
* Simple aggregations
* Persisting

#### JOIN: Pandas

We won't use the pandas join again in this notebook, but observe how it works.

See what happens if you change from the default inner join to an outer join.

In [None]:
customer_raw = [(1, 'bob', 3462543658686),
           (2, 'rob', 9087567565439),
           (3, 'tim', 5436586999467),
           (4, 'tom', 8349756853250)]

customer_cols = ['id', 'name', 'credit_card_number']



orders_raw = [(1, 'ketchup', 'bob', 1.20),
           (2, 'rutabaga', 'bob', 3.35),
           (3, 'fake vegan meat', 'rob', 13.99),
           (4, 'cheesey poofs', 'tim', 3.99),
           (5, 'ice cream', 'tim', 4.95),
           (6, 'protein powder', 'tom', 49.95)]

orders_cols = ['id', 'product_name', 'customer', 'price']

In [None]:
customer_df = pd.DataFrame(customer_raw, columns=customer_cols)
orders_df = pd.DataFrame(orders_raw, columns=orders_cols)

customer_df

joined_df = pd.merge(customer_df, orders_df, how='inner', left_on='name', right_on='customer')
joined_df

## For self study: what happens if (4, 'tom', 8349756853250) in customer_raw becomes (4, 'tod', 8349756853250)?
## How do the results change?
## More sensibly: what if customers have not made any orders but we still need them in the result set?
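
The self-study questions above can be explored with a small sketch like the following. It rebuilds the two toy frames so that it runs on its own, renames 'tom' to 'tod' as the first question suggests, and uses `how='left'` with `indicator=True` to keep customers that have no matching order. The names `customers`, `orders`, `inner` and `left` are illustrative and not part of the notebook.

```python
import pandas as pd

# Toy data from this notebook, with 'tom' renamed to 'tod' (the self-study scenario)
customers = pd.DataFrame(
    [(1, 'bob', 3462543658686), (2, 'rob', 9087567565439),
     (3, 'tim', 5436586999467), (4, 'tod', 8349756853250)],
    columns=['id', 'name', 'credit_card_number'])
orders = pd.DataFrame(
    [(1, 'ketchup', 'bob', 1.20), (2, 'rutabaga', 'bob', 3.35),
     (3, 'fake vegan meat', 'rob', 13.99), (4, 'cheesey poofs', 'tim', 3.99),
     (5, 'ice cream', 'tim', 4.95), (6, 'protein powder', 'tom', 49.95)],
    columns=['id', 'product_name', 'customer', 'price'])

# Inner join: rows without a match on either side are dropped
inner = pd.merge(customers, orders, how='inner',
                 left_on='name', right_on='customer')

# Left join: every customer is kept; the _merge column records which rows matched
left = pd.merge(customers, orders, how='left',
                left_on='name', right_on='customer', indicator=True)

print(len(inner), len(left))
print(left[left['_merge'] == 'left_only'][['name', 'product_name']])
```

In Spark, the join type is likewise chosen through the `how` argument of `DataFrame.join`.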

#### JOIN: Spark

In [None]:
customersDF = spark.createDataFrame(customer_raw, customer_cols)

ordersDF = spark.createDataFrame(orders_raw, orders_cols)

# Show tables
customersDF.show()
ordersDF.show()

In [None]:
joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)
joinedDF.show()

## Simple Aggregations

Now let's explore simple aggregations. You will be using these often when doing exploratory work in big data. Remember, the intention here is that you grow familiar with the way the API works, and how to translate inquiries into that API.

> _How much did each person spend?_

In [None]:
joined_df.groupby('name').agg({"price": ["sum"]})

In [None]:
import pyspark.sql.functions as f

joinedDF.groupby('name').agg(f.sum('price').alias('total')).show()
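
The pandas call above returns a two-level column header (`price` / `sum`), whereas the Spark version yields a flat `total` column. A minimal sketch of a closer pandas equivalent, assuming pandas named aggregation is available (pandas 0.25 or later), is shown below. The frame `joined` is a hand-built stand-in for `joined_df` so that the snippet runs on its own.

```python
import pandas as pd

# Stand-in for joined_df: only the two columns the aggregation needs
joined = pd.DataFrame({
    'name': ['bob', 'bob', 'rob', 'tim', 'tim', 'tom'],
    'price': [1.20, 3.35, 13.99, 3.99, 4.95, 49.95],
})

# Named aggregation gives a flat output column, mirroring f.sum('price').alias('total')
totals = joined.groupby('name', as_index=False).agg(total=('price', 'sum'))
print(totals)
```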

Let's use bigger data
  * NYC crash data

In [12]:
# save to the filesystem to prevent another load
! curl -o rows.csv https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  411M    0  411M    0     0  3136k      0 --:--:--  0:02:14 --:--:-- 2563k


In [13]:
import pandas as pd
nyc_df = pd.read_csv('rows.csv')



In [14]:
# number of rows

print(len(nyc_df))

# this is quite large, so we will work with a sample while we experiment, in pandas at least.

2026884


We'll take a random sample of 20% of the original data.

In [15]:
nyc_small = nyc_df.sample(frac=0.2, replace=False, random_state=1)
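
To see what `frac`, `replace` and `random_state` do in the call above, here is a small sketch on a synthetic frame. The frame `demo` and its column `x` are made up for illustration.

```python
import pandas as pd

# Synthetic stand-in for nyc_df
demo = pd.DataFrame({'x': range(1000)})

# Same arguments as in the notebook, applied twice
a = demo.sample(frac=0.2, replace=False, random_state=1)
b = demo.sample(frac=0.2, replace=False, random_state=1)

print(len(a))                    # 20% of the rows
print(a.index.equals(b.index))   # a fixed random_state makes the sample repeatable
print(a.index.is_unique)         # replace=False means no row is drawn twice
```

Spark's `DataFrame.sample` also accepts a fraction and a seed, but as far as I know it treats the fraction as approximate, so the row count of a Spark sample can differ slightly from the pandas one.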

In [16]:
# we are also going to limit the columns to those we are going to work with

nyc_small = nyc_small[['CRASH DATE', 'CONTRIBUTING FACTOR VEHICLE 1',
                       'BOROUGH', 'VEHICLE TYPE CODE 1',
                       'NUMBER OF PERSONS INJURED']]

In [17]:
nyc_small.head(2)

| | CRASH DATE | CONTRIBUTING FACTOR VEHICLE 1 | BOROUGH | VEHICLE TYPE CODE 1 | NUMBER OF PERSONS INJURED |
|---|---|---|---|---|---|
| 1424989 | 11/28/2014 | Turning Improperly | BROOKLYN | PASSENGER VEHICLE | 0.0 |
| 1422316 | 12/19/2014 | Driver Inattention/Distraction | BROOKLYN | LARGE COM VEH(6 OR MORE TIRES) | 0.0 |


Now, let's create the PySpark dataframe. We then have two frames with the same content:
  * nyc_small: pandas
  * sdf_small: pyspark

In [18]:
# there are NaNs in the string columns, so Spark cannot infer the schema; we help it out
# by replacing them with empty strings and forcing every column to be a string

# SparkSession.createDataFrame accepts a pandas DataFrame directly,
# so the legacy SQLContext wrapper is not needed
sdf_small = spark.createDataFrame(nyc_small.fillna('').astype('str'))


# Let's check the schema quickly

print(sdf_small.schema)



StructType([StructField('CRASH DATE', StringType(), True), StructField('CONTRIBUTING FACTOR VEHICLE 1', StringType(), True), StructField('BOROUGH', StringType(), True), StructField('VEHICLE TYPE CODE 1', StringType(), True), StructField('NUMBER OF PERSONS INJURED', StringType(), True)])
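
Note that the schema above shows `NUMBER OF PERSONS INJURED` as a string, because `astype('str')` was applied to every column. The sketch below, on a made-up three-row frame, contrasts that with one possible alternative that only fills and casts the non-numeric columns. This is an illustration, not the notebook's method, and `demo`, `as_strings` and `typed` are hypothetical names.

```python
import numpy as np
import pandas as pd

# Made-up frame with a missing value in both a text and a numeric column
demo = pd.DataFrame({
    'BOROUGH': ['QUEENS', np.nan, 'BRONX'],
    'NUMBER OF PERSONS INJURED': [1.0, np.nan, 0.0],
})

# The notebook's approach: every column, including the numeric one, becomes a string
as_strings = demo.fillna('').astype('str')

# Alternative sketch: fill and cast only the non-numeric columns, leave numbers alone
typed = demo.copy()
text_cols = [c for c in typed.columns
             if not pd.api.types.is_numeric_dtype(typed[c])]
typed[text_cols] = typed[text_cols].fillna('').astype('str')

print(as_strings.dtypes)
print(typed.dtypes)
```

If the all-string frame is kept, a numeric column can still be converted on the Spark side, for example with `col('NUMBER OF PERSONS INJURED').cast('double')`.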


# Questions

Answer the following questions by porting the pandas code to the Spark API



# Question 1


> _On what day do most crashes occur?_

In [None]:
# Pandas
nyc_small.groupby('CRASH DATE')['CRASH DATE'].count().sort_values(ascending=False).head(5)

In [25]:
### Spark?

from pyspark.sql.functions import col

# Group by 'CRASH DATE', count, and order by count in descending order
result = sdf_small.groupBy(['CRASH DATE']).agg({'CRASH DATE': 'count'})\
                  .orderBy(col('count(CRASH DATE)').desc()) \
                  .limit(5)

# Show the result
result.show()


+----------+-----------------+
|CRASH DATE|count(CRASH DATE)|
+----------+-----------------+
|11/15/2018|              228|
|01/21/2014|              219|
|05/19/2017|              205|
|06/21/2018|              196|
|12/15/2017|              194|
+----------+-----------------+
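
The result above ranks individual calendar dates. If the question is instead read as "which day of the week", the date strings need to be parsed first. A minimal pandas sketch of that alternative reading follows, using a handful of dates that appear in this notebook's outputs. The `%m/%d/%Y` format is an assumption based on how those dates are printed.

```python
import pandas as pd

# A few CRASH DATE values taken from the outputs shown in this notebook
demo = pd.DataFrame({'CRASH DATE': ['11/28/2014', '12/19/2014', '11/15/2018',
                                    '01/21/2014', '05/19/2017']})

# Parse the strings as month/day/year, then count crashes per weekday name
parsed = pd.to_datetime(demo['CRASH DATE'], format='%m/%d/%Y')
by_weekday = parsed.dt.day_name().value_counts()
print(by_weekday)
```

A Spark port of this idea would similarly need to turn the string column into a date (for example with `to_date` from `pyspark.sql.functions`) before grouping.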



# Question 2

> _Where do most crashes occur?_

In [None]:
nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)

In [26]:
## Spark?

from pyspark.sql.functions import col

result = sdf_small.groupBy('BOROUGH').agg({'BOROUGH': 'count'}) \
                  .orderBy(col('count(BOROUGH)').desc()) \
                  .limit(5)

result.show()


+---------+--------------+
|  BOROUGH|count(BOROUGH)|
+---------+--------------+
|         |        126088|
| BROOKLYN|         88569|
|   QUEENS|         74822|
|MANHATTAN|         62911|
|    BRONX|         41229|
+---------+--------------+
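
The top row of the Spark result has an empty borough name. This is a side effect of `fillna('')`: pandas `groupby` drops missing keys by default, while the filled-in empty strings form a group of their own. The sketch below reproduces the difference on a made-up six-row frame.

```python
import numpy as np
import pandas as pd

# Made-up frame: three of the six rows have no borough
demo = pd.DataFrame({'BOROUGH': ['BROOKLYN', np.nan, 'QUEENS',
                                 np.nan, np.nan, 'BROOKLYN']})

# groupby drops missing keys by default, so rows without a borough vanish
default_counts = demo.groupby('BOROUGH')['BOROUGH'].count()

# After fillna(''), the missing values form their own group, as in the Spark output
filled_counts = demo.fillna('').groupby('BOROUGH')['BOROUGH'].count()

print(default_counts.sort_values(ascending=False))
print(filled_counts.sort_values(ascending=False))
```

To make the Spark answer match the pandas one, the empty-string rows can be filtered out before grouping, for example with `filter(col('BOROUGH') != '')`.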



# Question 3

> _What is the most common cause of accidents in 'QUEENS'?_

In [None]:
nyc_small[(nyc_small.BOROUGH == 'QUEENS')]['CONTRIBUTING FACTOR VEHICLE 1'].value_counts()

# you can also use a group by (to avoid the pandas value_counts function)

nyc_small[(nyc_small.BOROUGH == 'QUEENS')].groupby(
    'CONTRIBUTING FACTOR VEHICLE 1'
)['CONTRIBUTING FACTOR VEHICLE 1'].count().sort_values(ascending=False).head(5)

In [27]:
## Spark?

from pyspark.sql.functions import col, count

result = sdf_small.filter(col('BOROUGH') == 'QUEENS') \
                  .groupBy('CONTRIBUTING FACTOR VEHICLE 1') \
                  .agg(count('*').alias('count')) \
                  .orderBy(col('count').desc())

result.show()



+-----------------------------+-----+
|CONTRIBUTING FACTOR VEHICLE 1|count|
+-----------------------------+-----+
|                  Unspecified|26554|
|         Driver Inattentio...|15937|
|         Failure to Yield ...| 6322|
|             Backing Unsafely| 3673|
|         Following Too Clo...| 2708|
|         Passing or Lane U...| 1954|
|          Passing Too Closely| 1758|
|         Traffic Control D...| 1590|
|           Turning Improperly| 1544|
|              Fatigued/Drowsy| 1275|
|              Other Vehicular| 1199|
|          Driver Inexperience| 1068|
|          Alcohol Involvement|  975|
|                 Unsafe Speed|  954|
|         Unsafe Lane Changing|  678|
|            Pavement Slippery|  668|
|         Prescription Medi...|  582|
|           Lost Consciousness|  577|
|         View Obstructed/L...|  569|
|          Physical Disability|  551|
+-----------------------------+-----+
only showing top 20 rows



# Question 4

> _What is the average number of injuries for specific vehicle types in specific boroughs?_


In [None]:
nyc_small.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'])['NUMBER OF PERSONS INJURED'].mean().sort_values(ascending=False).head(3)

In [28]:
## Spark?

from pyspark.sql.functions import col, mean

result = sdf_small.groupBy('VEHICLE TYPE CODE 1', 'BOROUGH') \
                  .agg(mean('NUMBER OF PERSONS INJURED').alias('avg_injuries')) \
                  .orderBy(col('avg_injuries').desc()) \
                  .limit(3)

result.show()

+-------------------+--------+------------+
|VEHICLE TYPE CODE 1| BOROUGH|avg_injuries|
+-------------------+--------+------------+
|                rmb|  QUEENS|        11.0|
|                 PC|BROOKLYN|         5.0|
|          AMBULANVE|   BRONX|         4.0|
+-------------------+--------+------------+
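
The top groups above carry unusual vehicle codes ('rmb', 'PC', 'AMBULANVE'), which suggests their averages may rest on very few crashes. One way to check is to compute the group size alongside the mean and require a minimum number of crashes. The sketch below does this in pandas on invented data. The threshold `MIN_CRASHES` and all values in `demo` are hypothetical.

```python
import pandas as pd

# Invented data: one rare vehicle code with a single severe crash
demo = pd.DataFrame({
    'VEHICLE TYPE CODE 1': ['rmb', 'Sedan', 'Sedan', 'Sedan', 'Taxi', 'Taxi'],
    'BOROUGH': ['QUEENS', 'QUEENS', 'QUEENS', 'QUEENS', 'BRONX', 'BRONX'],
    'NUMBER OF PERSONS INJURED': [11.0, 0.0, 1.0, 2.0, 1.0, 0.0],
})

# Mean injuries and number of crashes per (vehicle type, borough) group
summary = (demo.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'], as_index=False)
               .agg(avg_injuries=('NUMBER OF PERSONS INJURED', 'mean'),
                    crashes=('NUMBER OF PERSONS INJURED', 'size')))

# Keep only groups with enough crashes for the average to be meaningful
MIN_CRASHES = 2  # hypothetical threshold
reliable = (summary[summary['crashes'] >= MIN_CRASHES]
            .sort_values('avg_injuries', ascending=False))
print(reliable)
```

In Spark, the same idea can be expressed by adding a `count('*')` aggregate next to `mean(...)` in `agg` and filtering on it.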

