# Impute Home and Work Locations
## Veraset Data - 05/2019 & 06/2019

In [13]:
# date: 3/20/2022
# author: Ziwen
# task: identify home and work location with Veraset rawpings
import os, sys, gc
import pandas as pd
import numpy as np
import yaml
import os
import glob
import ipystata

from google.cloud import storage
from google.cloud import bigquery

path = os.path.expanduser('~')
# set up env credential variable
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path + '/Dropbox/Amenity/source/analysis/veraset_gravy_gps_sample/firm-exit-3608acd14b06.json'
client = bigquery.Client()

import matplotlib.pyplot as plt, seaborn as sns
%matplotlib inline
%config InlineBackend.figure_format='retina'

## Load data from GCS to BQ

### Delete old data

## Impute home and work locations

In [22]:
# label overnight and weekday-daytime records
default_dataset = 'firm-exit.veraset_visits'
destination_table = 'firm-exit.veraset_visits.time_converted_raw_ping_week'
job_config = bigquery.QueryJobConfig(write_disposition='WRITE_TRUNCATE', # overwrite
                                     destination=destination_table,
                                     default_dataset = default_dataset)

query_label = f'''
    with time_converted_table as
    (select 
        -- take necessary variables and conversions
        caid as caid, 
        datetime_sub(timestamp_seconds(utc_timestamp), interval 7 hour) as PDT_time,
        extract(hour from datetime_sub(timestamp_seconds(utc_timestamp), interval 7 hour)) as hofd,
        format_datetime('%A', datetime_sub(timestamp_seconds(utc_timestamp), interval 7 hour)) as weekday,
        format_datetime('%r', datetime_sub(timestamp_seconds(utc_timestamp), interval 7 hour)) as time_of_day,
        substr(geo_hash, 1, 7) as geohash7,
        latitude as latitude,
        longitude as longitude
    from `firm-exit.veraset_visits.new_raw_ping`), week_dummy as
    -- generate a week dummy
    (select time_converted_table.*, 
        case 
            -- keep away from the April 30th to May 5th
            when (PDT_time < '2019-05-06 00:00:00') then '0'
            when (PDT_time >= '2019-05-06 00:00:00') and (PDT_time < '2019-05-13 00:00:00') then '1'
            when (PDT_time >= '2019-05-13 00:00:00') and (PDT_time < '2019-05-20 00:00:00') then '2'
            when (PDT_time >= '2019-05-20 00:00:00') and (PDT_time < '2019-05-27 00:00:00') then '3'
            when (PDT_time >= '2019-05-27 00:00:00') and (PDT_time < '2019-06-03 00:00:00') then '4'
            when (PDT_time >= '2019-06-03 00:00:00') and (PDT_time < '2019-06-10 00:00:00') then '5'
            when (PDT_time >= '2019-06-10 00:00:00') and (PDT_time < '2019-06-17 00:00:00') then '6'
            when (PDT_time >= '2019-06-17 00:00:00') and (PDT_time < '2019-06-24 00:00:00') then '7'
            when (PDT_time >= '2019-06-24 00:00:00') and (PDT_time < '2019-07-01 00:00:00') then '8'
            else NULL
        end
        as week
    from time_converted_table), day as
    (select week_dummy.*, 
        case 
            -- overnight indicator: 7pm to 7am of the next day
            when (hofd >= 19 and hofd <= 23) or (hofd >= 0 and hofd < 7) then 'overnight'
            -- weekday daytime indicator: 8am to 6pm from mon to fri
            when (weekday = 'Monday' or weekday = 'Tuesday' or weekday = 'Wednesday' or weekday = 'Thursday' or weekday = 'Friday') and (hofd >= 8) and (hofd < 18) then 'weekday_daytime'
            else NULL
        end
        as time_indicator
    from week_dummy
    )
    select day.*,
    --calculate a total num_overnight_pings first
        countif(time_indicator = 'overnight') OVER (PARTITION BY day.caid, day.geohash7) as num_records_overnight, 
        countif(time_indicator = 'weekday_daytime') OVER (PARTITION BY day.caid, day.geohash7) as num_records_weekday_daytime,
        avg(hofd) OVER (PARTITION BY caid, geohash7) as avg_hofd,
        avg(latitude) OVER (PARTITION BY caid, geohash7) as avg_lat, 
        avg(longitude) OVER (PARTITION BY caid, geohash7) as avg_lng,
    from day
'''

