# Data Processing

Goal: create the `segments_time_level_selected` table, which joins traffic-level data onto the cross product of the selected segments and time buckets.

## Load Raw Data

In [1]:
import ast
import datetime as dt
import os

import numpy as np
import pandas as pd
import psycopg2 as pg
import sqlalchemy
import yaml
from sklearn import preprocessing
from sqlalchemy import create_engine
#import shapely
#from shapely.geometry import LineString, shape
#from shapely.wkb import dumps, loads
#from shapely.wkt import dumps, loads

#import lib.segments
#import lib.times
#import lib.sqlqueries


%matplotlib inline
import matplotlib.pyplot as plt

### Inputs and connection string

In [2]:
# model configurations
# Read once from the YAML pipeline config. safe_load is used because yaml.load
# without an explicit Loader can construct arbitrary Python objects and is
# rejected outright by PyYAML >= 6. The context manager closes the file handle.
with open('./conf/pipeline_args.yml', 'r') as config_file:
    file_args = yaml.safe_load(config_file)

In [14]:
# create db connection objects
# `conn` is the raw psycopg2 connection (used for cursors / hand-written SQL);
# `engine` is the SQLAlchemy engine (used by DataFrame.to_sql).
# Connection strings are read through context managers so the files are closed,
# and stripped so a trailing newline in the file cannot end up in the DSN/URL.

conn_str_file = './conf/db_conn_str.txt'

with open(conn_str_file, 'r') as conn_file:
    pg_conn_str = conn_file.read().strip()
conn = pg.connect(pg_conn_str)

with open('./conf/sqlalchemy_conn_str.txt', 'r') as conn_file:
    sqlalchemy_conn_str = conn_file.read().strip()
engine = create_engine(sqlalchemy_conn_str)

OperationalError: could not connect to server: Connection refused (0x0000274D/10061)
	Is the server running on host "localhost" (::1) and accepting
	TCP/IP connections on port 5432?
could not connect to server: Connection refused (0x0000274D/10061)
	Is the server running on host "localhost" (127.0.0.1) and accepting
	TCP/IP connections on port 5432?


In [3]:
# Time resolution from the pipeline config, kept as a string.
time_bucket = str(file_args['time_resolution'])
# Folder holding the raw input files; file names are appended to it directly,
# so it must end with a path separator.
filepath = './data/raw/'
#cum_ts = 100/file_args['time_queries']['cum_ts_pct']
#cum_seg = 100/file_args['segment_queries']['cum_seg_pct']

## Load Waze data into a DataFrame

In [4]:
# Allow processing the raw csv file directly, or importing the csv into the db
# first and then re-pulling the data for transformation.
import_type = 'csv' # db or csv

if import_type == 'csv':
    csv_file = filepath + 'waze_data.csv'
    waze_raw_df = pd.read_csv(csv_file)
elif import_type == 'db':  # assumes postgresql
    # assume connection file is always present
    # (same ./conf location used by the other config reads in this notebook)
    conn_str_file = './conf/db_conn_str.txt'
    with open(conn_str_file, 'r') as conn_file:
        conn = pg.connect(conn_file.read().strip())
    waze_raw_df = pd.read_sql('select  id, uuid, waze_timestamp, street, \
                                       start_node, end_node, city, length, delay, \
                                       speed, level, road_type, geom, \
                                       ST_AsText(geom) as linestring, \
                                       ST_NumPoints(geom) as linestring_length \
                               from waze_data', con=conn)
else:
    # Fail here rather than with a confusing NameError on waze_raw_df below.
    raise ValueError("import_type must be 'csv' or 'db', got {!r}".format(import_type))

# make a copy of waze_raw_df so the raw frame stays untouched by later cells
waze_processed_df = waze_raw_df.copy()

In [7]:
# Number of distinct values in the geom column of the raw data.
waze_raw_df['geom'].nunique()

117780

In [6]:
# Preview only the first rows instead of rendering the whole raw frame.
waze_raw_df.head(10)

