# Process POI visitation data for DiD modeling

In [1]:
%load_ext autoreload
%autoreload 2
%cd D:\nine-euro-ticket-de

D:\nine-euro-ticket-de


In [2]:
# Load libs
import pandas as pd
import os
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
from tqdm import tqdm
import workers
import sqlalchemy
import numpy as np
import wquantiles
import time
from statsmodels.stats.weightstats import DescrStatsW

In [3]:
# Data location
user = workers.keys_manager['database']['user']
password = workers.keys_manager['database']['password']
port = workers.keys_manager['database']['port']
db_name = workers.keys_manager['database']['name']
engine = sqlalchemy.create_engine(f'postgresql://{user}:{password}@localhost:{port}/{db_name}?gssencmode=disable')

In [4]:
# Pyspark set up
os.environ['JAVA_HOME'] = "C:/Java/jdk-1.8"
from pyspark.sql import SparkSession
import sys
from pyspark.sql.types import *
from pyspark import SparkConf
# Set up pyspark
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Create new context
spark_conf = SparkConf().setMaster("local[18]").setAppName("MobiSeg")
spark_conf.set("spark.executor.heartbeatInterval","3600s")
spark_conf.set("spark.network.timeout","7200s")
spark_conf.set("spark.sql.files.ignoreCorruptFiles","true")
spark_conf.set("spark.driver.memory", "56g")
spark_conf.set("spark.driver.maxResultSize", "0")
spark_conf.set("spark.executor.memory","8g")
spark_conf.set("spark.memory.fraction", "0.6")
spark_conf.set("spark.sql.session.timeZone", "UTC")
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
java_version = spark._jvm.System.getProperty("java.version")
print(f"Java version used by PySpark: {java_version}")
print('Web UI:', spark.sparkContext.uiWebUrl)

Java version used by PySpark: 1.8.0_401
Web UI: http://C19YUEI.net.chalmers.se:4040


## 1. POI visitation data

In [5]:
data_folder = os.path.join('dbs/poi2visits_day_sg/')
paths2stops = {int(x.split('_')[-1].split('.')[0]): os.path.join(data_folder, x)\
               for x in list(os.walk(data_folder))[0][2]}
paths2stops_list = list(paths2stops.values())
paths2stops_list[0]

'dbs/poi2visits_day_sg/stops_0.parquet'

In [6]:
df = pd.read_parquet(paths2stops_list[0])
df.iloc[0]

device_aid    00080961-f0ed-642e-aec2-e9903f704320
date                                    2023-05-02
dur                                          93.85
year                                          2023
week                                            18
weekday                                          1
osm_id                                 141575416.0
label                  Recreation & Sports Centres
theme                  Recreation & Sports Centres
month                                            5
period                                           0
d_h                                       0.853881
wt_p                                     26.494253
grdi_grp                                         M
Name: 0, dtype: object

## 2. Auxiliary data
### 2.1 Precipitation

In [7]:
df_p = pd.read_sql("""SELECT station_id, date, "RS" AS precipitation FROM precipitation.daily;""", con=engine)
df_poi2p = pd.read_sql("""SELECT osm_id, station_id FROM precipitation.poi_station;""", con=engine)
df_p.head()

Unnamed: 0,station_id,date,precipitation
0,6,20190501,0.0
1,6,20190502,3.4
2,6,20190503,1.6
3,6,20190504,7.0
4,6,20190505,0.0


In [8]:
df_p.loc[:, 'date_time'] = pd.to_datetime(df_p['date'].astype(str), format='%Y%m%d')

### 2.2 PT access

In [9]:
df_pt = pd.read_sql("""SELECT osm_id, pt_station_num FROM public_transport.poi_pt_station;""", con=engine)

### 2.3 National holiday

In [10]:
holidays = [np.datetime64(pd.to_datetime(x, format='%Y%m%d')) 
            for x in [20190501, 20190530, 20190610, 
                      20220501, 20220526, 20220606, 
                      20230501, 20230518, 20230529]]

## 3. Processing visitation data

