## Merge OD and AUX data with Block Centroid and Calculate Distance and Stats

In [1]:
import pandas as pd
import geopandas as gp
import fiona
import shapely.geometry as geom

from pathlib import Path
import timeit
from IPython.display import clear_output
from pyproj import Geod
import numpy as np

#### Function - Convert decimal time to minutes and seconds

In [2]:
def get_time(timer):
    minutes, seconds = int(np.floor(timer)), round(np.asscalar(timer % 1)*60)
    return[minutes,  seconds]

#### Funtion to calculate distance between work and home

In [3]:
def Distance(lat1,lon1,lat2,lon2):
  az12,az21,dist = wgs84_geod.inv(lon1,lat1,lon2,lat2) 
  return dist

#### Funtion to process the locations with range of employees and merge with centroids

In [4]:
def chunk_process_distance(df):
    # create a group of all workplace geoids
    df_group = df.groupby(['w_geocode']).sum()
    
    # include only the records with # employees in the block
    df_group_limited = df_group[(df_group['S000'] >= 100) & (df_group['S000'] <= 999)]
    
    # create a dataframe with only blocks with 1000+ employees
    df2 = pd.merge(df, df_group_limited, on='w_geocode', suffixes=('','_del'))
    # keep the sum of the S000 to use in later filtering
    df2.rename(columns={'S000_del': 'w_group_count'}, inplace=True)
    df2.drop(list(df2.filter(regex='_del')), axis=1, inplace=True)
    
    
    # create list of all block with > 1000 workers
    #top_block_list = df_group_limited.index.tolist()
    # create a dataframe with only block with > 1000
    #df2 = df[df['w_geocode'].isin(top_block_list)]
    
    # sort the dataframe
    #df2_sort= df2.sort_values(['w_geocode','h_geocode'], ascending=[True, True])
    # reorder df2_sort dataframe
    #df2_sort = df2_sort.reset_index(drop=True)
    # reorder the columns of the dataframe
    #cols = df2_sort.columns.tolist()
    #cols = cols[-2:] + cols[:-2]
    #df2_sort = df2_sort[cols ]
    # group the dataframe by the worker and home block geoid
    #df2_group = df2_sort.groupby(['block_w_geoid','block_h_geoid']).sum().reset_index()
    #df2_sort.drop(['S000','SA01','SA02','SA03','SE01','SE02','SE03','SI01','SI02','SI03'], axis = 1, inplace=True)
    # load the centroid file
    centroids = pd.read_csv(block_file, dtype={'block_geoid': 'object', 'lat': 'float', 'lon': 'float'})
    # merge the centroid data frame with the grouped dataframe
    #merge_result = pd.merge(df2_sort, centroids,left_on=  ['w_geocode'],right_on= ['block_geoid'],how = 'left')
    merge_result = pd.merge(df2, centroids,left_on=  ['w_geocode'],right_on= ['block_geoid'],how = 'left')
    merge_result.drop(['block_geoid'], axis=1, inplace=True)
    merge_result.rename(columns={'lat': 'w_lat', 'lon': 'w_lon'}, inplace=True)
    # merge the centroid data frame on the Fayette county h_geocode column
    merge_result = pd.merge(merge_result, centroids,left_on=  ['h_geocode'],right_on= ['block_geoid'],how = 'left')
    merge_result.drop(['block_geoid'], axis=1, inplace=True)
    merge_result.rename(columns={'lat': 'h_lat', 'lon': 'h_lon'}, inplace=True)
    merge_result['distance'] = Distance(merge_result['w_lat'].tolist(),merge_result['w_lon'].tolist(),merge_result['h_lat'].tolist(),merge_result['h_lon'].tolist())
    merge_result['distance'] = merge_result['distance'].apply(lambda x: round(x, decimals))
    #merge_limited = merge_result[(merge_result['distance'] < 100000) & (merge_result['distance'] > 15000)]
    #merge_limited['geometry'] = merge_limited.apply(lambda x: geom.LineString([(x['w_lon'], x['w_lat'] ), (x['h_lon'],x['h_lat'])]), axis = 1)
    return merge_result

#### Setup output location and file name

In [5]:
ODpath = Path("../data/OD/")
OD_file = ODpath.joinpath("od_aux.csv.gz")
if OD_file.exists ():
    print ("OD file exist")
else:
    print ("OD file does not exist")
    
blockPath = Path("../data/blocks/")
block_file = blockPath.joinpath("block_centroids.csv.gz")
if block_file.exists ():
    print ("Block Centroid file exist")
else:
    print ("Block Centroid file does not exist")

OD file exist
Block Centroid file exist


#### Read the Origin Destination (OD) data into chunk dataframes