Unnamed: 0,uuid,waze_timestamp,street,start_node,end_node,city,length,delay,speed,level,road_type,geom
0,348279374,2017-02-24 08:47:01,Rosecrans St,,Lytton St,"San Diego, CA",934,464,1.755556,4,6,0102000020E610000015000000EF39B01C214D5DC015C6...
1,348764673,2017-02-24 08:47:01,to SR-163 N,,SR-163 N,"San Diego, CA",487,101,3.922222,3,4,0102000020E61000000E000000D15B3CBCE7495DC05F29...
2,348536095,2017-02-24 08:47:01,Via Rancho Pkwy,,to I-15 N,"Escondido, CA",637,90,4.541667,2,7,0102000020E610000005000000FE8172DBBE445DC04DA0...
3,40791027,2017-02-24 08:51:49,Hoover Ave,,,"National City, CA",100,-1,0.000000,5,1,0102000020E610000002000000FE7E315BB2465DC0DD43...
4,36971633,2017-02-24 08:51:49,Hoover Ave,,,"National City, CA",100,-1,0.000000,5,1,0102000020E610000002000000C66D3480B7465DC08196...
5,353993967,2017-02-24 08:51:49,Civic Center Dr,,to I-5 N,"National City, CA",303,114,1.850000,3,7,0102000020E61000000400000091D6187442475DC062D9...
6,349622483,2017-02-24 08:51:49,Civic Center Dr,,Harbor Dr,"National City, CA",252,98,1.908333,3,7,0102000020E610000003000000CC5D4BC807475DC0F224...
7,345243533,2017-02-24 08:51:49,I-5 N,,I-5 N,"National City, CA",3118,302,7.672222,3,3,0102000020E610000020000000439259BDC3465DC08D7C...
8,349775633,2017-02-24 08:51:49,Beyer Blvd,,to SR-905 W,"San Diego, CA",475,128,2.797222,3,7,0102000020E610000007000000AA6400A8E2435DC03E24...
9,349685702,2017-02-24 08:51:49,La Media Rd,,Birch Rd,"Chula Vista, CA",1462,118,6.416667,2,2,0102000020E610000014000000F530B43A393F5DC01495...


In [8]:
# Show the raw rows whose timestamp string begins with the date 2017-03-10.
is_march_10 = waze_raw_df['waze_timestamp'].apply(lambda ts: ts[:10] == '2017-03-10')
waze_raw_df[is_march_10]

Unnamed: 0,uuid,waze_timestamp,street,start_node,end_node,city,length,delay,speed,level,road_type,geom
236951,701952516,2017-03-10 00:26:21,Hoover Ave,,,National City,100,-1,0.000000,5,1,0102000020E610000002000000C66D3480B7465DC08196...
237099,702552447,2017-03-10 01:37:14,W B St,,State St,San Diego,83,-1,0.000000,5,1,0102000020E610000002000000BE892139994A5DC06406...
237273,698332403,2017-03-10 01:37:14,W B St,,Union St,San Diego,83,-1,0.000000,5,1,0102000020E6100000020000001B47ACC5A74A5DC0C30E...
237328,697029395,2017-03-10 01:37:14,Lindo Paseo,,College Ave,San Diego,42,-1,0.000000,5,1,0102000020E61000000200000058569A9482445DC07D76...
237729,700391508,2017-03-10 01:37:14,Lindo Paseo,,Montezuma Pl,San Diego,42,-1,0.000000,5,1,0102000020E610000002000000F1BDBF417B445DC01E6E...
237752,816971842,2017-03-10 01:46:49,Texas St,,,San Diego,68,-1,0.000000,5,2,0102000020E610000002000000570A815CE2485DC00A85...
238214,702552447,2017-03-10 02:07:09,W B St,,State St,San Diego,83,-1,0.000000,5,1,0102000020E610000002000000BE892139994A5DC06406...
238285,698332403,2017-03-10 02:07:09,W B St,,Union St,San Diego,83,-1,0.000000,5,1,0102000020E6100000020000001B47ACC5A74A5DC0C30E...
238677,697029395,2017-03-10 02:07:09,Lindo Paseo,,College Ave,San Diego,42,-1,0.000000,5,1,0102000020E61000000200000058569A9482445DC07D76...
238741,700391508,2017-03-10 02:07:09,Lindo Paseo,,Montezuma Pl,San Diego,42,-1,0.000000,5,1,0102000020E610000002000000F1BDBF417B445DC01E6E...