query_job = client.query(query_label, job_config=job_config)
query_job.result() 

<google.cloud.bigquery.table.RowIterator at 0x20b7fee79a0>

In [23]:
# count number of records at each geohash7 region, and take the average of lat & lng
# take only the most common overnight location as a candidate for home
# and take the first and second most common weekday daytime location as candidates for work
destination_table = 'firm-exit.veraset_visits.time_converted_raw_ping_week'
job_config = bigquery.QueryJobConfig(write_disposition='WRITE_TRUNCATE', # overwrite
                                     destination=destination_table,
                                     default_dataset = default_dataset)


query_group = f'''
    with grouped_raw_ping as
    -- collapse data to week-device-geohash7 region,select the weekly raw_ping for first round selection
    (select 
        num_records_overnight as num_records_overnight,
        num_records_weekday_daytime as num_records_weekday_daytime,
        avg_hofd as avg_hofd,
        caid as caid, 
        countif(time_indicator = 'overnight') OVER (PARTITION BY caid, geohash7,week) as num_records_overnight_week, 
        countif(time_indicator = 'weekday_daytime') OVER (PARTITION BY caid, geohash7,week) as num_records_weekday_daytime_week,
        week as week,
        avg_lat as avg_lat,
        avg_lng as avg_lng,
        avg(hofd) OVER (PARTITION BY caid, geohash7,week) as avg_hofd_week, 
        geohash7 as geohash7
    from `firm-exit.veraset_visits.time_converted_raw_ping_week`
    ), ranked_raw_ping as
    -- rank counted number of records by weeks (there could be ties)
    (select
        grouped_raw_ping.*,
        dense_rank() over (partition by grouped_raw_ping.caid,grouped_raw_ping.week order by grouped_raw_ping.num_records_overnight_week DESC) as rank_home_week,
        dense_rank() over (partition by grouped_raw_ping.caid,grouped_raw_ping.week order by grouped_raw_ping.num_records_weekday_daytime_week DESC) as rank_work_week
     from grouped_raw_ping
    )
    select 
        *
    from ranked_raw_ping
    -- keep only top ranked home location and top and second ranked work location (in case top work is already identified as home)
    where (rank_home_week = 1 and num_records_overnight != 0) or (rank_work_week = 1 and num_records_weekday_daytime !=0) or (rank_work_week = 2 and num_records_weekday_daytime !=0)
    order by caid
'''


query_job = client.query(query_group, job_config=job_config)
query_job.result() 

<google.cloud.bigquery.table.RowIterator at 0x20b7ff37b80>

In [26]:
destination_table = 'firm-exit.veraset_visits.time_converted_raw_ping_week'
job_config = bigquery.QueryJobConfig(write_disposition='WRITE_TRUNCATE', # overwrite
                                     destination=destination_table,
                                     default_dataset = default_dataset)
query_group = f'''
    select distinct*
    FROM `firm-exit.veraset_visits.time_converted_raw_ping_week`
'''

query_job = client.query(query_group, job_config=job_config)
query_job.result() 


<google.cloud.bigquery.table.RowIterator at 0x20b7feb9b80>

In [31]:
# obtain data from gcs
# export to gcs and download
project = "firm-exit"
dataset_id = "veraset_visits"
table_id = "time_converted_raw_ping_week"

destination_uri = "gs://{}/{}".format('veraset_temp', "home_work_week_candidates*.csv")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
    location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

print(
    "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)

# download data from gcs to our dropbox, first download 1 - 10 and then 10 - 43
file_index = [i for i in range(17,26)]

for x in file_index:
    # Initialise a client
    storage_client = storage.Client("firm-exit")
    # Create a bucket object for our bucket
    bucket = storage_client.get_bucket("veraset_temp")
    # Create a blob object from the filepath
    blob = bucket.blob("home_work_week_candidates0000000000{}.csv".format(str(x)))
    # Download the file to a destination
    blob.download_to_filename(path + '/Dropbox/Amenity/data/derived/veraset_gravy_gps_sample/veraset/home_work_week_candidates_{}.csv'.format(str(x)))
    # Delete after exporting, otherwise costs storage fees
    blob.delete()

Exported firm-exit:veraset_visits.time_converted_raw_ping_week to gs://veraset_temp/home_work_week_candidates*.csv


In [None]:
#download to local and run stata then run

In [12]:
# import home_work_candidates
read_path = path + '/Dropbox/Amenity/data/derived/veraset_gravy_gps_sample/veraset'
all_files = glob.glob(read_path + "/home_work_candidates_*.csv")

temp = []

for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=0)
    temp.append(df)

