fetch street names and such from intersection db
johnclary committed Sep 16, 2016
1 parent 1bee106 commit 7786553
Showing 1 changed file with 142 additions and 44 deletions.
signal-status/signal_status_data_pusher.py
@@ -1,27 +1,28 @@
# checking for stale but not doing anything about it.
# status duration
# don't upload if no data
# enable request verification
# logging and stuff
# fieldnames! e.g. atd_intersection_id
# dodgy error handling in change detection
# use ATD intersection ID as row identifier
# append new intersections to historical dataset?

import pymssql
import pyodbc
import arrow
import requests
import json
import email_alert
from secrets import KITS_CREDENTIALS
from secrets import SOCRATA_CREDENTIALS
from secrets import ALERTS_DISTRIBUTION
from secrets import IDB_PROD_CREDENTIALS



SOCRATA_SIGNAL_STATUS = 'https://data.austintexas.gov/resource/5zpr-dehc.json'
-SOCRATA_SIGNAL_STATUS_HISTORICAL = 'https://data.austintexas.gov/resource/kn2s-yypv.json'
+SOCRATA_SIGNAL_STATUS_HISTORICAL = 'https://data.austintexas.gov/resource/x62n-vjpq.json'
SOCRATA_SIGNAL_STATUS_LOGS = 'https://data.austintexas.gov/resource/n5kp-f8k4.json'

IGNORE_INTESECTIONS =['959']

EMAIL_FOOTER = '''
\n
This is an automated message generated by Austin Transportation's Arterial Management Division. To unsubscribe, contact john.clary@austintexas.gov.
@@ -30,8 +31,11 @@
then = arrow.now()
logfile_filename = 'logs/signals-on-flash/{}.csv'.format(then.format('YYYY-MM-DD'))



def fetch_kits_data():
print('fetch kits data')

conn = pymssql.connect(
server=KITS_CREDENTIALS['server'],
user=KITS_CREDENTIALS['user'],
@@ -42,16 +46,15 @@ def fetch_kits_data():
cursor = conn.cursor(as_dict=True)
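# the query below joins the KITS intersection table (i) to its status/event table (e),
# aliasing columns to the published field names (the join clause is collapsed in this view)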

search_string = '''
-SELECT i.INTID as intid
-, e.INTNAME as intname
-, e.DATETIME as intstatusdatetime
-, e.STATUS as intstatus
-, i.POLLST as pollstatus
-, e.OPERATION as operationstate
-, e.PLANID as planid
-, i.STREETN1 as streetn1
-, i.STREETN2 as streetn2
-, i.ASSETNUM as assetnum
+SELECT i.INTID as database_id
+, e.DATETIME as status_datetime
+, e.STATUS as intersection_status
+, i.POLLST as poll_status
+, e.OPERATION as operation_state
+, e.PLANID as plan_id
+, i.STREETN1 as primary_street
+, i.STREETN2 as secondary_street
+, i.ASSETNUM as atd_intersection_id
, i.LATITUDE as latitude
, i.LONGITUDE as longitude
FROM [KITS].[INTERSECTION] i
@@ -78,7 +81,7 @@ def fetch_published_data():



-def reformat_sql_data(dataset):
+def reformat_kits_data(dataset):
print('reformat data')

reformatted_data = []
@@ -98,8 +101,8 @@ def reformat_sql_data(dataset):


def group_data(dataset, key):

print('group data')
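# index rows by the given key so change detection can do constant-time lookups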

grouped_data = {}

for row in dataset:
@@ -116,8 +119,8 @@ def check_for_stale_data(dataset):
status_times = []
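# collect each signal's last status time; the most recent of them is used
# to judge whether the KITS feed as a whole has gone stale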

for record in dataset:
-if record['intstatusdatetime']:
-compare = arrow.get(record['intstatusdatetime'])
+if record['status_datetime']:
+compare = arrow.get(record['status_datetime'])
status_times.append(compare)

oldest_record = arrow.get(max(status_times)).replace(tzinfo='US/Central') # have to swap TZ info here because the database query is incorrectly storing datetimes as UTC
@@ -137,6 +140,7 @@ def check_for_stale_data(dataset):
email_alert.send_email(ALERTS_DISTRIBUTION, subject, body)



def detect_changes(new, old):
print('detect changes')
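# compare the freshly fetched KITS records against the currently published rows
# and sort each intersection into unchanged, updated, new, or deleted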

@@ -149,27 +153,24 @@ def detect_changes(new, old):
upsert_historical = []

for record in new: # compare KITS to socrata data
-lookup = str(new[record]['intid'])
+lookup = str(new[record]['database_id'])

-if lookup in IGNORE_INTESECTIONS:
-continue
-
if lookup in old:
-new_status = str(new[record]['intstatus'])
+new_status = str(new[record]['intersection_status'])

try:
-old_status = str(old[lookup]['intstatus'])
+old_status = str(old[lookup]['intersection_status'])

except:
-not_processed.append(new[record]['intid'])
+not_processed.append(new[record]['database_id'])
continue

if new_status == old_status:
no_update += 1

else:
update += 1
-new[record]['intstatusprevious'] = old_status
+new[record]['intersection_status_previous'] = old_status
upsert.append(new[record])
upsert_historical.append(old[lookup])

@@ -178,13 +179,13 @@ def detect_changes(new, old):
upsert.append(new[record])

for record in old: # compare socrata to KITS to idenify deleted records
-lookup = old[record]['intid']
+lookup = old[record]['database_id']

if lookup not in new:
delete += 1

upsert.append({
-'intid': lookup,
+'database_id': lookup,
':deleted': True
})

@@ -198,15 +199,104 @@ def detect_changes(new, old):
'upsert_historical': upsert_historical
}

-def prepare_socrata_payload(upsert_data):


def connect_int_db(credentials):
print('connecting to intersection database')
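# a DSN-less ODBC connection string is built below from the stored credentials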

conn = pyodbc.connect(
'DRIVER={{SQL Server}};'
'SERVER={};'
'PORT=1433;'
'DATABASE={};'
'UID={};'
'PWD={}'
.format(
credentials['server'],
credentials['database'],
credentials['user'],
credentials['password']
))

cursor = conn.cursor()

return conn



def prep_int_db_query(upsert_data):
ids = []

print('prep intersection database query')

for row in upsert_data:
ids.append(row['atd_intersection_id'])
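# below, str(ids) renders the list as "['x', 'y']"; the Python 2 form of
# str.translate strips the brackets, leaving a quoted, comma-separated string
# for the SQL IN clause, e.g. hypothetical IDs ['5001', '5002'] become
# "'5001', '5002'"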

where = str(ids).translate(None, "[]")

query = '''
SELECT * FROM GIS_QUERY
WHERE GIS_QUERY.ATD_INTERSECTION_ID IN ({})
'''.format(where)

return query



def get_int_db_data_as_dict(connection, query, key):
print('get intersection database data')

results = []

grouped_data = {}

cursor = connection.cursor()

cursor.execute(query)
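# per the DB-API, cursor.description holds a 7-item sequence per result column;
# item 0 is the column name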

columns = [column[0] for column in cursor.description]

for row in cursor.fetchall():
results.append(dict(zip(columns, row)))

for row in results:
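# normalize the key, which may come back as a numeric type, to a plain string
# so it matches the string IDs used for lookups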
new_key = str(int(row[key]))
grouped_data[new_key] = row

return grouped_data



+def prepare_socrata_payload(upsert_data, int_db_data):
print('prepare socrata payload')
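# enrich each upsert row with street names and coordinates from the intersection
# database; rows whose atd_intersection_id has no match are dropped and reported
# via not_found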

not_found = []

now = arrow.now()


for row in list(upsert_data): # iterate over a copy, since unmatched rows are removed from the list below
-row['processeddatetime'] = now.format('YYYY-MM-DD HH:mm:ss')
-row['recordid'] = '{}_{}'.format(row['intid'], str(now.timestamp))
+atd_intersection_id = row['atd_intersection_id']

-return upsert_data
if atd_intersection_id in int_db_data:
row['processed_datetime'] = now.format('YYYY-MM-DD HH:mm:ss')
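# in the arrow releases of this era, .timestamp is a property holding epoch
# seconds, not a method, so str(now.timestamp) below is intentional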
row['record_id'] = '{}_{}'.format(row['database_id'], str(now.timestamp))
row['primary_street'] = int_db_data[atd_intersection_id]['STREET_SEGMENTS.FULL_STREET_NAME']

if int_db_data[atd_intersection_id]['STREET_SEGMENTS_1.FULL_STREET_NAME']:
row['cross_street'] = int_db_data[atd_intersection_id]['STREET_SEGMENTS_1.FULL_STREET_NAME']
row['intersection_name'] = row['primary_street'] + " / " + row['cross_street']
else:
row['intersection_name'] = row['primary_street'] + " (NO CROSS ST)"

row['latitude'] = int_db_data[atd_intersection_id]['LATITUDE']
row['longitude'] = int_db_data[atd_intersection_id]['LONGITUDE']

else:
not_found.append(atd_intersection_id)
upsert_data.remove(row)

return (upsert_data, not_found)



@@ -280,22 +370,28 @@ def main(date_time):

try:
new_data = fetch_kits_data()
-new_data_reformatted = reformat_sql_data(new_data)

+new_data_reformatted = reformat_kits_data(new_data)

check_for_stale_data(new_data)

-new_data_grouped = group_data(new_data_reformatted, 'intid')
+new_data_grouped = group_data(new_data_reformatted, 'database_id')

old_data = fetch_published_data()

-old_data_grouped = group_data(old_data, 'intid')
+old_data_grouped = group_data(old_data, 'database_id')

change_detection_results = detect_changes(new_data_grouped, old_data_grouped)

-socrata_payload = prepare_socrata_payload(change_detection_results['upsert'])

-socrata_response = upsert_open_data(socrata_payload, SOCRATA_SIGNAL_STATUS)
conn = connect_int_db(IDB_PROD_CREDENTIALS)

int_db_query = prep_int_db_query(change_detection_results['upsert'])

int_db_data = get_int_db_data_as_dict(conn, int_db_query, 'ATD_INTERSECTION_ID')
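# int_db_data maps each ATD intersection ID to its street-name and location record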

socrata_payload = prepare_socrata_payload(change_detection_results['upsert'], int_db_data)

socrata_response = upsert_open_data(socrata_payload[0], SOCRATA_SIGNAL_STATUS)

socrata_response_historical = upsert_open_data(change_detection_results['upsert_historical'], SOCRATA_SIGNAL_STATUS_HISTORICAL)

@@ -306,17 +402,19 @@ def main(date_time):
return {
'res': socrata_response,
'res_historical': socrata_response_historical,
-'payload': socrata_payload,
-'logfile': logfile_data
+'payload': socrata_payload[0],
+'logfile': logfile_data,
+'not_found': socrata_payload[1]
}

except Exception as e:
print('Failed to process data for {}'.format(date_time))
print(e)
-email_alert.send_email(ALERTS_DISTRIBUTION, 'DATA PROCESSING ALERT: Signal Status Update Failure', e + EMAIL_FOOTER)
+email_alert.send_email(ALERTS_DISTRIBUTION, 'DATA PROCESSING ALERT: Signal Status Update Failure', str(e) + EMAIL_FOOTER)
raise e



results = main(then)

print(results['res'])