## Create Processed DataFrame with Additional Columns

In [None]:
# Extract day of week, month, date, time, and the timestamp floored to a
# 5 minute interval.
# pd.to_datetime parses the csv strings and leaves db datetimes as they are, so
# one code path serves both import types and the column is parsed only once.
# NOTE(review): if the db column is timezone-aware, floor() keeps the timezone —
# confirm this is acceptable for the db import path.
waze_ts = pd.to_datetime(waze_processed_df['waze_timestamp'])

waze_processed_df['dow'] = waze_ts.dt.dayofweek
waze_processed_df['month'] = waze_ts.dt.month
waze_processed_df['date'] = waze_ts.dt.date
waze_processed_df['time'] = waze_ts.dt.time
# Flooring drops seconds and snaps minutes down to a multiple of 5.
waze_processed_df['timestamp_round'] = waze_ts.dt.floor('5min')
waze_processed_df['time_round'] = waze_processed_df['timestamp_round'].dt.time

In [None]:
# Flag columns hold 1 where the condition applies and a null otherwise.

# pandas dayofweek is Monday=0, so 5 and 6 are Saturday and Sunday.
waze_processed_df['is_weekend'] = np.where(waze_processed_df['dow'].isin([5, 6]), 1, None)

# Holiday dates to flag (presumably the 2017 public holidays inside the data window — confirm).
holidays = [dt.date(2017,1,2), dt.date(2017,1,16), dt.date(2017,2,20), dt.date(2017,5,29)]
holidays_df = pd.DataFrame(holidays, columns = ['date'])
holidays_df['is_holiday'] = 1
# Drop any is_holiday column left by a previous run of this cell; otherwise the
# merge would produce is_holiday_x / is_holiday_y and break the time table below.
waze_processed_df = pd.merge(
    waze_processed_df.drop(columns = ['is_holiday'], errors = 'ignore'),
    holidays_df, how = 'left', on= 'date')

# Rush hour: 07:00-09:00 and 16:00-19:00, bounds inclusive. Zero-padded HH:MM:SS
# strings compare in chronological order, so a string between() is sufficient.
time_of_day = pd.to_datetime(waze_processed_df['waze_timestamp']).dt.strftime('%H:%M:%S')
in_rush_hour = time_of_day.between('07:00:00','09:00:00') | time_of_day.between('16:00:00','19:00:00')
waze_processed_df['is_rushhour'] = np.where(in_rush_hour, 1, None)

In [None]:
def linestring_length(row):
    '''Return the number of comma-separated points in a linestring string.

    row: text with exactly one '(' followed by the point list and a single
         closing character, e.g. 'LINESTRING (1 2, 3 4)'.
    returns: count of comma-separated entries between the parentheses.
    raises: ValueError when the text does not contain exactly one '('.
    '''
    _prefix, coords = row.split('(')
    # n points are separated by n - 1 commas
    return coords[:-1].count(',') + 1

In [None]:
# The db import already selects linestring / linestring_length in SQL; for the
# csv import they are derived here from the hex-encoded WKB in the geom column.
if import_type == 'csv':
    # The shapely imports in the import cell are commented out, so without this
    # the lambda below fails with NameError. Imported here so that the db path
    # does not require shapely to be installed.
    import shapely.wkb

    waze_processed_df['linestring'] = \
        waze_processed_df['geom'].apply(lambda geom_hex: shapely.wkb.loads(geom_hex, hex=True).wkt)

    waze_processed_df['linestring_length'] =  waze_processed_df['linestring'].apply(linestring_length)
waze_processed_df.head()

## Create Time Table