In [6]:
%time df_chunk = pd.read_csv(OD_file, compression='gzip', dtype={'w_geocode': str,'h_geocode':str}, chunksize=10000000)

Wall time: 287 ms


#### Read the data in chunks. Filter data by calling function 'chunk_process_distance'

In [7]:
chunk_list = []

wgs84_geod = Geod(ellps='WGS84') 
#Distance will be measured on this ellipsoid - more accurate than a spherical method

chunk_num = 1
decimals = 0   

print('Reading in the chunk dataframe')
start = timeit.default_timer()
for df in df_chunk:
    print('Starting processing for chunk #', chunk_num)
    # call function to group and filter the data 
    filter_chunk = chunk_process_distance(df)
    # append the filtered data to list
    chunk_list.append(filter_chunk)
    
    # get the current time on timer
    stop = timeit.default_timer()
    timer = np.array([(stop-start)/60])
    min_sec = get_time(timer)
    minutes, seconds = min_sec[0], min_sec[1]
    
    clear_output(wait=True)
    print('Chunk number:', chunk_num)
    print('Length of dataframe:',"{:,}".format(len(filter_chunk)),'\n')
    print('Timer:', minutes, 'minutes', seconds, 'seconds')
    chunk_num += 1

print('\nData merge complete.')

Chunk number: 12
Length of dataframe: 5,119,956 

Timer: 15 minutes 11 seconds

Data merge complete.


#### Concatenate the chunk list into a dataframe

In [8]:
# concat the list into dataframe 
%time df_concat = pd.concat(chunk_list)
print('Length of concatenated dataframe:',"{:,}".format(len(df_concat)),'\n')

Wall time: 15.2 s
Length of concatenated dataframe: 86,960,731 



In [9]:
outputZip = 'od_distance_unclean.csv.gz'

#### Create full path with zip file

In [10]:
out_Zip = ODpath.joinpath(outputZip)

#### Write all the unclean OD line data to compressed csv file

In [11]:
# start a timer
start = timeit.default_timer()
print ('Compressing dataframe. Please be patient.')
df_concat.to_csv(out_Zip, compression='gzip', index=None)
clear_output(wait=True)
# get the current time on timer
stop = timeit.default_timer()
timer = np.array([(stop-start)/60])
min_sec = get_time(timer)
minutes, seconds = min_sec[0], min_sec[1]
print('\nData compression complete.\nTotal time:', minutes, 'minutes', seconds, 'seconds')


Data compression complete.
Total time: 39 minutes 33 seconds


#### Arrange the columns

In [12]:
df_concat = df_concat[['w_geocode','h_geocode','distance','w_group_count', 'S000', 'SA01', 'SA02', 'SA03', 'SE01', 'SE02', 'SE03', 'SI01', 'SI02', 'SI03','w_lat','w_lon','h_lat','h_lon']]
df_concat.head()

Unnamed: 0,w_geocode,h_geocode,distance,w_group_count,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon,h_lat,h_lon
0,20130001001101,20130001001101,,34,1,1,0,0,1,0,0,1,0,0,,,,
1,20130001001101,20130001001200,,34,1,0,0,1,1,0,0,1,0,0,,,,
2,20130001001101,20130001001371,,34,7,1,6,0,4,2,1,7,0,0,,,,
3,20130001001101,20130001002236,,34,5,0,4,1,2,2,1,3,0,2,,,,
4,20130001001101,20130001002319,,34,1,1,0,0,0,1,0,1,0,0,,,,


#### Identify null records

In [13]:
df_null = df_concat[df_concat.isnull().any(axis=1)]
print ('\nthe number of null records:', "{:,}".format(len(df_null)),'\n\n')
df_null.head()


the number of null records: 513,848 




Unnamed: 0,w_geocode,h_geocode,distance,w_group_count,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon,h_lat,h_lon
0,20130001001101,20130001001101,,34,1,1,0,0,1,0,0,1,0,0,,,,
1,20130001001101,20130001001200,,34,1,0,0,1,1,0,0,1,0,0,,,,
2,20130001001101,20130001001371,,34,7,1,6,0,4,2,1,7,0,0,,,,
3,20130001001101,20130001002236,,34,5,0,4,1,2,2,1,3,0,2,,,,
4,20130001001101,20130001002319,,34,1,1,0,0,0,1,0,1,0,0,,,,


#### Remove null records

In [None]:
df_clean = df_concat.dropna(how='any')
df_null = df_concat[df_concat.isnull().any(axis=1)]
print ('Length of cleaned dataframe:',"{:,}".format(len(df_clean)),'\n')
df_clean.head(3)

#### Remove the precision on the distance

