In [1]:
import pandas as pd
import os 
from datetime import date, datetime, timedelta
import json
import yaml
from arcgis.features import FeatureLayer, GeoAccessor, GeoSeriesAccessor, Table

In [17]:
with open ("./airflow/data/sources.yml", "r") as yaml_file:
  data = yaml.safe_load(yaml_file)

# crashes_fl = FeatureLayer(data['crashes']['mapserver'])
# crash_details_fl = FeatureLayer(data['crash_details']['mapserver'])

crash_details = pd.read_csv("crash_details.csv")
crashes = pd.read_csv("crashes.csv")

# for f in crashes_fl.properties.fields:
#   print(f)
# for f in crash_details_fl.properties.fields:
#   print(f)

  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)


The most frequent update interval for our datasets is daily. Most are updated weekly or irregularly, but we will schedule our DAG to check for daily updates. 

In [3]:
# The query syntax in the REST API for ARCGIS is inflexible -- we will work around the function we know works. 
# Simply subtracting a day misses entries which occurred yesterday but earlier in the day than the current time.
# Subtracting 2 days includes these but also some dates from the day before.
# We start with 2 and then pare down to just yesterday.
result = crashes_fl.query(where="REPORTDATE >= CURRENT_TIMESTAMP - 8") 

In [4]:
import datetime as dt 

def ms(t):
  if isinstance(t, dt.timedelta):
    return t.total_seconds() * 1000
  elif isinstance(t, dt.datetime):
    return (t - dt.datetime(1970,1,1)).total_seconds() * 1000

def midnight(date:dt.date):
  return dt.datetime.combine(date, dt.datetime.min.time()) 
# Get midnight today and midnight yesterday in miliseconds from 1970-01-01
t1 = ms(midnight(date.today()-timedelta(days=7)))
t2 = ms(midnight(date.today()))

# Check against REPORTDATE in x.features
incidents = [{'geometry':f.geometry, 'attributes':f.attributes} 
      for f in result.features if t1 < f.attributes['REPORTDATE'] < t2] 

# Push to XCOM
# ----------------------------------------
# Read array from XCOM 

# Convert to Dataframe
df = pd.json_normalize(incidents)
df.columns = [col.split('.')[1] for col in df] # remove column prefixes 
df['REPORTDATE'] = df['REPORTDATE'].map(lambda v: datetime(1970,1,1) + timedelta(seconds=v/1000))
# Push to BIGQUERY
df.sort_values('REPORTDATE')

Unnamed: 0,x,y,spatialReference,spatialReference.1,OBJECTID,CRIMEID,CCN,REPORTDATE,ROUTEID,MEASURE,...,LASTUPDATEDATE,MPDLATITUDE,MPDLONGITUDE,MPDGEOX,MPDGEOY,FATALPASSENGER,MAJORINJURIESPASSENGER,MINORINJURIESPASSENGER,UNKNOWNINJURIESPASSENGER,MAR_ID
0,-8.575090e+06,4.709375e+06,102100,3857,163781896,41495789684,23019695,2023-02-06 00:19:00,11077672,1825.20,...,,38.914265,-77.031344,,,0.0,0.0,0.0,0.0,312327.0
1,-8.578991e+06,4.712509e+06,102100,3857,163781897,41495888427,23019714,2023-02-06 00:36:00,11003402,2566.34,...,,38.935994,-77.066027,,,0.0,0.0,0.0,0.0,221063.0
2,-8.573784e+06,4.709237e+06,102100,3857,163781898,41495927066,23019691,2023-02-06 00:46:00,11075462,1003.48,...,,38.913341,-77.019711,,,0.0,0.0,0.0,0.0,242865.0
3,-8.570630e+06,4.707387e+06,102100,3857,163781899,41496189099,23019733,2023-02-06 01:54:00,12042442,1543.16,...,,38.900410,-76.991274,,,0.0,0.0,0.0,0.0,74429.0
4,-8.564015e+06,4.707570e+06,102100,3857,163781900,41496407713,23019676,2023-02-06 02:54:00,12049722,1320.47,...,,38.901696,-76.931853,,,0.0,0.0,0.0,0.0,16640.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
162,-8.569546e+06,4.709229e+06,102100,3857,163782058,41682722145,23021753,2023-02-09 17:30:00,12001502,1185.00,...,,38.913217,-76.981150,,,0.0,0.0,0.0,0.0,301201.0
163,-8.568031e+06,4.701525e+06,102100,3857,163782059,41683803403,23021627,2023-02-09 17:48:00,13009362,3648.34,...,,38.859080,-76.967712,,,0.0,0.0,0.0,0.0,278238.0
164,-8.575920e+06,4.712086e+06,102100,3857,163782060,41687531171,23021327,2023-02-09 19:46:00,11001702,4375.09,...,,38.933040,-77.039100,,,0.0,0.0,0.0,0.0,233668.0
165,-8.575244e+06,4.711798e+06,102100,3857,163782061,41687765223,23021820,2023-02-09 19:59:00,11001402,4533.94,...,,38.931029,-77.032972,,,0.0,0.0,0.0,0.0,300786.0


