In [1]:
from IPython.display import Image
from IPython.core.display import HTML 
# Banner image rendered at the top of the notebook (fetched from the remote URL).
Image(
    url="https://heartspringhealth.com/wp-content/uploads/2014/05/cars-c.jpg",
    width=1000,
    height=250,
)

The Motor Vehicle Collisions crash table contains details on each crash event; each row represents one crash event. The Motor Vehicle Collisions data tables contain information from all police-reported motor vehicle collisions in NYC. The police report (MV104-AN) must be filled out for collisions where someone is injured or killed, or where there is at least $1,000 worth of damage.

Below are the steps to perform our ETL processes:
1. Extract data using Python daily from NYC Open Data (https://opendata.cityofnewyork.us/) via Socrata API
2. For our initial load, we will only use data from June 2022 to the present to populate our table
3. For our daily load, we will upsert against our data so that only newer data is inserted into the table
4. Data will be stored in the Amazon S3 bucket
5. Read and write metadata to the AWS Glue Data Catalog
6. Setup AWS Athena to perform ad-hoc queries
7. Convert our csv files into parquet format
8. Perform the necessary transformations, then load the data into our data warehouse in AWS SQL Server

In [2]:
#Install required libraries
!pip install sodapy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sodapy
  Downloading sodapy-2.1.1-py2.py3-none-any.whl (14 kB)
Installing collected packages: sodapy
Successfully installed sodapy-2.1.1


In [3]:
#Import required Libraries 
import pandas as pd 
from sodapy import Socrata 

# **Data Extraction**

In [4]:
# Client for the NYC Open Data Socrata endpoint; None means no app token is supplied.
client = Socrata("data.cityofnewyork.us", None)

# Initial load: request the 10000 most recent rows (newest crash_date first)
# from dataset "h9gi-nx95". sodapy converts the JSON response into a Python
# list of dictionaries. The planned daily delta load would request 1000 rows.
# NOTE(review): no date filter is passed here, so whether the result matches the
# "June 2022 onward" window from the ETL steps depends on the data - confirm.
results = client.get(
    "h9gi-nx95",
    order='crash_date DESC',
    limit=10000,
)

# One DataFrame row per returned record.
df = pd.DataFrame.from_records(results)



In [5]:
# Summarize the raw frame: row count plus per-column non-null counts and dtypes
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 29 columns):
 #   Column                         Non-Null Count  Dtype 
---  ------                         --------------  ----- 
 0   crash_date                     10000 non-null  object
 1   crash_time                     10000 non-null  object
 2   borough                        6736 non-null   object
 3   zip_code                       6733 non-null   object
 4   latitude                       8997 non-null   object
 5   longitude                      8997 non-null   object
 6   location                       8997 non-null   object
 7   on_street_name                 7208 non-null   object
 8   off_street_name                4724 non-null   object
 9   number_of_persons_injured      10000 non-null  object
 10  number_of_persons_killed       10000 non-null  object
 11  number_of_pedestrians_injured  10000 non-null  object
 12  number_of_pedestrians_killed   10000 non-null  object
 13  nu

In [6]:
# Show every column when a DataFrame is displayed (None lifts the column-count limit)
pd.set_option('display.max_columns', None)

In [7]:
# List the column labels of the raw DataFrame built from the API response
df.columns

Index(['crash_date', 'crash_time', 'borough', 'zip_code', 'latitude',
       'longitude', 'location', 'on_street_name', 'off_street_name',
       'number_of_persons_injured', 'number_of_persons_killed',
       'number_of_pedestrians_injured', 'number_of_pedestrians_killed',
       'number_of_cyclist_injured', 'number_of_cyclist_killed',
       'number_of_motorist_injured', 'number_of_motorist_killed',
       'contributing_factor_vehicle_1', 'contributing_factor_vehicle_2',
       'collision_id', 'vehicle_type_code1', 'vehicle_type_code2',
       'cross_street_name', 'contributing_factor_vehicle_3',
       'contributing_factor_vehicle_4', 'vehicle_type_code_3',
       'vehicle_type_code_4', 'contributing_factor_vehicle_5',
       'vehicle_type_code_5'],
      dtype='object')

In [8]:
#FUNCTIONS



In [9]:
# --- Normalize the time column BEFORE sorting ---------------------------------
# crash_time can arrive without a leading zero (e.g. '9:30'). Sorting the raw
# strings would place '10:00' ahead of '9:30', so pad to HH:MM first.
# .str.zfill leaves missing values as NaN instead of raising.
sorted_df = df.assign(crash_time=df['crash_time'].str.zfill(5))

# Keep only the date portion of crash_date (the text before the 'T' separator),
# then combine it with the padded time into a single timestamp string.
sorted_df['crash_date_part'] = sorted_df['crash_date'].str.split('T').str[0]
sorted_df['crash_datetime'] = (
    sorted_df['crash_date_part'] + 'T' + sorted_df['crash_time'] + ':00.000'
)

