# Question 5, Option C, Step II: Post Process Simulation Files

Author's Workday ID: C175799, Initials: RPR

## Imports and Util Functions

In [1]:
import pandas as pd
import numpy as np
import time
import datetime
from math import radians, cos, sin, asin, sqrt
from haversine import haversine
import math
import random
from multiprocessing import Process, current_process
import hashlib
import glob

def get_time( output=True ):
    
    temp = time.time()
    if output:
        now = datetime.datetime.now()
        print( now.strftime( "%Y.%m.%d %H:%M" ) )
        
    return temp

foo = get_time()

def print_time( start_time, end_time, interval="seconds" ):
    
    if interval == "hours":
        print ( "Time to process: [%s] hours" % ( str( ( end_time - start_time ) / 60 / 60 ) ) )
    else:
        print ( "Time to process: [%s] seconds" % ( str( end_time - start_time ) ) )

print_time( 0, 3600, interval="hours" )

2018.05.09 14:10
Time to process: [1.0] hours


## Concatenate, Summarize and Write Hits Files

In [2]:
all_files = glob.glob( "data/uber-simulation/*.csv" )

hits_files = []
misses_files = []

for file in all_files:
    
    if ( file.find( "points-process-" ) > -1 ) and ( file.find( "-error" ) > 0 ):
        misses_files.append( file )
    elif file.find( "points-process-" ) > -1:
        hits_files.append( file )
    else:
        print( "Wuh?!?" )
        
print( "hits_files", len( hits_files ) )
#print( hits_files )
        
# setup for concatenation
all_summaries = pd.DataFrame()
all_summaries_list = []

start_all_time = get_time()

for file in hits_files:
    
    print( "==========================================================================================" )
    print( "Loading HITS file [%s]..." % ( file ) )
    print( "==========================================================================================" )
    start_time = get_time()
    one_tenth = pd.read_csv( file )
    print_time( start_time, get_time() )
    
    start_time = get_time()
    # turns out "hash" as a column name collides w/ reserved words :-|
    one_tenth.rename( columns={ "hash":"hashed_id" }, inplace=True )
    
    agg_funcs = { "hashed_id":[ "count" ], "total_mins":[ "min", "max", "mean" ], "delta":[ "min", "max", "mean" ], "my_lat":[ "mean" ], "my_lon":[ "mean" ], "my_bearing":[ "mean" ] }
    nearby_taxis_by_hash_grp = one_tenth.groupby( "hashed_id" ).agg( agg_funcs )
    
    # use ravel() to unite min, max, median w/ their parent column: 
    # https://www.shanelynn.ie/summarising-aggregation-and-grouping-data-in-python-pandas/
    nearby_taxis_by_hash_grp.columns = [ "_".join( col_name ) for col_name in nearby_taxis_by_hash_grp.columns.ravel() ]
    
    # finish flattening: https://intoli.com/blog/pandas-aggregation/
    nearby_taxis_by_hash_grp.reset_index( inplace=True )
    nearby_taxis_by_hash_grp.rename( columns={ "hashed_id_count":"taxis_nearby", "my_lat_mean":"my_lat", "my_lon_mean":"my_lon", "my_bearing_mean":"my_bearing" }, inplace=True)
    nearby_taxis_by_hash_grp.sort_values( by="taxis_nearby", ascending=False, inplace=True )
    print( "nearby_taxis_by_hash_grp", len( nearby_taxis_by_hash_grp ) )
    
    lat_lon_bins = 989 * 7 * 24
    cores = 10
    hours_to_complete = 25 / 60

    queries_per_core = lat_lon_bins / cores
    successful_queries = len( nearby_taxis_by_hash_grp )
    print( "Queries w/ hits [%d] out of ~[%d] = [%.2f]%% hit rate" % ( successful_queries, queries_per_core, successful_queries / queries_per_core * 100 ) )

    # calculate queries per sec on one core
    queries_per_sec = queries_per_core / hours_to_complete / 60 /60
    print( "Queries [%.2f] sec per core. Total queries ~[%.2f] sec on [%d] cores" % ( queries_per_sec, queries_per_sec * cores, cores ) )

    taxis_nearby = nearby_taxis_by_hash_grp.taxis_nearby.sum()
    taxis_nearby_mean = nearby_taxis_by_hash_grp.taxis_nearby.mean()
    print( "Taxis nearby [%d], mean taxis [%.2f] per successful query" % (  taxis_nearby, taxis_nearby_mean, ) )

    all_summaries_list.append( nearby_taxis_by_hash_grp )
    print_time( start_time, get_time() )

    
all_summaries = pd.concat( all_summaries_list )
all_summaries.to_csv( "data/taxis-nearby-hits-2.csv", index=False )