There's no time-related field in the `crash_details` dataset by which to filter our query. However, the `OBJECTID` field is an incrementing id column we can use to check against incoming query results.

EDIT: It doesn't seem like we can query against these row ids!

In [5]:
print(min(crashes.OBJECTID), max(crashes.OBJECTID))
print(min(crash_details.OBJECTID), max(crash_details.OBJECTID)) # as you can see, OBJECTID is a unique code for each row, and doesn't connect across tables -- that's `CCN`

161243183 161525101
429021378 429762166


In [6]:
df = crashes[['CCN', 'REPORTDATE']].set_index('CCN')
crash_details.merge(df, how='left', left_on='CCN', right_index=True).sort_values('OBJECTID', ascending=False)

Unnamed: 0,OBJECTID,CRIMEID,CCN,PERSONID,PERSONTYPE,AGE,FATAL,MAJORINJURY,MINORINJURY,VEHICLEID,INVEHICLETYPE,TICKETISSUED,LICENSEPLATESTATE,IMPAIRED,SPEEDING,REPORTDATE
740788,429762166,41210773731,23013970,41210882166,Pedestrian,29.0,N,N,N,,0,0,0,N,N,2023/01/26 23:29:00+00
740787,429762165,41210773731,23013970,41210820253,Driver,73.0,N,N,N,41210820205,0,0,0,N,N,2023/01/26 23:29:00+00
740786,429762164,41210375948,23013936,41210698306,Driver,0.0,N,N,N,41210698267,0,0,0,N,N,2023/01/26 22:22:00+00
740785,429762163,41210299023,23013802,41210571660,Driver,0.0,N,N,N,41210372095,0,0,0,N,N,2023/01/26 22:09:00+00
740784,429762162,41210449406,23013927,41210569995,Passenger,36.0,N,N,Y,41210493202,0,0,0,N,N,2023/01/26 22:34:00+00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4,429021382,25179840,14173859,84559773,Driver,23.0,N,N,N,1005530,Passenger Car/automobile,N,,N,N,
3,429021381,25179840,14173859,84782320,Driver,,N,N,N,1005531,Passenger Car/automobile,N,,N,N,
2,429021380,24655519,13091942,84848622,Passenger,,N,N,N,967938,Firearms,N,MD,N,N,
1,429021379,24655519,13091942,84616638,Driver,25.0,N,N,N,967938,Firearms,N,MD,N,N,


In [7]:
crash_details.sort_values('CRIMEID', ascending=False)

Unnamed: 0,OBJECTID,CRIMEID,CCN,PERSONID,PERSONTYPE,AGE,FATAL,MAJORINJURY,MINORINJURY,VEHICLEID,INVEHICLETYPE,TICKETISSUED,LICENSEPLATESTATE,IMPAIRED,SPEEDING
740788,429762166,41210773731,23013970,41210882166,Pedestrian,29.0,N,N,N,,0,0,0,N,N
740787,429762165,41210773731,23013970,41210820253,Driver,73.0,N,N,N,41210820205,0,0,0,N,N
740784,429762162,41210449406,23013927,41210569995,Passenger,36.0,N,N,Y,41210493202,0,0,0,N,N
740782,429762160,41210449406,23013927,41210493225,Driver,30.0,N,N,Y,41210493202,0,0,0,N,N
740783,429762161,41210449406,23013927,41210515205,Driver,22.0,N,N,N,41210515141,0,0,0,N,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
450865,429472243,23411642,10116936,84698334,Driver,23.0,N,N,Y,868347,Passenger Car/automobile,N,MD,N,N
629806,429651184,23411387,10116771,84861086,Passenger,0.0,N,N,N,868597,Passenger Car/automobile,N,DC,N,N
370962,429392340,23411387,10116771,84908764,Driver,41.0,N,N,N,868596,Passenger Car/automobile,N,MD,N,N
364938,429386316,23411387,10116771,84997163,Driver,28.0,N,N,N,868597,Passenger Car/automobile,N,DC,N,Y


In [8]:
crash_details['CCN'][crash_details['CCN'].apply(isinstance, args=(str,))].iloc[0]

'21038461'

In [9]:
def tryCast(ccn:str) -> int: 
  try: 
    return int(ccn)
  except: 
    return None


In [27]:
df = crashes.copy()
df['CCN'] = df['CCN'].map(tryCast)

# df.sort_values('CCN', ascending=True)