home_work_candidates = pd.concat(temp, axis=0, ignore_index=True)
home_work_candidates.head()

Unnamed: 0,caid,num_records_overnight,num_records_weekday_daytime,avg_lat,avg_lng,avg_hofd,geohash7,rank_home,rank_work
0,0000025006594b60185fd44d02aa45689a6c63e47b7037...,0,2,34.0617,-118.400051,16.5,9q5cc8x,10,1
1,0000025006594b60185fd44d02aa45689a6c63e47b7037...,30,0,33.7908,-118.3248,19.818182,9q5b5rq,1,2
2,00000488029787044d3751df4d476a7a0ea1c5b1e8bb66...,0,1,33.963962,-118.33493,13.0,9q5c5nz,2,1
3,00000488029787044d3751df4d476a7a0ea1c5b1e8bb66...,10,0,34.045853,-118.32173,10.133333,9q5cetb,1,2
4,0000096512b9cce231abdec8d6ea460d2e750123161cb5...,1,2,34.68609,-120.43683,15.666667,9q4m76k,2,2


In [13]:
# take home candidates from home_work_candidates
# there could be multiple home location candidates for a single device
home_candidates = home_work_candidates[(home_work_candidates['rank_home']==1) & (home_work_candidates['num_records_overnight']!=0)]
display(home_candidates.count(), home_candidates.caid.nunique())

# take work candidates from home_work_candidates
work_candidates = home_work_candidates[(home_work_candidates['rank_work'] <= 2) & (home_work_candidates['num_records_weekday_daytime']!=0)]
display(work_candidates.count(), work_candidates.caid.nunique())

caid                           6616967
num_records_overnight          6616967
num_records_weekday_daytime    6616967
avg_lat                        6616967
avg_lng                        6616967
avg_hofd                       6616967
geohash7                       6616967
rank_home                      6616967
rank_work                      6616967
dtype: int64

6060053

caid                           12647428
num_records_overnight          12647428
num_records_weekday_daytime    12647428
avg_lat                        12647428
avg_lng                        12647428
avg_hofd                       12647428
geohash7                       12647428
rank_home                      12647428
rank_work                      12647428
dtype: int64

5761051

In [14]:
home_candidates = home_candidates.assign(ties = home_candidates.duplicated(subset=['caid'], keep=False)) # mark ties

# for ties, take the one with avg_hofd closest to 1am (mid of 7pm to 7am overnight window)
# not the median/mean hofd of all home candidates because the mean/median is in daytime
home_candidates['dev_1am'] = home_candidates['avg_hofd'].apply(lambda x: abs(x-1) if x <= 13 else abs(25-x))
home_candidates['rank_ties'] = home_candidates[home_candidates['ties']==True].groupby('caid')['dev_1am'].rank(method="min", ascending=True)

home_candidates = home_candidates[(home_candidates['rank_ties']==1)|(home_candidates['ties']==False)]

In [15]:
# for remaining ties, no reason to prefer one to another, select any [only a very small proportion]
home_candidates['still_ties']=home_candidates.duplicated(subset=['caid'], keep=False) # mark remaining ties
home_candidates = home_candidates.drop_duplicates(subset=['caid'], keep='first')

display(home_candidates.caid.nunique())
home_locations = home_candidates

home_locations.to_csv(path+'/Dropbox/Amenity/data/analysis/veraset_gravy_gps_sample/new_veraset_home_locations.csv', sep=',', mode='w')

6060053

In [16]:
# join identified home locations to work candidates, and remove these already identified home locations
identified_homes = home_locations[['caid', 'geohash7']]
identified_homes.columns = ['caid','home_geohash7']
work_candidates=work_candidates.join(identified_homes.set_index('caid'), on='caid')

In [17]:
# count the number of work candidates that are ranked top, second
num_top = work_candidates[work_candidates['rank_work']==1].groupby('caid')['caid'].count().to_frame('num_top')
work_candidates = work_candidates.join(num_top, on='caid')

num_second = work_candidates[work_candidates['rank_work']==2].groupby('caid')['caid'].count().to_frame('num_second')
work_candidates = work_candidates.join(num_second, on='caid')
work_candidates['num_second'] = work_candidates['num_second'].fillna(0)