In [None]:
# One row per distinct rounded timestamp, ordered chronologically, with a
# 1-based time_id as the key.
time_cols = ['timestamp_round','date','time_round','dow','month','is_weekend','is_holiday','is_rushhour']
time_df = (
    waze_processed_df.loc[:, time_cols]
    .drop_duplicates()
    .sort_values(by='timestamp_round')
    .reset_index(drop=True)
    .rename(columns={'time_round': 'time', 'dow': 'day_of_week'})
)
time_df['time_id'] = time_df.index + 1
time_df = time_df[['date','day_of_week','month','is_weekend','is_holiday','is_rushhour','time','timestamp_round','time_id']]
time_df.head()

## Create Segment Table

In [4]:
# helper function
def extract_segments(ls):
    '''Split a linestring into its consecutive point-to-point segments.

    ls: text whose last '(' is followed by comma-separated "lon lat" pairs and
        one closing character, e.g. 'LINESTRING (1 2, 3 4, 5 6)'.
    returns: list of ((lon, lat), (lon, lat)) tuples of floats, one per pair of
        neighbouring points; empty when there is a single point.
    '''
    coord_text = ls.rsplit('(', 1)[-1][:-1]
    tokens = [pair.split() for pair in coord_text.split(',')]
    lons = [float(tok[0]) for tok in tokens]
    lats = [float(tok[1]) for tok in tokens]
    points = list(zip(lons, lats))
    # pair every point with its successor
    return list(zip(points, points[1:]))

In [None]:
# Turn every linestring into its list of consecutive point pairs,
# keeping the row order of waze_processed_df.
linestrings = np.array(waze_processed_df['linestring'].values)
waze_segments = list(map(extract_segments, linestrings))

In [None]:
# check to make sure lengths align
# segment_lengths = np.array(map(len, waze_segments)) + 1
# linestring_lengths = np.array(waze_raw_df['linestring_length'].values)

# print 'should be zero: {}'.format(np.average(linestring_lengths - segment_lengths))

In [None]:
# add segments to processed dataframe
# (waze_segments holds one list of point pairs per row, built in the same row
# order as the linestring column)
waze_processed_df['segments'] = waze_segments

In [None]:
# get list of individual segments
# (.values gives an array with one per-record list of segments)
segments_list = waze_processed_df['segments'].values

In [None]:
# Flatten the per-record lists into a single list of segments.
flat_segments_list = []
for record_segments in segments_list:
    flat_segments_list.extend(record_segments)

In [None]:
# get unique segments
# The set removes repeated ((lon, lat), (lon, lat)) tuples; np.array then stacks
# the survivors. Their order is whatever the set yields, and the segment ids
# assigned below follow that order.
unique_segments = np.array(list(set(flat_segments_list)))

In [None]:
# Column data for the segment table: a 1-based id, the segment itself, and the
# coordinates of its two endpoints (each point is stored as lon, lat).
segment_ids = np.array(range(len(unique_segments))) + 1
segments_dict = {
    'segment_id': segment_ids,
    'segment': list(unique_segments),
    'lat1': [start[1] for start, _end in unique_segments],
    'lon1': [start[0] for start, _end in unique_segments],
    'lat2': [end[1] for _start, end in unique_segments],
    'lon2': [end[0] for _start, end in unique_segments]
}

In [None]:
# create segment dataframe
# (one row per unique segment; columns come from the keys of segments_dict)
segment_df = pd.DataFrame(segments_dict)

In [None]:
# Coordinates plus id only, without the raw 'segment' column.
segment_key_cols = ['lat1','lon1','lat2','lon2','segment_id']
segment_df_tmp = segment_df[segment_key_cols]

## Create UUID Table

In [None]:
# Each Waze record gets its row index as uuid_instance_id; the uuid table keeps
# the per-record attributes, with the descriptive ones suffixed _original.
waze_processed_df['uuid_instance_id'] = waze_processed_df.index
uuid_source_cols = ['uuid', 'uuid_instance_id', 'street','start_node','end_node','waze_timestamp','city','length','delay','speed','level','road_type']
uuid_renames = {
    'street': 'street_original',
    'city': 'city_original',
    'length': 'length_original',
    'road_type': 'road_type_original',
}
uuid_df = waze_processed_df[uuid_source_cols].rename(columns=uuid_renames)
uuid_df.head()

