<a href="https://colab.research.google.com/github/Student-1469/day3-tutorial/blob/main/14690357_TUT3_simple_pandas_pyspark_agg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Exploring Spark with Pandas


Starting from pandas examples, convert each analysis to PySpark. This is useful when your data grows too large for your tooling.

The purpose of this notebook is to familiarise yourself with the PySpark API. You are welcome to use the R version if you wish, as long as you are able to obtain the correct results. We will be using Python in this notebook because it is widely used in data science and has a very large community.



In [None]:
# Step 1: Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Step 2: Download Spark 3.3.2 from a valid archive link
VERSION="3.3.2"
!wget -q https://archive.apache.org/dist/spark/spark-$VERSION/spark-$VERSION-bin-hadoop3.tgz -O spark-$VERSION-bin-hadoop3.tgz

# Step 3: Decompress Spark tar file
!tar xf spark-$VERSION-bin-hadoop3.tgz

# Step 4: Install findspark
!pip install -q findspark

# Step 5: Set environment variables for Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{VERSION}-bin-hadoop3"

# Step 6: Initialize findspark and create a Spark session
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Verify Spark version
print(spark.version)


3.3.2


#### Firstly, let's get our spark session

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

### Overview


* Joining two dataframes/data sets
* Simple aggregations
* Persisting

#### JOIN: Pandas

We won't use the pandas join again in this notebook, but observe how the joins work.

See what happens if you change from the default inner join to an outer join.

In [None]:
customer_raw = [(1, 'bob', 3462543658686),
           (2, 'rob', 9087567565439),
           (3, 'tim', 5436586999467),
           (4, 'tom', 8349756853250)]

customer_cols = ['id', 'name', 'credit_card_number']



orders_raw = [(1, 'ketchup', 'bob', 1.20),
           (2, 'rutabaga', 'bob', 3.35),
           (3, 'fake vegan meat', 'rob', 13.99),
           (4, 'cheesey poofs', 'tim', 3.99),
           (5, 'ice cream', 'tim', 4.95),
           (6, 'protein powder', 'tom', 49.95)]

orders_cols = ['id', 'product_name', 'customer', 'price']

In [None]:
customer_df = pd.DataFrame(customer_raw, columns=customer_cols)
orders_df = pd.DataFrame(orders_raw, columns=orders_cols)

customer_df

joined_df = pd.merge(customer_df, orders_df, how='inner', left_on='name', right_on='customer')
joined_df

## For self study: what happens if (4, 'tom', 8349756853250) in customer_raw becomes (4, 'tod', 8349756853250)?
## How do the results change?
## More sensibly: what if customers have not made any orders but we still require them in the result set?

Unnamed: 0,id_x,name,credit_card_number,id_y,product_name,customer,price
0,1,bob,3462543658686,1,ketchup,bob,1.2
1,1,bob,3462543658686,2,rutabaga,bob,3.35
2,2,rob,9087567565439,3,fake vegan meat,rob,13.99
3,3,tim,5436586999467,4,cheesey poofs,tim,3.99
4,3,tim,5436586999467,5,ice cream,tim,4.95
5,4,tom,8349756853250,6,protein powder,tom,49.95
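
The self-study questions in the cell above can be explored with a left join. The following is a minimal sketch, assuming the renamed customer `'tod'` from the first question; the frame names `customers_tod`, `orders_tod`, `inner` and `left` are illustrative and not part of the original notebook. The `indicator=True` argument of `pd.merge` adds a `_merge` column that marks rows found only on the left side.

```python
import pandas as pd

# Customer 4 renamed from 'tom' to 'tod' for the experiment
customers_tod = pd.DataFrame(
    [(1, 'bob', 3462543658686),
     (2, 'rob', 9087567565439),
     (3, 'tim', 5436586999467),
     (4, 'tod', 8349756853250)],
    columns=['id', 'name', 'credit_card_number'])

orders_tod = pd.DataFrame(
    [(1, 'ketchup', 'bob', 1.20),
     (2, 'rutabaga', 'bob', 3.35),
     (3, 'fake vegan meat', 'rob', 13.99),
     (4, 'cheesey poofs', 'tim', 3.99),
     (5, 'ice cream', 'tim', 4.95),
     (6, 'protein powder', 'tom', 49.95)],
    columns=['id', 'product_name', 'customer', 'price'])

# Inner join: only customers with a matching order survive
inner = pd.merge(customers_tod, orders_tod, how='inner',
                 left_on='name', right_on='customer')

# Left join: every customer is kept; order columns are NaN when nothing matches
left = pd.merge(customers_tod, orders_tod, how='left',
                left_on='name', right_on='customer', indicator=True)

print(len(inner), len(left))
print(left[left['_merge'] == 'left_only'][['name', 'product_name']])
```

With the inner join, `'tod'` and the `'tom'` order both disappear; with the left join, `'tod'` is retained with missing order fields.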


#### JOIN: Spark

In [None]:
customersDF = spark.createDataFrame(customer_raw, customer_cols)

ordersDF = spark.createDataFrame(orders_raw, orders_cols)

# Show tables
customersDF.show()
ordersDF.show()

+---+----+------------------+
| id|name|credit_card_number|
+---+----+------------------+
|  1| bob|     3462543658686|
|  2| rob|     9087567565439|
|  3| tim|     5436586999467|
|  4| tom|     8349756853250|
+---+----+------------------+

+---+---------------+--------+-----+
| id|   product_name|customer|price|
+---+---------------+--------+-----+
|  1|        ketchup|     bob|  1.2|
|  2|       rutabaga|     bob| 3.35|
|  3|fake vegan meat|     rob|13.99|
|  4|  cheesey poofs|     tim| 3.99|
|  5|      ice cream|     tim| 4.95|
|  6| protein powder|     tom|49.95|
+---+---------------+--------+-----+



In [None]:
joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)
joinedDF.show()

+---+----+------------------+---+---------------+--------+-----+
| id|name|credit_card_number| id|   product_name|customer|price|
+---+----+------------------+---+---------------+--------+-----+
|  1| bob|     3462543658686|  1|        ketchup|     bob|  1.2|
|  1| bob|     3462543658686|  2|       rutabaga|     bob| 3.35|
|  2| rob|     9087567565439|  3|fake vegan meat|     rob|13.99|
|  3| tim|     5436586999467|  4|  cheesey poofs|     tim| 3.99|
|  3| tim|     5436586999467|  5|      ice cream|     tim| 4.95|
|  4| tom|     8349756853250|  6| protein powder|     tom|49.95|
+---+----+------------------+---+---------------+--------+-----+



## Simple Aggregations

Now let's explore simple aggregations. You will be using these often when doing exploratory work in big data. Remember, the intention here is that you grow familiar with the way the API works, and how to translate inquiries into that API.

> _How much did each person spend?_

In [None]:
joined_df.groupby('name').agg({"price": ["sum"]})

name,price (sum)
bob,4.55
rob,13.99
tim,8.94
tom,49.95
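
The dictionary form `agg({"price": ["sum"]})` produces a two-level column header. As a minimal sketch, pandas named aggregation gives a flat, explicitly named column instead, which lines up more directly with `alias('total')` in the Spark version. The small frame below is a stand-in for `joined_df` containing only the two columns needed; the name `totals` is illustrative.

```python
import pandas as pd

# Stand-in for joined_df with just the columns used in the aggregation
joined_df = pd.DataFrame({
    'name': ['bob', 'bob', 'rob', 'tim', 'tim', 'tom'],
    'price': [1.20, 3.35, 13.99, 3.99, 4.95, 49.95],
})

# Named aggregation: output column 'total' is the sum of 'price' per name
totals = joined_df.groupby('name', as_index=False).agg(total=('price', 'sum'))
print(totals)
```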


In [None]:
import pyspark.sql.functions as f

joinedDF.groupby('name').agg(f.sum('price').alias('total')).show()

+----+-----------------+
|name|            total|
+----+-----------------+
| bob|             4.55|
| rob|            13.99|
| tim|8.940000000000001|
| tom|            49.95|
+----+-----------------+
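
Spark reports 8.940000000000001 for tim where pandas displayed 8.94. Both sum the same double-precision values; pandas simply rounds for display, while Spark prints the full value. The sketch below reproduces the effect in plain Python and shows two common ways of handling it (rounding for presentation, or exact decimal arithmetic). The variable names are illustrative.

```python
from decimal import Decimal

# tim's two orders, added as binary floating-point doubles
total = 3.99 + 4.95
print(total)             # full-precision representation of the double
print(round(total, 2))   # rounded for presentation

# Decimal arithmetic keeps currency-style values exact
exact = Decimal('3.99') + Decimal('4.95')
print(exact)
```

In PySpark, `f.round(f.sum('price'), 2)` is one way to apply the same rounding inside the aggregation.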



Let's use bigger data
  * NYC crash data

In [None]:
# save to the filesystem to prevent another load
! curl -o rows.csv https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  427M    0  427M    0     0  4044k      0 --:--:--  0:01:48 --:--:-- 2542k


In [None]:
import pandas as pd
nyc_df = pd.read_csv('rows.csv')



In [None]:
# number of rows

print(len(nyc_df))

# this is quite large, so we will work with a sample while we experiment, in pandas at least.

2117343


We'll take a random sample of 20% of the original data

In [None]:
nyc_small = nyc_df.sample(frac=0.2, replace=False, random_state=1)

In [None]:
# we are also going to limit the columns to those we are going to work with

nyc_small = nyc_small[['CRASH DATE', 'CONTRIBUTING FACTOR VEHICLE 1',
                       'BOROUGH', 'VEHICLE TYPE CODE 1',
                       'NUMBER OF PERSONS INJURED']]

In [None]:
nyc_small.head(2)

Unnamed: 0,CRASH DATE,CONTRIBUTING FACTOR VEHICLE 1,BOROUGH,VEHICLE TYPE CODE 1,NUMBER OF PERSONS INJURED
1261810,09/18/2015,Unspecified,,PASSENGER VEHICLE,2.0
1615108,01/05/2014,Alcohol Involvement,BROOKLYN,PASSENGER VEHICLE,0.0


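Now, let's create the PySpark dataframe. We now have two frames to compare:
  * nyc_small: pandas
  * sdf_small: pyspark

Note that the cell below reads the full rows.csv rather than the 20% sample with selected columns, so the row counts of the two frames can differ.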

In [None]:
# Load the CSV file directly into PySpark DataFrame
sdf_small = spark.read.csv('rows.csv', header=True, inferSchema=True)

# Print schema to verify
sdf_small.printSchema()

# Show data
sdf_small.show(5)



root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

# Questions

Answer the following questions by porting the pandas code to the Spark API



# Question 1


> _On what day do most crashes occur?_

In [None]:
# Pandas
nyc_small.groupby('CRASH DATE')['CRASH DATE'].count().sort_values(ascending=False).head(5)

CRASH DATE,count
01/21/2014,235
05/19/2017,219
11/15/2018,218
02/03/2014,208
02/14/2014,192


In [None]:
### Spark?
from pyspark.sql import functions as F

sdf_small.filter(sdf_small['CRASH DATE'].isNotNull()).groupBy('CRASH DATE').count().orderBy(F.desc('count')).show(5)



+----------+-----+
|CRASH DATE|count|
+----------+-----+
|2014-01-21| 1161|
|2018-11-15| 1065|
|2017-12-15|  999|
|2017-05-19|  974|
|2015-01-18|  961|
+----------+-----+
only showing top 5 rows



# Question 2

> _Where do most crashes occur?_

In [None]:
nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)

BOROUGH,count
BROOKLYN,464859
QUEENS,391299
MANHATTAN,325510
BRONX,215944
STATEN ISLAND,61178


In [None]:
## Spark?
sdf_small.filter(sdf_small['BOROUGH'].isNotNull()).groupBy('BOROUGH').count().orderBy(F.desc('count')).show(5)


+-------------+------+
|      BOROUGH| count|
+-------------+------+
|     BROOKLYN|464859|
|       QUEENS|391299|
|    MANHATTAN|325510|
|        BRONX|215944|
|STATEN ISLAND| 61178|
+-------------+------+
only showing top 5 rows



# Question 3

> _What is the most common cause of accidents in 'QUEENS'?_

In [None]:
nyc_small[(nyc_small.BOROUGH == 'QUEENS')]['CONTRIBUTING FACTOR VEHICLE 1'].value_counts()

# you can also use a group by (to avoid the pandas value_counts function)

nyc_small[(nyc_small.BOROUGH == 'QUEENS')].groupby(
    'CONTRIBUTING FACTOR VEHICLE 1'
)['CONTRIBUTING FACTOR VEHICLE 1'].count().sort_values(ascending=False).head(5)

CONTRIBUTING FACTOR VEHICLE 1,count
Unspecified,135958
Driver Inattention/Distraction,83779
Failure to Yield Right-of-Way,33637
Backing Unsafely,19137
Following Too Closely,14360


In [None]:
## Spark?
sdf_small.filter(sdf_small['BOROUGH'] == 'QUEENS').groupBy('CONTRIBUTING FACTOR VEHICLE 1').count().orderBy(F.desc('count')).show(5)

+-----------------------------+------+
|CONTRIBUTING FACTOR VEHICLE 1| count|
+-----------------------------+------+
|                  Unspecified|135958|
|         Driver Inattentio...| 83779|
|         Failure to Yield ...| 33637|
|             Backing Unsafely| 19137|
|         Following Too Clo...| 14360|
+-----------------------------+------+
only showing top 5 rows



# Question 4

> _What is the average number of injuries for specific vehicle types in specific boroughs?_


In [None]:
nyc_small.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'])['NUMBER OF PERSONS INJURED'].mean().sort_values(ascending=False).head(3)

VEHICLE TYPE CODE 1,BOROUGH,NUMBER OF PERSONS INJURED (mean)
Limo,MANHATTAN,13.0
FRONT,BROOKLYN,12.0
rmb,QUEENS,11.0


In [None]:
## Spark?
sdf_small.groupBy('VEHICLE TYPE CODE 1', 'BOROUGH').agg(F.mean('NUMBER OF PERSONS INJURED').alias('Number')).orderBy(F.desc('Number')).show(4)


+-------------------+---------+------+
|VEHICLE TYPE CODE 1|  BOROUGH|Number|
+-------------------+---------+------+
|               Limo|MANHATTAN|  13.0|
|              FRONT| BROOKLYN|  12.0|
|              PASSE|    BRONX|  11.0|
|                rmb|   QUEENS|  11.0|
+-------------------+---------+------+
only showing top 4 rows
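
Averages at the top of this ranking, such as 13.0 for 'Limo', may come from groups with very few records. As a hedged sketch, reporting the group size next to the mean makes this visible and lets you filter out tiny groups. The `toy` frame below is hypothetical data, and the threshold of 2 is an arbitrary assumption chosen for illustration.

```python
import pandas as pd

# Hypothetical toy rows, not taken from the NYC data
toy = pd.DataFrame({
    'VEHICLE TYPE CODE 1': ['Limo', 'Sedan', 'Sedan', 'Sedan', 'Taxi', 'Taxi'],
    'BOROUGH': ['MANHATTAN', 'QUEENS', 'QUEENS', 'QUEENS', 'BRONX', 'BRONX'],
    'NUMBER OF PERSONS INJURED': [13, 0, 1, 2, 0, 4],
})

# Mean and number of records per (vehicle type, borough) group
summary = (toy.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'])
              ['NUMBER OF PERSONS INJURED']
              .agg(['mean', 'count'])
              .sort_values('mean', ascending=False))
print(summary)

# Keep only groups backed by at least 2 records (arbitrary threshold)
reliable = summary[summary['count'] >= 2]
print(reliable)
```

The Spark counterpart would add `F.count('*')` to the same `agg` call and filter on it before ordering.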