In [25]:
# Time processing
df.loc[:, 'date_time'] = pd.to_datetime(df['date'].astype(str), format='%Y-%m-%d')
print(f'{len(df)} visits on {df.osm_id.nunique()} locations from {df.device_aid.nunique()} devices.')

# Filter out national holidays
df.loc[:, 'date_time'] = df.loc[~df['date_time'].isin(holidays), :]
print(f'After removing holidays, {len(df)} visits on {df.osm_id.nunique()} locations from {df.device_aid.nunique()} devices.')

# Add pt stations
df = pd.merge(df, df_pt, on='osm_id', how='left')
df.dropna(inplace=True)
df.drop_duplicates(subset=['device_aid', 'date', 'dur', 'osm_id'], inplace=True)
print(f'After adding public stops, {len(df)} visits on {df.osm_id.nunique()} locations from {df.device_aid.nunique()} devices.')

328877 visits on 60887 locations from 7648 devices.
After removing holidays, 328877 visits on 60887 locations from 7648 devices.
After adding public stops, 314134 visits on 60328 locations from 7646 devices.


In [17]:
df.head()

Unnamed: 0,device_aid,date,dur,year,week,weekday,osm_id,label,wt_p,theme,month,period,d_h,date_time,pt_station_num
0,001f24b1-26df-4949-a61c-abef1185ea8f,2022-06-20,179.983333,2022,25,0,404916112.0,Restaurant,5.666667,Food & Beverage,6,1,0.053544,2022-06-20,8
1,001f24b1-26df-4949-a61c-abef1185ea8f,2022-07-11,179.983333,2022,28,0,404916112.0,Restaurant,5.666667,Food & Beverage,7,1,0.053544,2022-07-11,8
2,001f24b1-26df-4949-a61c-abef1185ea8f,2022-07-14,186.333333,2022,28,3,695230966.0,Accomodations,5.666667,Outdoor & Recreational areas,7,1,121.309082,2022-07-14,48
3,001f24b1-26df-4949-a61c-abef1185ea8f,2022-07-16,181.583333,2022,28,5,404057116.0,Place of worship,5.666667,Community & Social Services,7,1,3.160769,2022-07-16,16
4,001f24b1-26df-4949-a61c-abef1185ea8f,2022-07-23,180.15,2022,29,5,404973915.0,Supermarket,5.666667,Food & Beverage,7,1,0.836989,2022-07-23,15


In [41]:
# Add precipitation information
df_rain = df.drop_duplicates(subset=['date_time', 'osm_id'])[['date_time', 'osm_id']]
print(f'For precipitation data: {len(df_rain)} pairs on {df_rain.osm_id.nunique()} locations from {df_rain.date_time.nunique()} days.')

For precipitation data: 223456 pairs on 60328 locations from 450 days.


In [42]:
df_rain = pd.merge(df_rain, df_poi2p, on='osm_id', how='left')
df_rain.loc[:, 'station_id'] = df_rain.loc[:, 'station_id'].apply(lambda x: [int(j) for j in x.split(',') if j != ''])
df_rain.head()

Unnamed: 0,date_time,osm_id,station_id
0,2022-06-20,404916112.0,"[15523, 2249, 3939, 15156, 14028, 1251, 14304,..."
1,2022-07-11,404916112.0,"[15523, 2249, 3939, 15156, 14028, 1251, 14304,..."
2,2022-07-14,695230966.0,"[14043, 14079, 14032, 14049, 19100, 161, 13691..."
3,2022-07-16,404057116.0,"[2249, 15523, 3939, 15156, 1251, 19244, 14028,..."
4,2022-07-23,404973915.0,"[2249, 3939, 15156, 1251, 14304, 19244, 13778,..."


In [56]:
df_rec_list = []
for i in tqdm(range(0, 15), desc='Searching for precipitation'):
    df_rain.loc[:, 'station'] = df_rain.loc[:, 'station_id'].apply(lambda x: x[i] if len(x) > i else None)
    df_rec = pd.merge(df_rain[['date_time', 'osm_id', 'station']],
                      df_p[['date_time', 'station_id', 'precipitation']].rename(columns={'station_id': 'station'}),
                      on=['station', 'date_time'], how='left')
    df_rec.dropna(inplace=True)
    df_rec_list.append(df_rec)

