<a href="https://colab.research.google.com/github/FMaligavhada/us-ie-big-data-technologies/blob/master/q3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
# Exploring Spark with Pandas

# Using pandas examples, convert the analysis to pyspark.
# The purpose of this notebook is to familiarise yourself with the pyspark API.

from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName('panda-and-spark').getOrCreate()


In [10]:
customer_raw = [(1, 'bob', 3462543658686),
           (2, 'rob', 9087567565439),
           (3, 'tim', 5436586999467),
           (4, 'tom', 8349756853250)]

customer_cols = ['id', 'name', 'credit_card_number']

orders_raw = [(1, 'ketchup', 'bob', 1.20),
           (2, 'rutabaga', 'bob', 3.35),
           (3, 'fake vegan meat', 'rob', 13.99),
           (4, 'cheesey poofs', 'tim', 3.99),
           (5, 'ice cream', 'tim', 4.95),
           (6, 'protein powder', 'tom', 49.95)]

orders_cols = ['id', 'product_name', 'customer', 'price']

customer_df = pd.DataFrame(customer_raw, columns=customer_cols)
orders_df = pd.DataFrame(orders_raw, columns=orders_cols)

joined_df = pd.merge(customer_df, orders_df, how='inner', left_on='name', right_on='customer')
joined_df


Unnamed: 0,id_x,name,credit_card_number,id_y,product_name,customer,price
0,1,bob,3462543658686,1,ketchup,bob,1.2
1,1,bob,3462543658686,2,rutabaga,bob,3.35
2,2,rob,9087567565439,3,fake vegan meat,rob,13.99
3,3,tim,5436586999467,4,cheesey poofs,tim,3.99
4,3,tim,5436586999467,5,ice cream,tim,4.95
5,4,tom,8349756853250,6,protein powder,tom,49.95


In [11]:
customersDF = spark.createDataFrame(customer_raw, customer_cols)
ordersDF = spark.createDataFrame(orders_raw, orders_cols)

customersDF.show()
ordersDF.show()

joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)
joinedDF.show()


+---+----+------------------+
| id|name|credit_card_number|
+---+----+------------------+
|  1| bob|     3462543658686|
|  2| rob|     9087567565439|
|  3| tim|     5436586999467|
|  4| tom|     8349756853250|
+---+----+------------------+

+---+---------------+--------+-----+
| id|   product_name|customer|price|
+---+---------------+--------+-----+
|  1|        ketchup|     bob|  1.2|
|  2|       rutabaga|     bob| 3.35|
|  3|fake vegan meat|     rob|13.99|
|  4|  cheesey poofs|     tim| 3.99|
|  5|      ice cream|     tim| 4.95|
|  6| protein powder|     tom|49.95|
+---+---------------+--------+-----+

+---+----+------------------+---+---------------+--------+-----+
| id|name|credit_card_number| id|   product_name|customer|price|
+---+----+------------------+---+---------------+--------+-----+
|  1| bob|     3462543658686|  1|        ketchup|     bob|  1.2|
|  1| bob|     3462543658686|  2|       rutabaga|     bob| 3.35|
|  2| rob|     9087567565439|  3|fake vegan meat|     rob|13.99

In [12]:
# Pandas
joined_df.groupby('name').agg({"price": ["sum"]})


Unnamed: 0_level_0,price
Unnamed: 0_level_1,sum
name,Unnamed: 1_level_2
bob,4.55
rob,13.99
tim,8.94
tom,49.95


In [13]:
# Spark
joinedDF.groupby('name').agg(f.sum('price').alias('total')).show()


+----+-----------------+
|name|            total|
+----+-----------------+
| bob|             4.55|
| rob|            13.99|
| tim|8.940000000000001|
| tom|            49.95|
+----+-----------------+



In [14]:
# Download dataset into Colab
!curl -o rows.csv https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD

nyc_df = pd.read_csv('rows.csv')
print(len(nyc_df))  # number of rows


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  443M    0  443M    0     0  3870k      0 --:--:--  0:01:57 --:--:-- 2478k


  nyc_df = pd.read_csv('rows.csv')


2205838


In [16]:
# --------------------------
# Load dataset with Pandas (sample 20%)
# --------------------------
nyc_df = pd.read_csv("rows.csv")

# Sample 20% for quick testing
nyc_small = nyc_df.sample(frac=0.2, random_state=1)[[
    "CRASH DATE",
    "CONTRIBUTING FACTOR VEHICLE 1",
    "BOROUGH",
    "VEHICLE TYPE CODE 1",
    "NUMBER OF PERSONS INJURED"
]]

nyc_small.head(5)


  nyc_df = pd.read_csv("rows.csv")


