# Motor Vehicle Collision Data ETL Pipeline

Run the cell below to import the required modules and define the helper functions. `numpy`, `pandas`, and `toolz` are third-party packages; install them first if you do not already have them.

In [169]:
from functools import partial
import numpy as np
import pandas as pd
import re
from toolz import compose_left as compose

def escape(string):
    """Return `string` with every single quote doubled (``'`` -> ``''``), SQL-style."""
    # str.replace instead of re.sub: a re.sub replacement written as r"\'\'" keeps the
    # backslashes (unknown escapes are left as-is), producing \'\' rather than ''.
    return string.replace("'", "''")


def make_postgresql_array_literal(values):
    """Render the non-null entries of a Series as a brace-delimited array literal.

    Each entry is wrapped in single quotes (embedded quotes doubled by `escape`) and
    the entries are joined with ', ', e.g. ["O'Neil", NaN, 'REAR'] -> "{'O''Neil', 'REAR'}".

    Args:
        values: a pandas Series of strings/NaN (one row when used through
            ``DataFrame.agg(make_postgresql_array_literal, axis=1)``).

    Returns:
        The literal as a str, or NaN when every entry is null (instead of '{}').
    """
    # NOTE(review): PostgreSQL's array input syntax quotes elements with double quotes;
    # single quotes are ordinary characters inside {...} and may be stored as part of
    # each element -- confirm the loading side expects this format.
    present = values.dropna()
    if present.empty:
        return np.nan
    quoted = ', '.join("'{}'".format(escape(item)) for item in present)
    return '{{{}}}'.format(quoted)

Run the cell below to download, from the NYC Open Data servers, the crash, vehicle, and person records for crashes dated 1–31 January 2020. Each query selects *only* the required columns and returns at most 100,000 rows.

In [148]:
# Socrata (SODA) query settings shared by the three downloads.
ROW_LIMIT = 100000
CRASH_DATE_FILTER = "crash_date%20between%20'2020-01-01'%20and%20'2020-01-31'"


def fetch_dataset(resource_id, columns):
    """Download `columns` of one NYC Open Data resource as an all-string DataFrame.

    Only rows matching CRASH_DATE_FILTER are requested, at most ROW_LIMIT of them.

    Args:
        resource_id: the Socrata resource identifier, e.g. 'h9gi-nx95'.
        columns: comma-separated column names for the `$select` clause.

    Raises:
        ValueError: if the response reaches ROW_LIMIT rows, i.e. the result was
            most likely cut off by `$limit` and would otherwise be silently incomplete.
    """
    url = ('https://data.cityofnewyork.us/resource/{}.csv'
           '?$select={}&$where={}&$limit={}').format(resource_id, columns, CRASH_DATE_FILTER, ROW_LIMIT)
    frame = pd.read_csv(url, dtype='str')
    if len(frame) >= ROW_LIMIT:
        raise ValueError('{} returned {} rows (the $limit); the data is probably truncated. '
                         'Raise ROW_LIMIT or page with $offset.'.format(resource_id, len(frame)))
    return frame


data_frames = {'c': fetch_dataset('h9gi-nx95',
                                  'collision_id,crash_date,crash_time,latitude,longitude'),
               'v': fetch_dataset('bm4k-52h4',
                                  'unique_id,collision_id,state_registration,vehicle_type,vehicle_make,vehicle_model,vehicle_year,travel_direction,vehicle_occupants,driver_sex,driver_license_status,driver_license_jurisdiction,pre_crash,point_of_impact,vehicle_damage_1,vehicle_damage_2,vehicle_damage_3,public_property_damage,public_property_damage_type,contributing_factor_1,contributing_factor_2'),
               'p': fetch_dataset('f55k-p6yu',
                                  'unique_id,collision_id,vehicle_id,person_type,person_injury,person_age,ejection,emotional_status,bodily_injury,position_in_vehicle,safety_equipment,ped_location,ped_action,complaint,ped_role,contributing_factor_1,contributing_factor_2,person_sex')}

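Run the cell below to get fresh working copies of the downloaded data frames without querying the servers again. Re-run it before re-running the transform cell: that cell overwrites `c`, `v`, and `p` and cannot be applied to its own output.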

In [170]:
# Deep-copy each downloaded frame so `data_frames` stays untouched and the
# transform cell can start again from the original downloads.
c, v, p = (data_frames[key].copy(deep=True) for key in ('c', 'v', 'p'))

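Run the cell below to transform the data frames. It renames columns and merges related columns (the vehicle damages and the contributing factors) into single array-literal columns. It also moves person `vehicle_id`s that match no vehicle into a separate `dangling_vehicle_id` column, and replaces missing values with `\N`.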

In [171]:
def _collapse_to_array(frame, columns, array_column):
    """Merge `columns` of `frame` into one array-literal column named `array_column`.

    Each row's non-null values in `columns` are combined with
    `make_postgresql_array_literal` (NaN when all of them are null). The new column
    takes the position of the first of `columns`; the others are dropped.
    Returns a new frame; `frame` itself is not modified.
    """
    first, *rest = columns
    literals = frame[columns].agg(make_postgresql_array_literal, axis=1)
    return (frame.assign(**{first: literals})
                 .rename(columns={first: array_column})
                 .drop(columns=rest))


# Collisions: shorten column names and index by collision id.
# verify_integrity raises on duplicated ids instead of letting them pass silently.
c = (c.rename(columns={'collision_id': 'id', 'crash_date': 'date', 'crash_time': 'time'})
      .set_index('id', verify_integrity=True)
      .fillna('\\N'))

# Vehicles: shorten column names, then merge the three damage columns and the two
# contributing-factor columns into array literals.
v = (v.rename(columns={'unique_id': 'id',
                       'vehicle_type': 'type',
                       'vehicle_make': 'make',
                       'vehicle_model': 'model',
                       'vehicle_year': 'year',
                       'vehicle_occupants': 'occupants',
                       'vehicle_damage_1': 'damage_1',
                       'vehicle_damage_2': 'damage_2',
                       'vehicle_damage_3': 'damage_3'})
      .set_index('id', verify_integrity=True)
      .pipe(_collapse_to_array, ['damage_1', 'damage_2', 'damage_3'], 'damages')
      .pipe(_collapse_to_array, ['contributing_factor_1', 'contributing_factor_2'], 'contributing_factors')
      .fillna('\\N'))

# Persons: drop the `person_`/`ped_` prefixes and merge the contributing factors.
p = (p.rename(columns={'unique_id': 'id',
                       'person_type': 'type',
                       'person_injury': 'injury',
                       'person_age': 'age',
                       'ped_location': 'location',
                       'ped_action': 'action',
                       'ped_role': 'role',
                       'person_sex': 'sex'})
      .set_index('id', verify_integrity=True)
      .pipe(_collapse_to_array, ['contributing_factor_1', 'contributing_factor_2'], 'contributing_factors'))

# Keep in vehicle_id only ids that match a row of `v`; the others are moved to a new
# dangling_vehicle_id column (placed right after vehicle_id) so they are not lost.
# NOTE(review): presumably so vehicle_id can be loaded as a foreign key -- confirm.
has_vehicle = p['vehicle_id'].isin(v.index)
p.insert(p.columns.get_loc('vehicle_id') + 1, 'dangling_vehicle_id',
         np.where(has_vehicle, np.nan, p['vehicle_id']))
p['vehicle_id'] = np.where(has_vehicle, p['vehicle_id'], np.nan)

# Missing values become '\N' -- presumably for PostgreSQL COPY, whose text format
# uses \N as its default NULL marker.
p = p.fillna('\\N')

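Run the cells below one at a time to copy each result to the clipboard; each call replaces the previous clipboard contents, so paste before running the next one. The output is tab-separated and includes a header row, with the `id` index as the first column.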

In [165]:
# Tab-separated (excel=True), with a header row and the `id` index as the first column.
c.to_clipboard(excel=True, sep=None)

In [173]:
# Tab-separated (excel=True), with a header row and the `id` index as the first column.
v.to_clipboard(excel=True, sep=None)

In [172]:
# Tab-separated (excel=True), with a header row and the `id` index as the first column.
p.to_clipboard(excel=True, sep=None)