Searching for precipitation: 100%|██████████| 15/15 [00:04<00:00,  3.71it/s]


In [58]:
df_rec = pd.concat(df_rec_list)
df_rec.drop_duplicates(subset=['date_time', 'osm_id'], keep='first', inplace=True)
print(f'For precipitation data: {len(df_rec)/len(df_rain)*100} % have records.')

For precipitation data: 98.34445016054354 % have records.


In [59]:
df_rec.head()

Unnamed: 0,date_time,osm_id,station,precipitation
3,2022-07-16,404057116.0,2249.0,0.0
4,2022-07-23,404973915.0,2249.0,0.1
5,2022-08-06,404973915.0,2249.0,0.0
8,2022-08-13,404974714.0,2249.0,0.0
9,2022-08-16,404973915.0,2249.0,0.0


In [60]:
# Add precipitation
df = pd.merge(df, df_rec[['osm_id', 'date_time', 'precipitation']], on=['osm_id', 'date_time'], how='left')
df.dropna(inplace=True)
df.drop_duplicates(subset=['device_aid', 'date', 'dur', 'osm_id'], inplace=True)
print(f'After adding precipitation, {len(df)} visits on {df.osm_id.nunique()} locations from {df.device_aid.nunique()} devices.')

After adding precipitation, 311700 visits on 59814 locations from 7638 devices.


## 4. Scale up the processing

In [11]:
def visitation_enrichment(data=None):
    # Time processing
    data.loc[:, 'date_time'] = pd.to_datetime(data['date'].astype(str), format='%Y-%m-%d')
    #print(f'{len(data)} visits on {data.osm_id.nunique()} locations from {data.device_aid.nunique()} devices.')

    # Filter out national holidays
    data.loc[:, 'date_time'] = data.loc[~data['date_time'].isin(holidays), :]
    #print(f'After removing holidays, {len(data)} visits on {data.osm_id.nunique()} locations from {data.device_aid.nunique()} devices.')
    
    # Add pt stations
    data = pd.merge(data, df_pt, on='osm_id', how='left')
    data.dropna(inplace=True)
    data.drop_duplicates(subset=['device_aid', 'date', 'dur', 'osm_id'], inplace=True)
    #print(f'After adding public stops, {len(data)} visits on {data.osm_id.nunique()} locations from {data.device_aid.nunique()} devices.')
    
    # Add precipitation information
    df_rain = data.drop_duplicates(subset=['date_time', 'osm_id'])[['date_time', 'osm_id']]
    #print(f'For precipitation data: {len(df_rain)} pairs on {df_rain.osm_id.nunique()} locations from {df_rain.date_time.nunique()} days.')
    df_rain = pd.merge(df_rain, df_poi2p, on='osm_id', how='left')
    df_rain.loc[:, 'station_id'] = df_rain.loc[:, 'station_id'].apply(lambda x: [int(j) for j in x.split(',') if j != ''])
    
    df_rec_list = []
    for i in range(0, 15):
        df_rain.loc[:, 'station'] = df_rain.loc[:, 'station_id'].apply(lambda x: x[i] if len(x) > i else None)
        df_rec = pd.merge(df_rain[['date_time', 'osm_id', 'station']],
                          df_p[['date_time', 'station_id', 'precipitation']].rename(columns={'station_id': 'station'}),
                          on=['station', 'date_time'], how='left')
        df_rec.dropna(inplace=True)
        df_rec_list.append(df_rec)
    df_rec = pd.concat(df_rec_list)
    df_rec.drop_duplicates(subset=['date_time', 'osm_id'], keep='first', inplace=True)
    #print(f'For precipitation data: {len(df_rec)/len(df_rain)*100} % have records.')
    # Add precipitation
    data = pd.merge(data, df_rec[['osm_id', 'date_time', 'precipitation']], on=['osm_id', 'date_time'], how='left')
    data.dropna(inplace=True)
    data.drop_duplicates(subset=['device_aid', 'date', 'dur', 'osm_id'], inplace=True)
    #print(f'After adding precipitation, {len(data)} visits on {data.osm_id.nunique()} locations from {data.device_aid.nunique()} devices.')
    return data, list(data.osm_id.unique()), data.device_aid.nunique(), len(data)