In [None]:
df_clean.distance =  df_clean.distance.map(lambda x: '%.0f' % x)

#### Reduce the precision on the lat & lon columns

In [None]:
df_clean.w_lat =  df_clean.w_lat.map(lambda x: '%.5f' % x)
df_clean.w_lon =  df_clean.w_lon.map(lambda x: '%.5f' % x)
df_clean.h_lat =  df_clean.h_lat.map(lambda x: '%.5f' % x)
df_clean.h_lon =  df_clean.h_lon.map(lambda x: '%.5f' % x)

In [None]:
df_clean.head(3)

#### Name zip file

In [None]:
outputZip = 'od_distance.csv.gz'

#### Create full path with zip file

In [None]:
out_Zip = ODpath.joinpath(outputZip)

#### Write all the OD line data to compressed csv file

In [None]:
# start a timer
start = timeit.default_timer()
print ('Compressing dataframe. Please be patient.')
df_clean.to_csv(out_Zip, compression='gzip', index=None)
clear_output(wait=True)
# get the current time on timer
stop = timeit.default_timer()
timer = np.array([(stop-start)/60])
min_sec = get_time(timer)
minutes, seconds = min_sec[0], min_sec[1]
print('\nData compression complete.\nTotal time:', minutes, 'minutes', seconds, 'seconds')

#### Identify all state FIPS codes in data

In [18]:
state_list = df_clean['w_geocode'].str.slice(0,2).unique().tolist()
print (len(state_list))

48


#### Create a copy of the dataframe

In [None]:
df_all_states = df_clean.copy()

#### Create a column to contain the state FIPS

In [23]:
df_all_states['state'] = df_all_states['w_geocode'].str.slice(0,2)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [24]:
df_all_states.head()

Unnamed: 0,w_geocode,h_geocode,distance,w_group_count,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon,h_lat,h_lon,state
33968,10010205001001,10010201001016,5896,1076,1,1,0,0,1,0,0,0,1,0,32.45674,-86.41503,32.46662,-86.47665,1
33969,10010205001001,10010201001025,6964,1076,1,1,0,0,0,1,0,0,1,0,32.45674,-86.41503,32.4598,-86.489,1
33970,10010205001001,10010201002016,7088,1076,2,0,2,0,0,0,2,0,2,0,32.45674,-86.41503,32.47418,-86.48756,1
33971,10010205001001,10010201002017,6830,1076,3,3,0,0,1,2,0,0,1,2,32.45674,-86.41503,32.47724,-86.48353,1
33972,10010205001001,10010201002022,7304,1076,1,0,1,0,0,1,0,0,1,0,32.45674,-86.41503,32.488,-86.48342,1


#### Funtion to write each state file to csv

In [25]:
def write_state(df_state):
    df_state.to_csv(out_state, compression='gzip', index=None)

#### Loop through each State FIPS code and call function to write csv

In [26]:
for index in range(0,len(state_list)):
    state_id = state_list[index]
    df_state = df_all_states[df_all_states == state_id]
    outputstate = state_id + '_od_distance.csv.gz'
    out_state = ODpath.joinpath(outputstate)
    write_state(df_state)

## Create the Statistics File

#### Function - Process the statistics 

In [15]:
def chunk_process_stats(df):
    # create a group of all workplace geoids
    #df.drop(['h_geocode'], axis = 1, inplace = True)
    df_group = df.groupby(['w_geocode']).sum()
    return df_group

In [16]:
%time df_chunk = pd.read_csv(OD_file, compression='gzip', dtype={'w_geocode': str,'h_geocode':str}, chunksize=10000000)

CPU times: user 3.72 ms, sys: 18.6 ms, total: 22.3 ms
Wall time: 50.3 ms


In [17]:
chunk_num = 1
decimals = 0   
chunk_stats_list=[]


print('Reading in the chunk dataframe')
start = timeit.default_timer()
for df in df_chunk:
    print('Starting processing for chunk #', chunk_num)
    # call function to group and filter the data 
    stats_filter_chunk = chunk_process_stats(df)
    # append the filtered data to list
    chunk_stats_list.append(stats_filter_chunk)
    
    # get the current time on timer
    stop = timeit.default_timer()
    timer = np.array([(stop-start)/60])
    min_sec = get_time(timer)
    minutes, seconds = min_sec[0], min_sec[1]
    
    clear_output(wait=True)
    print('Chunk number:', chunk_num)
    print('Length of dataframe:',"{:,}".format(len(stats_filter_chunk)),'\n')
    print('Timer:', minutes, 'minutes', seconds, 'seconds')
    chunk_num += 1

print('\nData group stats complete.')