Unnamed: 0,CRASH DATE,CONTRIBUTING FACTOR VEHICLE 1,BOROUGH,VEHICLE TYPE CODE 1,NUMBER OF PERSONS INJURED
979713,10/01/2017,Driver Inattention/Distraction,BROOKLYN,Sedan,0.0
592366,05/29/2019,Unsafe Lane Changing,,Sedan,0.0
739435,10/05/2018,Reaction to Uninvolved Vehicle,MANHATTAN,Sedan,0.0
801798,06/13/2018,Reaction to Uninvolved Vehicle,QUEENS,Sedan,0.0
1050524,06/01/2017,Driver Inattention/Distraction,BROOKLYN,Sedan,0.0


In [None]:
#Q1. On what day do most crashes occur?

In [17]:
sdf_small = SQLContext(spark).createDataFrame(nyc_small.fillna('').astype('str'))
sdf_small.printSchema()
sdf_small.show(5)




root
 |-- CRASH DATE: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)

+----------+-----------------------------+---------+-------------------+-------------------------+
|CRASH DATE|CONTRIBUTING FACTOR VEHICLE 1|  BOROUGH|VEHICLE TYPE CODE 1|NUMBER OF PERSONS INJURED|
+----------+-----------------------------+---------+-------------------+-------------------------+
|10/01/2017|         Driver Inattentio...| BROOKLYN|              Sedan|                      0.0|
|05/29/2019|         Unsafe Lane Changing|         |              Sedan|                      0.0|
|10/05/2018|         Reaction to Uninv...|MANHATTAN|              Sedan|                      0.0|
|06/13/2018|         Reaction to Uninv...|   QUEENS|              Sedan|                      0.0|
|06/01/2017|         Driver Inattentio...| BROOKLYN

In [19]:
# Pandas
nyc_small.groupby("CRASH DATE")["CRASH DATE"].count().sort_values(ascending=False).head(5)


Unnamed: 0_level_0,CRASH DATE
CRASH DATE,Unnamed: 1_level_1
11/15/2018,240
12/15/2017,226
01/21/2014,225
09/30/2016,187
03/02/2018,186


In [None]:
#Q2. Where do most crashes occur?

In [20]:
# Pandas
nyc_small.groupby('BOROUGH')['BOROUGH'].count().sort_values(ascending=False).head(5)


Unnamed: 0_level_0,BOROUGH
BOROUGH,Unnamed: 1_level_1
BROOKLYN,97615
QUEENS,81855
MANHATTAN,67995
BRONX,45287
STATEN ISLAND,12820


In [21]:
# Spark
sdf_small.groupBy('BOROUGH').count().orderBy(f.desc('count')).show(5)


+---------+------+
|  BOROUGH| count|
+---------+------+
|         |135596|
| BROOKLYN| 97615|
|   QUEENS| 81855|
|MANHATTAN| 67995|
|    BRONX| 45287|
+---------+------+
only showing top 5 rows



In [None]:
#Q3. What is the most common cause of accident in QUEENS?

In [23]:
# Pandas
nyc_small.groupby("BOROUGH")["BOROUGH"].count().sort_values(ascending=False).head(5)


Unnamed: 0_level_0,BOROUGH
BOROUGH,Unnamed: 1_level_1
BROOKLYN,97615
QUEENS,81855
MANHATTAN,67995
BRONX,45287
STATEN ISLAND,12820


In [25]:
# Spark
sdf_small.groupBy("BOROUGH").count().orderBy(F.desc("count")).show(5)


+---------+------+
|  BOROUGH| count|
+---------+------+
|         |135596|
| BROOKLYN| 97615|
|   QUEENS| 81855|
|MANHATTAN| 67995|
|    BRONX| 45287|
+---------+------+
only showing top 5 rows



In [None]:
#Q4. Average number of injuries for specific cars in specific suburbs

In [29]:
# Pandas
nyc_small.groupby(["VEHICLE TYPE CODE 1", "BOROUGH"])["NUMBER OF PERSONS INJURED"].mean().sort_values(ascending=False).head(3)


Unnamed: 0_level_0,Unnamed: 1_level_0,NUMBER OF PERSONS INJURED
VEHICLE TYPE CODE 1,BOROUGH,Unnamed: 2_level_1
FRONT,BROOKLYN,12.0
TOWER,BRONX,5.0
Amb,BROOKLYN,4.0


In [30]:
# Spark
sdf_small.groupBy('VEHICLE TYPE CODE 1', 'BOROUGH') \
         .agg(f.avg('NUMBER OF PERSONS INJURED').alias('avg_injuries')) \
         .orderBy(f.desc('avg_injuries')).show(10)


+-------------------+---------+------------+
|VEHICLE TYPE CODE 1|  BOROUGH|avg_injuries|
+-------------------+---------+------------+
|              FRONT| BROOKLYN|        12.0|
|              TOWER|    BRONX|         5.0|
|                Amb| BROOKLYN|         4.0|
|                TRL|   QUEENS|         3.0|
|              POWER|         |         3.0|
|         FDNY LADDE|    BRONX|         3.0|
|              HEAVY|         |         3.0|
|                 rv|MANHATTAN|         3.0|
|              PASS-|         |         3.0|
|         Commercial|         |         3.0|
+-------------------+---------+------------+
only showing top 10 rows