In [12]:
osm_id_list = []
devices_count = 0
visits_count = 0
for k, v in tqdm(paths2stops.items(), desc='Processing batches'):
    df = pd.read_parquet(v)
    df_processed, osms, no_devices, no_visits = visitation_enrichment(data=df)
    osm_id_list += osms
    devices_count += no_devices
    visits_count += no_visits
    df_processed.to_parquet(f'dbs/poi2visits_day_did/stops_{k}.parquet', index=False)
osm_id_list = list(set(osm_id_list))

Processing batches: 100%|██████████| 300/300 [52:01<00:00, 10.41s/it]


In [13]:
print(f"In total, {visits_count} visits to {len(osm_id_list)} unique locations from {devices_count} devices are stored.")

In total, 144036305 visits to 740172 unique locations from 3544025 devices are stored.


## 5. Daily visitation calculation
This part of the results work for the DiD prototypes/time-shifted DiD models.

In [40]:
data_folder = os.path.join('dbs/poi2visits_day_did/')
paths2stops = {int(x.split('_')[-1].split('.')[0]): os.path.join(data_folder, x)\
               for x in list(os.walk(data_folder))[0][2]}
paths2stops_list = paths2stops.values()
df = spark.read.parquet(*paths2stops_list)
df.show(5)

+--------------------+----------+------------------+----+----+-------+------------+--------------------+-----------------+--------------------+-----+------+------------------+-------------------+--------------+-------------+
|          device_aid|      date|               dur|year|week|weekday|      osm_id|               label|             wt_p|               theme|month|period|               d_h|          date_time|pt_station_num|precipitation|
+--------------------+----------+------------------+----+----+-------+------------+--------------------+-----------------+--------------------+-----+------+------------------+-------------------+--------------+-------------+
|0002a3c0-8a6c-6dd...|2023-05-02|179.98333333333332|2023|  18|      1|3.48672634E8|Recreation & Spor...|2.227272727272727|Recreation & Spor...|    5|     0| 4.430065336516507|2023-05-02 00:00:00|             7|          2.5|
|0002a3c0-8a6c-6dd...|2023-05-02|16.666666666666668|2023|  18|      1| 2.2235059E8|Automotive and se

In [41]:
df_cat = pd.read_excel('dbs/poi/categories.xlsx').rename(columns={'category': 'theme', 'subcategory': 'label'})
label_list = df_cat['label'].unique()

In [42]:
def visit_patterns(data):
    data.loc[:, 'date'] = data.loc[:, 'date'].astype(str)
    metrics_dict = dict()
    # osm_id info
    for var in ('osm_id', 'date', 'year', 'month', 'weekday', 'theme', 'label', 'precipitation', 'pt_station_num'):
        metrics_dict[var] = data[var].values[0]
    # Visits
    metrics_dict['num_visits'], metrics_dict['num_visits_wt'] = len(data), data['wt_p'].sum()
    metrics_dict['num_unique_device'] = data.device_aid.nunique()
    # Duration
    metrics_dict['dur_total'], metrics_dict['dur_total_wt'] = sum(data['dur']), sum(data['dur'] * data['wt_p'])   # min
    metrics_dict['dur_m'] = wquantiles.median(data['dur'], data['wt_p'])
    # Distance from home
    ## Weighted percentiles
    d, wt = data.loc[data['d_h'] > 0, 'd_h'], data.loc[data['d_h'] > 0, 'wt_p']
    wdf = DescrStatsW(d, weights=wt, ddof=1)
    sts = wdf.quantile([0.25, 0.5, 0.75])
    bds = sts.values
    metrics_dict['d_h25_wt'], metrics_dict['d_h50_wt'], metrics_dict['d_h75_wt'] = bds[0], bds[1], bds[2]
    ## Unweighted percentiles
    metrics_dict['d_h25'] = np.quantile(d, 0.25)
    metrics_dict['d_h50'] = np.quantile(d, 0.5)
    metrics_dict['d_h75'] = np.quantile(d, 0.75)
    ## weighted average
    d_lg = d.apply(lambda x: np.log10(x))
    metrics_dict['d_ha_wt'] = 10**np.average(d_lg, weights=wt)
    ## unweighted average
    metrics_dict['d_ha'] = 10**np.average(d_lg)
    return pd.DataFrame(metrics_dict, index=[0])  # pd.DataFrame(metrics_dict, index=[0])