print_time( start_all_time, get_time() )

hits_files 10
2018.05.09 14:10
Loading HITS file [data/uber-simulation/points-process-32.csv]...
2018.05.09 14:10
2018.05.09 14:10
Time to process: [1.9337279796600342] seconds
2018.05.09 14:10
nearby_taxis_by_hash_grp 10051
Queries w/ hits [10051] out of ~[16615] = [60.49]% hit rate
Queries [11.08] sec per core. Total queries ~[110.77] sec on [10] cores
Taxis nearby [1645353], mean taxis [163.70] per successful query
2018.05.09 14:10
Time to process: [0.25353336334228516] seconds
Loading HITS file [data/uber-simulation/points-process-34.csv]...
2018.05.09 14:10
2018.05.09 14:10
Time to process: [1.889521837234497] seconds
2018.05.09 14:10
nearby_taxis_by_hash_grp 10968
Queries w/ hits [10968] out of ~[16615] = [66.01]% hit rate
Queries [11.08] sec per core. Total queries ~[110.77] sec on [10] cores
Taxis nearby [1664653], mean taxis [151.77] per successful query
2018.05.09 14:10
Time to process: [0.2442188262939453] seconds
Loading HITS file [data/uber-simulation/points-process-39.csv

## Concatenate Misses Files

In [3]:
all_misses = pd.DataFrame()
all_misses_list = []

start_all_time = get_time()

for file in misses_files:#[ 0:1 ]:
    
    print( "==========================================================================================" )
    print( "Loading MISSES file [%s]..." % ( file ) )
    print( "==========================================================================================" )
    start_time = get_time()
    misses = pd.read_csv( file, names=[ "day", "hour", "lat_bin", "lon_bin", "bearing", "lat", "lon" ] ) #, usecols=[ 0, 1, 2 ]
    
    print( "Rows", len( misses ) )
    print( misses.columns )
    all_misses_list.append( misses )
    print_time( start_time, get_time() )

all_misses = pd.concat( all_misses_list )
all_misses.to_csv( "data/taxis-nearby-misses-2.csv", index=False )

print_time( start_all_time, get_time() )

2018.05.09 14:11
Loading MISSES file [data/uber-simulation/points-process-38-errors.csv]...
2018.05.09 14:11
Rows 4759
Index(['day', 'hour', 'lat_bin', 'lon_bin', 'bearing', 'lat', 'lon'], dtype='object')
2018.05.09 14:11
Time to process: [0.0038557052612304688] seconds
Loading MISSES file [data/uber-simulation/points-process-39-errors.csv]...
2018.05.09 14:11
Rows 4266
Index(['day', 'hour', 'lat_bin', 'lon_bin', 'bearing', 'lat', 'lon'], dtype='object')
2018.05.09 14:11
Time to process: [0.003268718719482422] seconds
Loading MISSES file [data/uber-simulation/points-process-32-errors.csv]...
2018.05.09 14:11
Rows 5538
Index(['day', 'hour', 'lat_bin', 'lon_bin', 'bearing', 'lat', 'lon'], dtype='object')
2018.05.09 14:11
Time to process: [0.003841876983642578] seconds
Loading MISSES file [data/uber-simulation/points-process-37-errors.csv]...
2018.05.09 14:11
Rows 4985
Index(['day', 'hour', 'lat_bin', 'lon_bin', 'bearing', 'lat', 'lon'], dtype='object')
2018.05.09 14:11
Time to process: [

In [4]:
print( len( all_summaries ) )
print( len( all_misses ) )
print( lat_lon_bins )
print( "Percent of hits [%.2f]" % ( len( all_summaries ) / ( len( all_summaries ) + len( all_misses ) ) * 100 ) )

105728
50856
166152
Percent of hits [67.52]


In [5]:
all_summaries.taxis_nearby.sum()

16439112

In [6]:
lat_lon_bins = 989 * 7 * 24
cores = 10
hours_to_complete = 25 / 60

queries_per_core = lat_lon_bins / cores
successful_queries = len( all_summaries )
print( "Queries w/ hits [%d] out of ~[%d] = [%.2f]%% hit rate" % ( successful_queries, queries_per_core, successful_queries / queries_per_core * 100 ) )

# calculate queries per sec on one core
queries_per_sec = queries_per_core / hours_to_complete / 60 /60
print( "Queries [%.2f] sec per core. Total queries ~[%.2f] sec on [%d] cores" % ( queries_per_sec, queries_per_sec * cores, cores ) )


Queries w/ hits [105728] out of ~[16615] = [636.33]% hit rate
Queries [11.08] sec per core. Total queries ~[110.77] sec on [10] cores