# Sort chronologically: by date, then by the zero-padded time.
sorted_df = sorted_df.sort_values(by=['crash_date', 'crash_time'], ascending=True)

# --- Create individual tables based on the denormalized data ------------------
# Main collisions table: timestamp, casualty counts, factors, vehicles, location.
collisions_df = sorted_df[[
    'crash_datetime',
    'number_of_persons_injured', 'number_of_persons_killed',
    'number_of_pedestrians_injured', 'number_of_pedestrians_killed',
    'number_of_cyclist_injured', 'number_of_cyclist_killed',
    'number_of_motorist_injured', 'number_of_motorist_killed',
    'contributing_factor_vehicle_1', 'contributing_factor_vehicle_2',
    'vehicle_type_code1', 'vehicle_type_code2',
    'latitude', 'longitude', 'on_street_name', 'borough', 'zip_code',
]].reset_index(drop=True)

# Lookup-style tables: the distinct combinations of the selected columns.
contributing_factor_vehicle_df = (
    collisions_df[['contributing_factor_vehicle_1', 'contributing_factor_vehicle_2']]
    .drop_duplicates()
    .reset_index(drop=True)
)
vehicle_type_code_df = (
    collisions_df[['vehicle_type_code1', 'vehicle_type_code2']]
    .drop_duplicates()
    .reset_index(drop=True)
)
address_df = (
    collisions_df[['latitude', 'longitude', 'on_street_name', 'borough', 'zip_code']]
    .drop_duplicates()
    .reset_index(drop=True)
)
                                                              

In [10]:
# Preview the first rows of the collisions table
collisions_df.head()

Unnamed: 0,crash_datetime,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,number_of_motorist_killed,contributing_factor_vehicle_1,contributing_factor_vehicle_2,vehicle_type_code1,vehicle_type_code2,latitude,longitude,on_street_name,borough,zip_code
0,2022-06-11T00:00:00.000,3,0,0,0,0,0,3,0,Unspecified,Unspecified,Sedan,Sedan,40.718273,-74.00533,CHURCH STREET,MANHATTAN,10013.0
1,2022-06-11T00:00:00.000,0,0,0,0,0,0,0,0,Passing or Lane Usage Improper,Unspecified,Sedan,Sedan,40.718075,-73.97511,FDR DRIVE,,
2,2022-06-11T00:00:00.000,0,0,0,0,0,0,0,0,Passing or Lane Usage Improper,Unspecified,Station Wagon/Sport Utility Vehicle,Sedan,40.67172,-73.850975,NORTH CONDUIT AVENUE,QUEENS,11417.0
3,2022-06-11T00:05:00.000,1,0,0,0,0,0,1,0,Fatigued/Drowsy,Unspecified,Station Wagon/Sport Utility Vehicle,Pick-up Truck,40.70882,-73.751854,FRANCIS LEWIS BOULEVARD,QUEENS,11412.0
4,2022-06-11T00:10:00.000,0,0,0,0,0,0,0,0,Oversized Vehicle,,Sedan,,40.697433,-73.93123,MYRTLE AVENUE,,


In [13]:
# Write each table to its own CSV file in the working directory (index omitted).
csv_exports = {
    'collisions.csv': collisions_df,
    'contributing_factor_vehicle.csv': contributing_factor_vehicle_df,
    'vehicle_type_code.csv': vehicle_type_code_df,
    'address.csv': address_df,
}
for csv_name, table in csv_exports.items():
    table.to_csv(csv_name, index=False)

In [12]:
# #Create id column for tables
# contributing_factor_vehicle_df['contributing_factor_vehicle_id'] = contributing_factor_vehicle_df.index + 1
# vehicle_type_code_df['vehicle_type_code_id'] = vehicle_type_code_df.index + 1

# crashes_df = sorted_df[['crash_date', 'crash_time', 'number_of_persons_injured', 'number_of_persons_killed', 'number_of_pedestrians_injured',              
#                       'number_of_pedestrians_killed', 'number_of_cyclist_injured', 'number_of_cyclist_killed', 'number_of_motorist_injured',                             
#                       'number_of_motorist_killed']].reset_index(drop=True)

# collisions_df['collisions_id'] = collisions_df.index + 1

# #Move columns around so the id would be in the first position
# collisions_df = collisions_df[['collision_id', 'crash_date', 'crash_time', 'number_of_persons_injured',
#        'number_of_persons_killed', 'number_of_pedestrians_injured',
#        'number_of_pedestrians_killed', 'number_of_cyclist_injured',
#        'number_of_cyclist_killed', 'number_of_motorist_injured',
#        'number_of_motorist_killed']]
# contributing_factor_vehicle_df = contributing_factor_vehicle_df[['contributing_factor_vehicle_id', 'contributing_factor_vehicle_1', 'contributing_factor_vehicle_2']]
# vehicle_type_code_df = vehicle_type_code_df[['vehicle_type_code_id', 'vehicle_type_code1', 'vehicle_type_code2']] 