## Find visitation stats
schema_stats = StructType([
    StructField('osm_id', IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("weekday", IntegerType(), True),
    StructField("theme", StringType(), True),
    StructField("label", StringType(), True),
    StructField("pt_station_num", IntegerType(), True),
    StructField("precipitation", DoubleType(), True),
    StructField("num_visits", IntegerType(), True),
    StructField("num_visits_wt", DoubleType(), True),
    StructField("num_unique_device", IntegerType(), True),
    StructField('dur_total', DoubleType(), True),
    StructField('dur_total_wt', DoubleType(), True),
    StructField('dur_m', DoubleType(), True),
    StructField('d_h25', DoubleType(), True),
    StructField('d_h50', DoubleType(), True),
    StructField('d_h75', DoubleType(), True),
    StructField('d_h25_wt', DoubleType(), True),
    StructField('d_h50_wt', DoubleType(), True),
    StructField('d_h75_wt', DoubleType(), True),
    StructField('d_ha', DoubleType(), True),
    StructField('d_ha_wt', DoubleType(), True)
])

In [44]:
labels4test = ['Restaurant', 'Supermarket', 'Recreation & Sports Centres', 'Retail stores']

In [46]:
for lb in label_list:
    if lb not in labels4test:
        start = time.time()
        df_v = df.filter(df.label == lb).\
            groupby(['osm_id', 'date_time']).applyInPandas(visit_patterns, schema=schema_stats)
        df_v.toPandas().to_parquet(f"dbs/visits_day_did/{lb}.parquet", index=False)
        delta_t = (time.time() - start) // 60
        print(f"Label {lb} processed and saved in {delta_t} minutes.")

Label Automotive and services processed and saved in 33.0 minutes.
Label Home & Lifestyle processed and saved in 18.0 minutes.
Label Office processed and saved in 31.0 minutes.
Label Accomodations processed and saved in 27.0 minutes.
Label Art & Culture processed and saved in 9.0 minutes.
Label Café processed and saved in 14.0 minutes.
Label Entertainment venues processed and saved in 1.0 minutes.
Label Fast food processed and saved in 25.0 minutes.
Label Games and activities processed and saved in 1.0 minutes.
Label Health care and services processed and saved in 34.0 minutes.
Label Historic processed and saved in 32.0 minutes.
Label Information and services processed and saved in 23.0 minutes.
Label Nightclub processed and saved in 3.0 minutes.
Label Parks and gardens processed and saved in 1.0 minutes.
Label Recreational facilities processed and saved in 23.0 minutes.
Label Tourist attractions processed and saved in 4.0 minutes.
Label Viewing and observation processed and saved in 3

### 5.1 Organize places by state

In [61]:
# Find osm_id: state
gdf_state = gpd.read_file("dbs/geo/vg2500_12-31.utm32s.shape/vg2500/vg2500_LAN.shp").to_crs(4326)
gdf_state = gdf_state.loc[gdf_state['GF'] == 9, :].rename(columns={'GEN': 'state'})
gdf_poi_c = gpd.read_postgis(f"""SELECT osm_id, geom FROM poi;""", con=engine)
gdf_poi_c = gdf_poi_c.sjoin(gdf_state[['state', 'geometry']])
gdf_poi_c.dropna(inplace=True)

In [62]:
state_list = list(gdf_poi_c.state.unique())
print(f"No. of states covered: {len(state_list)}")

No. of states covered: 16


In [52]:
data_folder = os.path.join('dbs/visits_day_did/')
paths2stops = {x.split('.')[0]: os.path.join(data_folder, x)\
               for x in list(os.walk(data_folder))[0][2]}
paths2stops_list = list(paths2stops.values())
paths2stops_list[0]

'dbs/visits_day_did/Accomodations.parquet'

In [56]:
def load_data_state(fname = None, gdf_poi=None, threshold_v=5, state2select='Berlin'):
    df_v = pd.read_parquet(fname)
    cols = ['osm_id', 'date', 'year', 'month', 'weekday', 'theme', 'label',
            'num_visits', 'num_visits_wt', 'd_ha', 'd_ha_wt', 
            'precipitation', 'pt_station_num']
    df_v = df_v.loc[df_v.num_visits >= threshold_v, cols]
    # Add state
    df_v = pd.merge(df_v, gdf_poi[['osm_id', 'state']], on='osm_id', how='left')
    return df_v.loc[df_v.state == state2select, :]

In [63]:
for st in state_list:
    print(f'Fetching {st}')
    df_v_list = []
    for lb, lb_f in tqdm(paths2stops.items(), desc='Collecting one state'):
        start = time.time()
        df_v = load_data_state(fname = lb_f, gdf_poi=gdf_poi_c, threshold_v=1, state2select=st)
        df_v_list.append(df_v)
        delta_t = (time.time() - start) // 60
        # print(f"Label {lb} processed and saved in {delta_t} minutes.")
    df_v = pd.concat(df_v_list)
    df_v.to_parquet(f'dbs/visits_day_did_states/{st}.parquet', index=False)
    print(f"State {st}: {len(df_v)} visits to {df_v.osm_id.nunique()} unique locations.")

Fetching Bayern


Collecting one state: 100%|██████████| 52/52 [00:42<00:00,  1.23it/s]


State Bayern: 5552797 visits to 121835 unique locations.
Fetching Baden-Württemberg


Collecting one state: 100%|██████████| 52/52 [00:41<00:00,  1.26it/s]


State Baden-Württemberg: 4882778 visits to 100855 unique locations.
Fetching Rheinland-Pfalz


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.27it/s]