## Create Time/Segment/UUID Mapping Table

In [None]:
def split_data_frame_list(df, 
                       target_column,
                      output_type=float):
    ''' 
    Accepts a column with multiple types and splits list variables to several rows.

    df: dataframe to split
    target_column: the column containing the values to split
    output_type: callable applied to every emitted value of the target column
    returns: a dataframe with each entry for the target column separated, with each element moved into a new row. 
    The values in the other columns are duplicated across the newly divided rows.
    Only values that are `list` instances are split; anything else yields a single row.
    An empty input returns an empty dataframe that still carries df's columns.
    '''
    row_accumulator = []

    # Iterate explicitly rather than through df.apply: the callback exists only
    # for its side effect, and pandas < 1.1 invoked apply's function twice on
    # the first row, which duplicated that row's output.
    for _, row in df.iterrows():
        split_row = row[target_column]
        values = split_row if isinstance(split_row, list) else [split_row]
        for value in values:
            new_row = row.to_dict()
            new_row[target_column] = output_type(value)
            row_accumulator.append(new_row)

    # Passing the columns keeps the schema (and column order) when nothing was emitted.
    new_df = pd.DataFrame(row_accumulator, columns=list(df.columns))

    return new_df

In [None]:
# Persist the lookup tables; the flag columns are stored with SQL boolean type.
time_df.to_sql(name='time', con=engine, if_exists='replace', dtype={'is_rushhour': sqlalchemy.types.Boolean, 
                             'is_weekend':  sqlalchemy.types.Boolean,
                             'is_holiday': sqlalchemy.types.Boolean,
                             'day_of_week': sqlalchemy.types.String})

uuid_df.to_sql(name='uuid', con=engine, if_exists='replace')

# Process the records in fixed-size chunks. The chunk count is derived from the
# frame length so no rows are dropped (and no empty chunk is processed) when
# the input size changes.
# NOTE(review): 'segments_all' and 'matrix' are appended to, so re-running this
# cell against existing tables duplicates their rows — confirm whether they
# should be dropped first.
chunk_rows = 100000
for chunk_no, chunk_start in enumerate(range(0, len(waze_processed_df), chunk_rows)):

    chunk_df = waze_processed_df.iloc[chunk_start:chunk_start + chunk_rows]

    # one row per (record, segment)
    split_df = split_data_frame_list(chunk_df, 'segments', output_type=tuple)
    if split_df.empty:
        # no record in this chunk had any segment; nothing to write
        print('{} done'.format(chunk_no))
        continue

    # each segment is ((lon1, lat1), (lon2, lat2))
    split_df['lon1'] = [seg[0][0] for seg in split_df['segments']]
    split_df['lat1'] = [seg[0][1] for seg in split_df['segments']]
    split_df['lon2'] = [seg[1][0] for seg in split_df['segments']]
    split_df['lat2'] = [seg[1][1] for seg in split_df['segments']]

    # running position of the segment row within this chunk
    split_df.insert(0, 'path', range(len(split_df)))

    # attach time_id and segment_id, then keep only the mapping keys
    matrix_df = pd.merge(split_df, time_df, on='timestamp_round')
    matrix_df = pd.merge(matrix_df, segment_df, on=['lat1','lon1','lat2','lon2'])
    matrix_df = matrix_df[['uuid_instance_id', 'path', 'time_id', 'segment_id']]

    # descriptive fields per segment, taken from the records that contain it
    uuid_withseg = pd.merge(uuid_df, matrix_df[['segment_id','uuid_instance_id']], on=['uuid_instance_id'])
    segfields = uuid_withseg[['segment_id','street_original','city_original','road_type_original']].drop_duplicates()
    segments_df = pd.merge(segment_df_tmp, segfields, on='segment_id')
    segments_df.columns = ['lat1','lon1','lat2','lon2','segment_id','street','city','road_type']

    segments_df.to_sql(name='segments_all', con=engine, if_exists='append')
    matrix_df.to_sql(name='matrix', con=engine, if_exists='append')

    print('{} done'.format(chunk_no))