In [18]:
# narrow down the candidate pool a bit, still allowing ties
# remove home locations, except for when there's no other back-up work locations
work_candidates = work_candidates[(work_candidates['geohash7']!=work_candidates['home_geohash7']) 
                                  | ((work_candidates['geohash7']==work_candidates['home_geohash7']) & (work_candidates['num_top']==1) & (work_candidates['num_second']==0))]

# now that work candidates no more overlap with home locations (except for potential WFH)
# let's re-rank work candidates and keep only new top ranked work candidates
work_candidates['new_rank_work'] = work_candidates.groupby('caid')['num_records_weekday_daytime'].rank(method="min",ascending=False)
work_candidates = work_candidates[work_candidates['new_rank_work']==1]

In [19]:
# now do the same done with home candidates to select among ties
work_candidates['ties'] = work_candidates.duplicated(subset=['caid'], keep=False) # mark ties

# for ties, take the one with avg_hofd closest to 1pm (mid of 8am to 6pm overnight window)
# very close to median/mean hofd of all work candidates
work_candidates['dev_1pm'] = work_candidates['avg_hofd'].apply(lambda x: abs(x-13))
work_candidates['rank_ties'] = work_candidates[work_candidates['ties']==True].groupby('caid')['dev_1pm'].rank(method="min", ascending=True)

work_candidates = work_candidates[(work_candidates['rank_ties']==1)|(work_candidates['ties']==False)]

In [20]:
# for remaining ties, no reason to prefer one to another, select any [only a very small proportion]
work_candidates['still_ties']=work_candidates.duplicated(subset=['caid'], keep=False) # mark remaining ties
display(work_candidates.caid.nunique(), work_candidates[work_candidates['still_ties']==True].count())

work_candidates = work_candidates.drop_duplicates(subset=['caid'], keep='first')
work_locations = work_candidates
work_locations.to_csv(path+'/Dropbox/Amenity/data/analysis/veraset_gravy_gps_sample/new_veraset_work_locations.csv', sep=',', mode='w')

5761051

caid                           210012
num_records_overnight          210012
num_records_weekday_daytime    210012
avg_lat                        210012
avg_lng                        210012
avg_hofd                       210012
geohash7                       210012
rank_home                      210012
rank_work                      210012
home_geohash7                  192066
num_top                        210012
num_second                     210012
new_rank_work                  210012
ties                           210012
dev_1pm                        210012
rank_ties                      210012
still_ties                     210012
dtype: int64

In [21]:
# potential WFH, i.e., the home location is the same as the work location
work_candidates[work_candidates['geohash7']==work_candidates['home_geohash7']].count()

caid                           424989
num_records_overnight          424989
num_records_weekday_daytime    424989
avg_lat                        424989
avg_lng                        424989
avg_hofd                       424989
geohash7                       424989
rank_home                      424989
rank_work                      424989
home_geohash7                  424989
num_top                        424989
num_second                     424989
new_rank_work                  424989
ties                           424989
dev_1pm                        424989
rank_ties                           0
still_ties                     424989
dtype: int64

In [22]:
num_devices = 6566991 # number of unique devices from raw ping in ca
num_home_identified = home_locations.caid.nunique() # exactly the number of devices in home_work_candidates
num_work_identified = work_locations.caid.nunique() # exactly the number of devices in home_work_candidates
percent_home = "{:.2%}".format(num_home_identified/num_devices)
percent_work = "{:.2%}".format(num_work_identified/num_devices)

# for those we can identify both their home and work locations
home_work_locations = home_locations.join(work_locations.set_index('caid'), on='caid', how='inner', lsuffix='_home', rsuffix='_work')
num_home_work_identified = home_work_locations.caid.nunique()
percent_home_work = "{:.2%}".format(num_home_work_identified/num_devices) 

print("The total number of unique IDs in raw ping data that have have been to at least one non-home & non-work POI in CA is {}".format(num_devices))
print("The percentage of home locations identified is {}".format(percent_home))
print("The percentage of work locations identified is {}".format(percent_work))
print("The percentage of both home & work locations identified is {}".format(percent_home_work))

The total number of unique IDs in raw ping data that have have been to at least one non-home & non-work POI in CA is 6566991
The percentage of home locations identified is 92.28%
The percentage of work locations identified is 87.73%
The percentage of both home & work locations identified is 81.80%
