From 778655306fbb8df3b3754fd64e78061cee7c9799 Mon Sep 17 00:00:00 2001
From: John Clary
Date: Fri, 16 Sep 2016 10:24:34 -0500
Subject: [PATCH] fetch street names and coordinates from intersection db

---
 signal-status/signal_status_data_pusher.py | 186 ++++++++++++++++-----
 1 file changed, 142 insertions(+), 44 deletions(-)

diff --git a/signal-status/signal_status_data_pusher.py b/signal-status/signal_status_data_pusher.py
index f277c140..c1f3f875 100644
--- a/signal-status/signal_status_data_pusher.py
+++ b/signal-status/signal_status_data_pusher.py
@@ -1,12 +1,13 @@
-# checking for stale but not doing anything about it.
+# status duration
+# don't upload if no data
 # enable request verification
-# logging and stuff
 # fieldnames! e.g. atd_intersection_id
 # dodgy error handling in change detection
 # use ATD intersection ID as row identifier
 # append new intersections to historical dataset?
 
 import pymssql
+import pyodbc
 import arrow
 import requests
 import json
@@ -14,14 +15,14 @@
 from secrets import KITS_CREDENTIALS
 from secrets import SOCRATA_CREDENTIALS
 from secrets import ALERTS_DISTRIBUTION
+from secrets import IDB_PROD_CREDENTIALS
+
 
 SOCRATA_SIGNAL_STATUS = 'https://data.austintexas.gov/resource/5zpr-dehc.json'
-SOCRATA_SIGNAL_STATUS_HISTORICAL = 'https://data.austintexas.gov/resource/kn2s-yypv.json'
+SOCRATA_SIGNAL_STATUS_HISTORICAL = 'https://data.austintexas.gov/resource/x62n-vjpq.json'
 SOCRATA_SIGNAL_STATUS_LOGS = 'https://data.austintexas.gov/resource/n5kp-f8k4.json'
 
-IGNORE_INTESECTIONS =['959']
-
 EMAIL_FOOTER = '''
 \n
 This is an automated message generated by Austin Transportation's Arterial Management Division. To unsubscribe, contact john.clary@austintexas.gov.
@@ -30,8 +31,11 @@
 then = arrow.now()
 logfile_filename = 'logs/signals-on-flash/{}.csv'.format(then.format('YYYY-MM-DD'))
 
+
+
 def fetch_kits_data():
     print('fetch kits data')
+
     conn = pymssql.connect(
         server=KITS_CREDENTIALS['server'],
         user=KITS_CREDENTIALS['user'],
@@ -42,16 +46,15 @@ def fetch_kits_data():
     cursor = conn.cursor(as_dict=True)
 
     search_string = '''
-        SELECT i.INTID as intid
-        , e.INTNAME as intname
-        , e.DATETIME as intstatusdatetime
-        , e.STATUS as intstatus
-        , i.POLLST as pollstatus
-        , e.OPERATION as operationstate
-        , e.PLANID as planid
-        , i.STREETN1 as streetn1
-        , i.STREETN2 as streetn2
-        , i.ASSETNUM as assetnum
+        SELECT i.INTID as database_id
+        , e.DATETIME as status_datetime
+        , e.STATUS as intersection_status
+        , i.POLLST as poll_status
+        , e.OPERATION as operation_state
+        , e.PLANID as plan_id
+        , i.STREETN1 as primary_street
+        , i.STREETN2 as secondary_street
+        , i.ASSETNUM as atd_intersection_id
         , i.LATITUDE as latitude
         , i.LONGITUDE as longitude
         FROM [KITS].[INTERSECTION] i
@@ -78,7 +81,7 @@ def fetch_published_data():
 
 
 
-def reformat_sql_data(dataset):
+def reformat_kits_data(dataset):
     print('reformat data')
 
     reformatted_data = []
@@ -98,8 +101,8 @@
 
 def group_data(dataset, key):
-    print('group data')
+    print('group data')
+
     grouped_data = {}
 
     for row in dataset:
@@ -116,8 +119,8 @@ def check_for_stale_data(dataset):
     status_times = []
 
     for record in dataset:
-        if record['intstatusdatetime']:
-            compare = arrow.get(record['intstatusdatetime'])
+        if record['status_datetime']:
+            compare = arrow.get(record['status_datetime'])
             status_times.append(compare)
 
     oldest_record = arrow.get(max(status_times)).replace(tzinfo='US/Central')  # have to swap TZ info here because the database query is incorrectly storing datetimes as UTC
@@ -137,6 +140,7 @@ def check_for_stale_data(dataset):
         email_alert.send_email(ALERTS_DISTRIBUTION, subject, body)
 
+
 def detect_changes(new, old):
     print('detect changes')
 
@@ -149,19 +153,16 @@ def detect_changes(new, old):
     upsert_historical = []
 
     for record in new:  # compare KITS to socrata data
-        lookup = str(new[record]['intid'])
+        lookup = str(new[record]['database_id'])
 
-        if lookup in IGNORE_INTESECTIONS:
-            continue
-
         if lookup in old:
-            new_status = str(new[record]['intstatus'])
+            new_status = str(new[record]['intersection_status'])
 
             try:
-                old_status = str(old[lookup]['intstatus'])
+                old_status = str(old[lookup]['intersection_status'])
 
             except:
-                not_processed.append(new[record]['intid'])
+                not_processed.append(new[record]['database_id'])
                 continue
 
             if new_status == old_status:
@@ -169,7 +170,7 @@ def detect_changes(new, old):
 
             else:
                 update += 1
-                new[record]['intstatusprevious'] = old_status
+                new[record]['intersection_status_previous'] = old_status
                 upsert.append(new[record])
                 upsert_historical.append(old[lookup])
 
@@ -178,13 +179,13 @@ def detect_changes(new, old):
             upsert.append(new[record])
 
-    for record in old:  # compare socrata to KITS to idenify deleted records
-        lookup = old[record]['intid']
+    for record in old:  # compare socrata to KITS to identify deleted records
+        lookup = old[record]['database_id']
 
         if lookup not in new:
             delete += 1
 
             upsert.append({
-                'intid': lookup,
+                'database_id': lookup,
                 ':deleted': True
             })
 
@@ -198,15 +199,104 @@
         'upsert_historical': upsert_historical
     }
 
-def prepare_socrata_payload(upsert_data):
+
+
+def connect_int_db(credentials):
+    print('connecting to intersection database')
+
+    conn = pyodbc.connect(
+        'DRIVER={{SQL Server}};'
+        'SERVER={};'
+        'PORT=1433;'
+        'DATABASE={};'
+        'UID={};'
+        'PWD={}'
+        .format(
+            credentials['server'],
+            credentials['database'],
+            credentials['user'],
+            credentials['password']
+        ))
+
+    return conn
+
+
+
+def prep_int_db_query(upsert_data):
+    print('prep intersection database query')
+
+    ids = []
+
+    for row in upsert_data:
+        ids.append(row['atd_intersection_id'])
+
+    # comma-separated, quoted ID list for the IN () clause
+    where = ', '.join(repr(i) for i in ids)
+
+    query = '''
+        SELECT * FROM GIS_QUERY
+        WHERE GIS_QUERY.ATD_INTERSECTION_ID IN ({})
+    '''.format(where)
+
+    return query
+
+
+
+def get_int_db_data_as_dict(connection, query, key):
+    print('get intersection database data')
+
+    results = []
+
+    grouped_data = {}
+
+    cursor = connection.cursor()
+
+    cursor.execute(query)
+
+    columns = [column[0] for column in cursor.description]
+
+    for row in cursor.fetchall():
+        results.append(dict(zip(columns, row)))
+
+    for row in results:
+        # key each row by its intersection ID so it can be matched to KITS records
+        new_key = str(int(row[key]))
+        grouped_data[new_key] = row
+
+    return grouped_data
+
+
+
+def prepare_socrata_payload(upsert_data, int_db_data):
     print('prepare socrata payload')
+
+    not_found = []
+
     now = arrow.now()
-    for row in upsert_data:
-        row['processeddatetime'] = now.format('YYYY-MM-DD HH:mm:ss')
-        row['recordid'] = '{}_{}'.format(row['intid'], str(now.timestamp))
 
-    return upsert_data
+    # iterate over a copy so unmatched rows can be removed from the original list
+    for row in list(upsert_data):
+        atd_intersection_id = row['atd_intersection_id']
+
+        if atd_intersection_id in int_db_data:
+            row['processed_datetime'] = now.format('YYYY-MM-DD HH:mm:ss')
+            row['record_id'] = '{}_{}'.format(row['database_id'], str(now.timestamp))
+            row['primary_street'] = int_db_data[atd_intersection_id]['STREET_SEGMENTS.FULL_STREET_NAME']
+
+            if int_db_data[atd_intersection_id]['STREET_SEGMENTS_1.FULL_STREET_NAME']:
+                row['cross_street'] = int_db_data[atd_intersection_id]['STREET_SEGMENTS_1.FULL_STREET_NAME']
+                row['intersection_name'] = row['primary_street'] + " / " + row['cross_street']
+            else:
+                row['intersection_name'] = row['primary_street'] + " (NO CROSS ST)"
+
+            row['latitude'] = int_db_data[atd_intersection_id]['LATITUDE']
+            row['longitude'] = int_db_data[atd_intersection_id]['LONGITUDE']
+
+        else:
+            # no match in the intersection database; drop the row from the payload
+            not_found.append(atd_intersection_id)
+            upsert_data.remove(row)
+
+    return (upsert_data, not_found)
 
 
 
@@ -280,22 +370,28 @@ def main(date_time):
 
     try:
         new_data = fetch_kits_data()
-
-        new_data_reformatted = reformat_sql_data(new_data)
+
+        new_data_reformatted = reformat_kits_data(new_data)
 
         check_for_stale_data(new_data)
 
-        new_data_grouped = group_data(new_data_reformatted, 'intid')
+        new_data_grouped = group_data(new_data_reformatted, 'database_id')
 
         old_data = fetch_published_data()
-
-        old_data_grouped = group_data(old_data, 'intid')
+
+        old_data_grouped = group_data(old_data, 'database_id')
 
         change_detection_results = detect_changes(new_data_grouped, old_data_grouped)
 
-        socrata_payload = prepare_socrata_payload(change_detection_results['upsert'])
+        conn = connect_int_db(IDB_PROD_CREDENTIALS)
+
+        int_db_query = prep_int_db_query(change_detection_results['upsert'])
+
+        int_db_data = get_int_db_data_as_dict(conn, int_db_query, 'ATD_INTERSECTION_ID')
+
+        socrata_payload, not_found = prepare_socrata_payload(change_detection_results['upsert'], int_db_data)
 
         socrata_response = upsert_open_data(socrata_payload, SOCRATA_SIGNAL_STATUS)
 
         socrata_response_historical = upsert_open_data(change_detection_results['upsert_historical'], SOCRATA_SIGNAL_STATUS_HISTORICAL)
 
@@ -306,17 +402,19 @@
         return {
             'res': socrata_response,
             'res_historical': socrata_response_historical,
             'payload': socrata_payload,
-            'logfile': logfile_data
+            'logfile': logfile_data,
+            'not_found': not_found
         }
 
     except Exception as e:
         print('Failed to process data for {}'.format(date_time))
         print(e)
-        email_alert.send_email(ALERTS_DISTRIBUTION, 'DATA PROCESSING ALERT: Signal Status Update Failure', e + EMAIL_FOOTER)
+        email_alert.send_email(ALERTS_DISTRIBUTION, 'DATA PROCESSING ALERT: Signal Status Update Failure', str(e) + EMAIL_FOOTER)
         raise e
 
+
 results = main(then)
 
 print(results['res'])
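
Reviewer note: prep_int_db_query still interpolates intersection IDs directly
into the SQL string. A parameterized query would sidestep quoting and
injection issues entirely. A minimal sketch, assuming pyodbc's qmark
parameter style and the same GIS_QUERY view; the helper name
prep_int_db_query_params is hypothetical:

    def prep_int_db_query_params(upsert_data):
        # one "?" placeholder per intersection ID
        ids = [row['atd_intersection_id'] for row in upsert_data]
        placeholders = ', '.join('?' * len(ids))
        query = '''
            SELECT * FROM GIS_QUERY
            WHERE GIS_QUERY.ATD_INTERSECTION_ID IN ({})
        '''.format(placeholders)
        return query, ids

The caller would then pass the IDs alongside the SQL, e.g.
cursor.execute(query, ids), instead of executing a fully rendered string.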
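
Reviewer note: connect_int_db opens a connection that main() never closes. One
way to guarantee cleanup is the standard library's contextlib.closing; a
sketch, reusing the variable names from this patch:

    from contextlib import closing

    # the connection is closed when the with-block exits, even on error
    with closing(connect_int_db(IDB_PROD_CREDENTIALS)) as conn:
        int_db_query = prep_int_db_query(change_detection_results['upsert'])
        int_db_data = get_int_db_data_as_dict(conn, int_db_query, 'ATD_INTERSECTION_ID')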