## Write Padres Games DataFrame to Postgres

In [None]:
# Load the Padres game schedule and keep only the columns used below.
# .copy() makes the selection an independent frame, so the later
# padres_df.loc[...] assignments do not trigger SettingWithCopyWarning.
padres_df = pd.read_csv(filepath+'padreswindow.csv')
padres_df = padres_df[['Date','start_time','Time','Attendance']].copy()

In [None]:
def fix_date(d, year=2017):
    '''Rebuild a date string as "<month> <2-digit day> <year>".

    d: text of exactly three whitespace-separated tokens; the first is
       discarded and the other two are taken as month and day.
    year: year appended to the result (defaults to 2017).
    returns: e.g. 'Apr 03 2017' for an input whose last two tokens are 'Apr 3'.
    raises: ValueError when d does not have exactly three tokens.
    '''
    mon, day = d.split()[1:]
    # left-pad single-character days so the day is always two characters wide
    day = day.rjust(2, '0')
    return '{} {} {}'.format(mon, day, year)

In [None]:
def create_timedelta(duration):
    '''Convert an "H:MM" duration string into a pandas Timedelta.

    duration: colon-separated integers; the first two are read as hours and
              minutes (every part must parse as an int).
    returns: pd.Timedelta of hours*60 + minutes minutes.
    '''
    parts = list(map(int, duration.split(':')))
    hours, minutes = parts[0], parts[1]
    total_minutes = hours * 60 + minutes
    return pd.Timedelta(minutes=total_minutes)

In [None]:
# Build game_start / game_end timestamps from the schedule columns.
# NOTE: this cell replaces padres_df with the three output columns, so it cannot
# be re-run without first re-running the cell that loads the csv.
game_date = pd.to_datetime(padres_df['Date'].apply(fix_date))
start_time = pd.to_datetime(padres_df['start_time'], format='%H:%M').dt.time
game_duration = padres_df['Time'].apply(create_timedelta)

# The datetime module is imported as `dt` in this notebook.
game_start = pd.Series(
    [dt.datetime.combine(day, clock) for day, clock in zip(game_date, start_time)],
    index=padres_df.index, dtype='datetime64[ns]')
game_end = game_start + game_duration

padres_df = pd.DataFrame(
    {'game_start': game_start, 'game_end': game_end, 'attendance': padres_df['Attendance']},
    columns=['game_start','game_end','attendance'])

padres_df.head()

In [None]:
# Write the games to the padres_games table, replacing any existing table and
# leaving out the DataFrame index.
padres_df.to_sql(name='padres_games', con=engine, if_exists='replace', index=False)

## Write CHP DataFrame to Postgres

In [9]:
# Load the collision records. low_memory=False makes pandas infer each column's
# dtype from the whole file instead of chunk by chunk, which avoids the
# mixed-type DtypeWarning this read otherwise appears to emit.
collisions_df = pd.read_csv(filepath+'CollisionRecords.txt',sep=',', low_memory=False)

  interactivity=interactivity, compiler=compiler, result=result)


In [10]:
# Keep collisions with LATITUDE below 33.5 and LONGITUDE above 116, dated
# strictly between 20170131 and 20170701.
# NOTE(review): presumably COLLISION_DATE is a yyyymmdd integer and LONGITUDE is
# stored as a positive magnitude — confirm against the data dictionary.
in_region = (collisions_df['LATITUDE'] < 33.5) & (collisions_df['LONGITUDE'] > 116)
in_window = (collisions_df['COLLISION_DATE'] > 20170131) & (collisions_df['COLLISION_DATE'] < 20170701)
collisions_df = collisions_df.loc[in_region & in_window]

In [11]:
# Write the filtered collisions to the collisions table, replacing any existing
# table and leaving out the DataFrame index.
collisions_df.to_sql(name='collisions', con=engine, if_exists='replace', index=False)

## SQL Queries to process data

Please check SQLQueries.py module for full SQL statements. The module can be found in the lib folder.