missing_crimes = list(df[df['CCN'].isnull()]['CRIMEID'])
crashes[crashes['CRIMEID'].isin(missing_crimes)]

Unnamed: 0,X,Y,OBJECTID,CRIMEID,CCN,REPORTDATE,ROUTEID,MEASURE,OFFSET,STREETSEGID,...,LASTUPDATEDATE,MPDLATITUDE,MPDLONGITUDE,MPDGEOX,MPDGEOY,FATALPASSENGER,MAJORINJURIESPASSENGER,MINORINJURIESPASSENGER,UNKNOWNINJURIESPASSENGER,MAR_ID
65456,-77.032732,38.938297,161308911,26595355,082-567,2008/06/16 05:00:00+00,11001402,5383.18,0.04,3364.0,...,,,,,,0,0,0,0,803759
107261,-77.024396,38.898324,161350716,26652300,_1407439,2014/05/25 05:00:00+00,11038612,1031.56,37.78,-9.0,...,,,,,,0,0,0,0,310133
107759,-76.988307,38.899558,161351214,26653007,_1316876,2013/11/23 05:00:00+00,12001302,1057.04,0.0,5753.0,...,,,,,,0,0,0,0,905515
110828,-77.008245,38.904698,161354283,26703633,EEH16503,2015/09/22 23:03:55+00,12070262,0.0,0.08,12858.0,...,,,,399284.91,137468.44,0,0,0,0,149791


In [31]:
df['CCN'].dropna(inplace=True)
df['CCN'].drop_duplicates(inplace=True)

In [34]:
df.sort_values('CCN', ascending=False)

Unnamed: 0,X,Y,OBJECTID,CRIMEID,CCN,REPORTDATE,ROUTEID,MEASURE,OFFSET,STREETSEGID,...,LASTUPDATEDATE,MPDLATITUDE,MPDLONGITUDE,MPDGEOX,MPDGEOY,FATALPASSENGER,MAJORINJURIESPASSENGER,MINORINJURIESPASSENGER,UNKNOWNINJURIESPASSENGER,MAR_ID
58840,-77.073190,38.928272,161302295,25368761,99999999.0,2012/04/16 05:00:00+00,11094052,3061.98,44.27,-9.0,...,2020/12/02 15:38:51+00,38.928271,-77.073698,,,0,0,0,0,262610
9031,-77.074039,38.928277,161252486,24369855,99999991.0,2009/07/24 13:15:00+00,Route not found,0.00,0.00,-9.0,...,,,,,,0,0,0,0,262610
77657,-76.967934,38.859211,161321112,26612863,99112222.0,2012/01/24 05:00:00+00,13009362,3723.44,23.43,-9.0,...,,38.859080,-76.967712,,,0,0,0,0,278238
83652,-76.931002,38.876036,161327107,26620806,99038708.0,2012/03/20 05:00:00+00,13081512,8108.23,0.04,10913.0,...,,,,,,0,0,0,0,905094
83630,-76.958647,38.890089,161327085,26620779,99036047.0,2012/03/15 05:00:00+00,12031912,226.98,0.03,4592.0,...,,,,,,0,0,0,0,900520
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65672,-76.990469,38.937668,161309127,26595657,808.0,2008/06/24 05:00:00+00,12001202,3118.85,0.03,7568.0,...,,,,,,0,0,0,0,156250
65456,-77.032732,38.938297,161308911,26595355,,2008/06/16 05:00:00+00,11001402,5383.18,0.04,3364.0,...,,,,,,0,0,0,0,803759
107261,-77.024396,38.898324,161350716,26652300,,2014/05/25 05:00:00+00,11038612,1031.56,37.78,-9.0,...,,,,,,0,0,0,0,310133
107759,-76.988307,38.899558,161351214,26653007,,2013/11/23 05:00:00+00,12001302,1057.04,0.00,5753.0,...,,,,,,0,0,0,0,905515


In [81]:
x = crashes[['CRIMEID', 'CCN']].copy()
y = crash_details[['CRIMEID', 'CCN']].copy()



x['CCN'] = x['CCN'].map(tryCast)
y['CCN'] = y['CCN'].map(tryCast)

In [103]:
outer = x.merge(y, how='outer', indicator=True)
outer

Unnamed: 0,CRIMEID,CCN,_merge
0,23648084,11055200.0,both
1,23648084,11055200.0,both
2,23648084,11055200.0,both
3,23648084,11055200.0,both
4,23648264,11055556.0,both
...,...,...,...
743033,37341376845,22125449.0,right_only
743034,37341376845,22125449.0,right_only
743035,37341376845,22125449.0,right_only
743036,37132024111,22144291.0,right_only


In [109]:
c_only = outer[outer['_merge'] == 'left_only'].drop('_merge', axis=1)
cd_only = outer[outer['_merge'] == 'right_only'].drop('_merge', axis=1)