<a href="https://colab.research.google.com/github/SlyFox579/day_3-tutorial/blob/main/simple_pandas_pyspark_agg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Exploring Spark with Pandas


Starting from worked pandas examples, port each analysis to PySpark. This is useful when your data grows too large for single-machine tooling such as pandas.

The purpose of this notebook is to familiarise yourself with the PySpark API. You are welcome to use the R version instead, as long as you obtain the correct results. We use Python in this notebook because it is widely used in data science and has a very large community.



#### First, let's get our Spark session

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pandas as pd 
spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()

In [None]:
# save to the filesystem to prevent another load
! curl -o rows.csv https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv

In [None]:
import pandas as pd
nyc_df = pd.read_csv('rows.csv')

In [None]:
# number of rows

print(len(nyc_df))

# this is quite large, so we will work with a sample while we experiment, in pandas at least.

We'll take a random 20% sample of the original data.

In [None]:
nyc_small = nyc_df.sample(frac=0.2, replace=False, random_state=1)
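
As a minimal sketch of what the arguments in the `sample` call do, the snippet below uses a made-up ten-row frame (the `toy` frame and its `crash_id` column are hypothetical, not part of the NYC data). It suggests that `frac=0.2` keeps a fifth of the rows, `replace=False` never picks a row twice, and a fixed `random_state` gives the same sample on every run.

```python
import pandas as pd

# Hypothetical stand-in for the real data: ten rows with a made-up id column.
toy = pd.DataFrame({"crash_id": range(10)})

# Same arguments as the notebook's call, applied twice.
first = toy.sample(frac=0.2, replace=False, random_state=1)
second = toy.sample(frac=0.2, replace=False, random_state=1)

print(len(first))               # number of rows kept
print(first.equals(second))     # same seed, same rows
print(first.index.is_unique)    # no row drawn twice
```

Fixing the seed matters here because the answers to the questions later in the notebook are computed on the sample, so they only stay comparable between runs if the sample itself does.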

In [None]:
# we are also going to limit the columns to those we are going to work with

nyc_small = nyc_small[['CRASH DATE', 'CONTRIBUTING FACTOR VEHICLE 1', 
                       'BOROUGH', 'VEHICLE TYPE CODE 1', 
                       'NUMBER OF PERSONS INJURED']]

In [None]:
nyc_small.head(2)

Now let's create the PySpark dataframe. We will then have two frames with the same content:
  * nyc_small: pandas
  * sdf_small: pyspark

In [None]:
# there are NaNs in the string columns, and Spark can't infer the schema from them, so we help it out
# by replacing them with empty strings and casting every column to string
# (note that this also turns the numeric injury counts into strings)

# SparkSession.createDataFrame accepts a pandas dataframe directly; the SQLContext wrapper is deprecated

sdf_small = spark.createDataFrame(nyc_small.fillna('').astype('str'))


# Let's check the schema quickly

print(sdf_small.schema)
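
To make the effect of `fillna('').astype('str')` concrete, here is a small pandas-only sketch on a made-up three-row frame (the `toy` frame is hypothetical; only the column names are borrowed from the data above). It illustrates that missing values become empty strings and that numeric values are turned into text as well.

```python
import numpy as np
import pandas as pd

# Hypothetical three-row frame with a missing value in each column.
toy = pd.DataFrame({
    "BOROUGH": ["QUEENS", np.nan, "BRONX"],
    "NUMBER OF PERSONS INJURED": [1.0, np.nan, 0.0],
})

# Same cleaning step the notebook applies before handing the frame to Spark.
as_text = toy.fillna("").astype("str")

boroughs = as_text["BOROUGH"].tolist()
injured = as_text["NUMBER OF PERSONS INJURED"].tolist()

print(boroughs)  # missing borough is now an empty string
print(injured)   # numbers are now text, e.g. '1.0'
```

The trade-off is that every column arrives in Spark as a string, so any numeric work on the Spark side has to convert the values back first.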

# Questions

Answer the following questions by porting the pandas code to the Spark API



# Question 1


> On what day do most crashes occur?

In [None]:
# Pandas
nyc_small.groupby('CRASH DATE')['CRASH DATE'].count().sort_values(ascending=False).head(5)

In [None]:
### Spark?

sdf_small.groupBy('CRASH DATE').count().orderBy('count', ascending=False).show(5)

# Question 2

> _Where do most crashes occur?_

In [None]:
nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)

In [None]:
## Spark?

sdf_small.groupBy('BOROUGH').count().orderBy('count', ascending=False).show(5)

# Question 3

> What is the most common cause of accidents in 'QUEENS'?

In [None]:
nyc_small[(nyc_small.BOROUGH == 'QUEENS')]['CONTRIBUTING FACTOR VEHICLE 1'].value_counts()

# you can also use a group by (to avoid the pandas value_counts function)

nyc_small[(nyc_small.BOROUGH == 'QUEENS')].groupby(
    'CONTRIBUTING FACTOR VEHICLE 1'
)['CONTRIBUTING FACTOR VEHICLE 1'].count().sort_values(ascending=False).head(5)
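
The two pandas approaches above should give the same counts. A minimal sketch on a made-up four-row frame (the `toy` data is hypothetical) shows the correspondence; the group-by form is the one that maps most directly onto Spark's `groupBy`.

```python
import pandas as pd

# Hypothetical rows: three crashes in QUEENS, one in BRONX.
toy = pd.DataFrame({
    "BOROUGH": ["QUEENS", "QUEENS", "QUEENS", "BRONX"],
    "CONTRIBUTING FACTOR VEHICLE 1": [
        "Unspecified", "Unspecified", "Fatigued/Drowsy", "Unspecified",
    ],
})

col = "CONTRIBUTING FACTOR VEHICLE 1"
queens = toy[toy["BOROUGH"] == "QUEENS"]

# Approach 1: value_counts sorts by frequency, most common first.
by_value_counts = queens[col].value_counts()

# Approach 2: group, count, then sort explicitly.
by_groupby = queens.groupby(col)[col].count().sort_values(ascending=False)

print(by_value_counts.to_dict())
print(by_groupby.to_dict())
```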

In [None]:
## Spark?

from pyspark.sql import functions as f  # column functions such as count, lit and avg

df = sdf_small.filter(sdf_small.BOROUGH == "QUEENS").groupBy('CONTRIBUTING FACTOR VEHICLE 1')\
.agg(f.count(f.lit(1)).alias('total_freq'))\
.orderBy('total_freq', ascending=False)

df.show(10)


# Question 4

> _What is the average number of injuries for specific vehicle types in specific boroughs?_


In [None]:
nyc_small.groupby(['VEHICLE TYPE CODE 1', 'BOROUGH'])['NUMBER OF PERSONS INJURED'].mean().sort_values(ascending=False).head(3)

In [None]:
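## Spark?

from pyspark.sql import functions as f

# the injury counts were stored as strings above, so cast them to double explicitly before averaging
df = sdf_small.groupBy('VEHICLE TYPE CODE 1', 'BOROUGH')\
.agg(f.avg(f.col('NUMBER OF PERSONS INJURED').cast('double')).alias('avg_injuries'))\
.orderBy('avg_injuries', ascending=False)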

df.show(3)
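
Because the frame passed to Spark was built with `astype('str')`, the injury counts are text there, with empty strings where values were missing. The sketch below mirrors that situation in pandas on made-up rows (the `toy` frame and the helper column `injured` are hypothetical): it converts the text back to numbers and then averages per vehicle type and borough. On the Spark side, the analogous step would be a cast on the column, for example with `Column.cast('double')`.

```python
import pandas as pd

# Hypothetical rows with injury counts stored as text; '' marks a missing value.
toy = pd.DataFrame({
    "VEHICLE TYPE CODE 1": ["Sedan", "Sedan", "Taxi", "Taxi"],
    "BOROUGH": ["QUEENS", "QUEENS", "BRONX", "BRONX"],
    "NUMBER OF PERSONS INJURED": ["1.0", "", "3.0", "0.0"],
})

# Convert text to numbers; values that cannot be parsed (here '') become NaN.
toy["injured"] = pd.to_numeric(toy["NUMBER OF PERSONS INJURED"], errors="coerce")

# mean() skips NaN, so the missing value does not drag the average down.
avg = (
    toy.groupby(["VEHICLE TYPE CODE 1", "BOROUGH"])["injured"]
    .mean()
    .sort_values(ascending=False)
)

print(avg.to_dict())
```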