In [7]:
# Cursor on the psycopg2 connection; every SQL statement below runs through it.
cur = conn.cursor()

In [8]:
# Use rollback if any of the SQL queries below don't run properly.
# A failed statement leaves the PostgreSQL transaction aborted until it is rolled
# back. Note that ROLLBACK also discards any uncommitted work on this connection.
cur.execute('ROLLBACK')

In [9]:
# Drop index field from existing tables if they exist
# NOTE(review): `sqlqueries` is not bound by any import in this notebook (the
# `#import lib.sqlqueries` line in the import cell is commented out) — confirm
# how the module is loaded before running the cells below.
cur.execute(sqlqueries.SQL_drop_indexes)

In [10]:
# Update segments table with additional fields relating to direction and geometry
# (statement text: sqlqueries.SQL_update_segments)
cur.execute(sqlqueries.SQL_update_segments)

In [11]:
# Create events tables (events and events_dictionary) 
# Import data from special_events_list_datasd.csv and special_events_listings_dictionary.csv
# NOTE(review): the csv locations are not visible here; presumably they are set
# inside sqlqueries.SQL_create_events_table — confirm.
cur.execute(sqlqueries.SQL_create_events_table)

In [12]:
# Create time bucketing table matrix_## based on time_bucket parameter
# (## is the configured time resolution; the levels query further down reads
# from this table).
cur.execute(sqlqueries.SQL_Time_Bucketing)

In [13]:
# Create empty segments_selected and times_selected tables for later use below
# (they are filled by the "Create Selected Segments" / "Create Selected Times" steps).
cur.execute(sqlqueries.SQL_create_segments_times_selected)

In [14]:
# Create tables to indicate most active segments
# (statement text: sqlqueries.SQL_pct_segments)
cur.execute(sqlqueries.SQL_pct_segments)

In [15]:
# Create tables to indicate most active time periods
# (statement text: sqlqueries.SQL_pct_time)
cur.execute(sqlqueries.SQL_pct_time)

In [16]:
# Create unique segments from segments table. The segments table created above
# has many duplicates, which greatly adds to its size.
cur.execute(sqlqueries.SQL_unique_segments)

In [17]:
# Make the changes from the statements executed above permanent.
conn.commit()

# Create segments/times table

## Create Selected Segments

In [18]:
# Run the segment-selection queries listed under 'segment_queries_to_run' in the config.
# NOTE(review): `segments` is not defined or imported in this notebook (the
# `#import lib.segments` line is commented out) — presumably a class from the
# lib folder; confirm how it is brought into scope.
segments_obj = segments(conn, file_args['segment_queries_to_run'], file_args)
segments_obj.run_queries()

0 bounding_box
TRUNCATE segments_selected; 

WITH segments_to_keep AS ( SELECT segment_id FROM segments WHERE geom @ ST_MakeEnvelope (-117.81, 32.770, -117.11, 32.709) and ST_Length(geom) > 0 ) 
INSERT INTO segments_selected (SELECT lat1, lon1, lat2, lon2, segment_id, street, city, road_type, geom, direction, seg_length, cum_seg_pct FROM segments WHERE segment_id IN (SELECT segment_id from segments_to_keep)) 

1 cum_seg_pct
with segments_to_keep as ( SELECT segment_id FROM seg_cum_pct WHERE cum_pos_pct <= 95 ) 
DELETE from segments_selected WHERE segment_id NOT IN (SELECT segment_id from segments_to_keep); 



## Create Selected Times

In [19]:
# Run the time-selection queries listed under 'time_queries_to_run' in the config.
# NOTE(review): `times` is not defined or imported in this notebook (the
# `#import lib.times` line is commented out) — presumably a class from the lib
# folder; confirm how it is brought into scope.
times_obj = times(conn, file_args['time_queries_to_run'], file_args)
times_obj.run_queries()

0 cum_ts_pct
TRUNCATE times_selected; 

WITH times_to_keep AS ( SELECT time_id FROM ts_cum_pct WHERE cum_pos_pct <= 100 ) 
INSERT INTO times_selected (SELECT * FROM time_30 WHERE time_id IN (SELECT time_id from times_to_keep)) 