Chunk number: 12
Length of dataframe: 660,179 

Timer: 3 minutes 24 seconds

Data group stats complete.


#### Create a dataframe from the chunk list

In [18]:
# concat the list into dataframe 
%time df_concat_stats = pd.concat(chunk_stats_list,ignore_index=False)

print ('\nrecords loaded to dataframe:', "{:,}".format(len(df_concat_stats)),'\n\n')
df_concat_stats.head(1)

CPU times: user 205 ms, sys: 100 ms, total: 305 ms
Wall time: 328 ms

records loaded to dataframe: 2,815,940 




Unnamed: 0_level_0,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03
w_geocode,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
10010201001000,6,1,3,2,3,3,0,3,0,3


In [19]:
df_concat_stats.reset_index(drop=False, inplace=True)
df_concat_stats.head()

Unnamed: 0,w_geocode,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03
0,10010201001000,6,1,3,2,3,3,0,3,0,3
1,10010201001016,17,1,13,3,5,1,11,0,0,17
2,10010201001018,24,3,12,9,0,12,12,17,7,0
3,10010201001022,267,23,167,77,67,66,134,0,0,267
4,10010201001023,6,2,1,3,1,3,2,0,0,6


#### Group the concatentated dataframe

In [20]:
df_group = df_concat_stats.groupby(['w_geocode']).sum()
print ('\nrecords loaded to dataframe:', "{:,}".format(len(df_group)),'\n\n')


records loaded to dataframe: 2,085,972 




#### Add centroid info for the work block location

In [21]:
# load the centroid file
centroids = pd.read_csv(block_file, dtype={'block_geoid': 'object', 'lat': 'float', 'lon': 'float'})
# merge the centroid data frame with the grouped dataframe
merge_result = pd.merge(df_group, centroids,left_on=  ['w_geocode'],right_on= ['block_geoid'],how = 'left')
#merge_result.drop(['block_geoid'], axis=1, inplace=True)
merge_result.rename(columns={'block_geoid': 'w_geocode', 'lat': 'w_lat', 'lon': 'w_lon'}, inplace=True)
merge_result = merge_result[['w_geocode','S000','SA01','SA02','SA03','SE01','SE02','SE03','SI01','SI02','SI03','w_lat','w_lon']]
print ('\nrecords in dataframe:', "{:,}".format(len(merge_result)),'\n')
df_stats = merge_result[(merge_result['S000'] >= 1000)]
print ('records in dataframe with 1000+ employees:', "{:,}".format(len(df_stats)),'\n\n')
df_stats.reset_index(drop=True, inplace=True)
df_stats.head(1)


records in dataframe: 2,085,972 

records in dataframe with 1000+ employees: 17,355 




Unnamed: 0,w_geocode,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon
0,10010205001001,1100,536,426,138,534,412,154,13,540,547,32.45674,-86.415025


#### Are there any nulls?

In [22]:
df_null = df_stats[df_stats.isnull().any(axis=1)]
print ('\nthe number of null records:', "{:,}".format(len(df_null)),'\n\n')
df_null.head()


the number of null records: 122 




Unnamed: 0,w_geocode,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon
179,,1530,257,924,349,599,522,409,1189,308,33,,
180,,1042,161,551,330,581,312,149,956,49,37,,
181,,1572,819,585,168,563,789,220,4,1056,512,,
182,,2857,300,1854,703,85,330,2442,46,6,2805,,
183,,1937,273,1163,501,45,143,1749,1328,0,609,,


#### Remove rows with any nulls

In [23]:
df_stats_clean = df_stats.dropna(how='any')
df_null = df_stats[df_stats.isnull().any(axis=1)]
df_stats_clean.head()

Unnamed: 0,w_geocode,S000,SA01,SA02,SA03,SE01,SE02,SE03,SI01,SI02,SI03,w_lat,w_lon
0,10010205001001,1100,536,426,138,534,412,154,13,540,547,32.45674,-86.415025
1,10030107032109,1009,498,406,105,517,332,160,0,584,425,30.667644,-87.849564
2,10030112023027,1088,173,656,259,93,413,582,0,0,1088,30.518815,-87.88825
3,10030115021041,1280,560,522,198,719,397,164,0,805,475,30.372959,-87.68456
4,10059505002038,1448,432,793,223,105,1020,323,1448,0,0,31.801006,-85.332896


In [24]:
outputZip2 = 'od_stats.csv.gz'

In [25]:
out_Zip = ODpath.joinpath(outputZip2)

In [26]:
%time df_stats_clean.to_csv(out_Zip, compression='gzip', index=False)

CPU times: user 640 ms, sys: 68.4 ms, total: 709 ms
Wall time: 957 ms