State Rheinland-Pfalz: 1998808 visits to 37734 unique locations.
Fetching Nordrhein-Westfalen


Collecting one state: 100%|██████████| 52/52 [00:41<00:00,  1.26it/s]


State Nordrhein-Westfalen: 9552651 visits to 125046 unique locations.
Fetching Niedersachsen


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.28it/s]


State Niedersachsen: 3754600 visits to 66760 unique locations.
Fetching Schleswig-Holstein


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.29it/s]


State Schleswig-Holstein: 1370333 visits to 24128 unique locations.
Fetching Hamburg


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.30it/s]


State Hamburg: 1193919 visits to 13772 unique locations.
Fetching Bremen


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.28it/s]


State Bremen: 361111 visits to 4450 unique locations.
Fetching Hessen


Collecting one state: 100%|██████████| 52/52 [00:41<00:00,  1.25it/s]


State Hessen: 3145622 visits to 47919 unique locations.
Fetching Saarland


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.30it/s]


State Saarland: 436617 visits to 8045 unique locations.
Fetching Thüringen


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.29it/s]


State Thüringen: 877460 visits to 21019 unique locations.
Fetching Sachsen-Anhalt


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.29it/s]


State Sachsen-Anhalt: 1215683 visits to 19666 unique locations.
Fetching Sachsen


Collecting one state: 100%|██████████| 52/52 [00:41<00:00,  1.26it/s]


State Sachsen: 1824887 visits to 40010 unique locations.
Fetching Brandenburg


Collecting one state: 100%|██████████| 52/52 [00:39<00:00,  1.30it/s]


State Brandenburg: 1171135 visits to 22242 unique locations.
Fetching Mecklenburg-Vorpommern


Collecting one state: 100%|██████████| 52/52 [00:40<00:00,  1.29it/s]


State Mecklenburg-Vorpommern: 841633 visits to 17163 unique locations.
Fetching Berlin


Collecting one state: 100%|██████████| 52/52 [00:39<00:00,  1.30it/s]


State Berlin: 1859094 visits to 18598 unique locations.