## SQL cross join to create cartesian product of segments/times

In [33]:
# Report how many segments and time buckets were selected, and the size of
# their cross product, before materialising it.
cur.execute('ROLLBACK')

segments_count_sql = """
SELECT count(*) FROM segments_selected
"""
cur.execute(segments_count_sql)

results = cur.fetchone()
segments_count = results[0]
print('Number of segments selected: {}'.format(segments_count))

time_buckets_count_sql = """
SELECT count(*) FROM times_selected
"""
cur.execute(time_buckets_count_sql)

results = cur.fetchone()
time_buckets_count = results[0]
print('Number of time buckets selected: {}'.format(time_buckets_count))

cross_product_size = segments_count*time_buckets_count
print('Time/segment cross product: {}'.format(cross_product_size))

Number of segments selected: 6687
Number of time buckets selected: 5644
Time/segment cross product: 37741428


In [30]:
# query database to get matrix of all segments and times selected
# (SELECT ... INTO materialises the cross join as the segments_time_selected table)
segments_time_selected = """
SELECT segment_id, street, road_type, lat1, lon1, lat2, lon2, time_id, date, day_of_week, month, time
INTO segments_time_selected
FROM segments_selected CROSS JOIN times_selected 
"""

cur.execute('ROLLBACK')
cur.execute('DROP TABLE IF EXISTS segments_time_selected')
cur.execute(segments_time_selected)
# Commit so the new table persists: without it, the ROLLBACK at the top of the
# next cell (or closing the connection) would undo both the DROP and the create.
conn.commit()

print('Done creating segments_time_selected table.')

## Create levels table

In [13]:
# query database to get matrix of positive traffic incidents
# One row per (segment, time bucket) that has Waze records, with min / max /
# mean / count of the reported level. The source table is matrix_<time_resolution>.
levels_selected = """
SELECT m.segment_id, m.time_id, s.street, s.lat1, s.lon1, 
    s.lat2, s.lon2, t.date, t.time, t.day_of_week, 
    s.road_type,
    min(u.level) as level_min,
    max(u.level) as level_max,
    avg(u.level) as level_mean,
    count(u.level) as level_count
INTO levels_selected
FROM matrix_""" + str(file_args['time_resolution']) + """ m, times_selected t, uuid u, segments_selected s
WHERE m.time_id = t.time_id 
    and m.uuid_instance_id = u.uuid_instance_id 
    and s.segment_id = m.segment_id
GROUP BY m.segment_id, m.time_id, s.street, s.lat1, s.lon1, 
    s.lat2, s.lon2, t.date, t.time, t.day_of_week, 
    s.road_type
"""

cur.execute('ROLLBACK')
cur.execute('DROP TABLE IF EXISTS levels_selected')
cur.execute(levels_selected)
# Commit so the new table persists: without it, the ROLLBACK at the top of the
# next cell (or closing the connection) would undo both the DROP and the create.
conn.commit()

print('Done creating levels_selected table.')

Done creating levels_selected table.


## Create segments/time/levels table

In [8]:
# join positive incidents to cartesian product of segments and times to create final table for building training data
# The RIGHT JOIN keeps every segment/time row; the level_* columns are NULL
# where no incident was recorded for that pair.
segments_time_level_selected = """
SELECT segments_time_selected.*, levels_selected.level_min, levels_selected.level_max, levels_selected.level_mean, levels_selected.level_count
INTO segments_time_level_selected
FROM levels_selected RIGHT JOIN segments_time_selected
ON levels_selected.segment_id = segments_time_selected.segment_id
AND levels_selected.time_id = segments_time_selected.time_id
"""

cur.execute('ROLLBACK')
cur.execute('DROP TABLE IF EXISTS segments_time_level_selected')
cur.execute(segments_time_level_selected)
# Commit so the final table persists once the connection is closed; nothing
# after this cell would otherwise commit it.
conn.commit()

print('Done creating segments/times/level selected table. End of notebook')

Done creating segments/times/level selected table. End of notebook
