# Visitation patterns
Output: `dbs/visits_day_sg/`

In [None]:
%load_ext autoreload
%autoreload 2
%cd D:\d-ticket-de

In [43]:
# Load libs
import pandas as pd
import os
os.environ['USE_PYGEOS'] = '0'
import workers
import h3
from tqdm import tqdm
import sqlalchemy
import numpy as np
from p_tqdm import p_map
import pickle

In [3]:
# Pyspark set up
os.environ['JAVA_HOME'] = "C:/Java/jdk-1.8"
from pyspark.sql import SparkSession
import sys
from pyspark import SparkConf
# Set up pyspark
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Create new context
spark_conf = SparkConf().setMaster("local[18]").setAppName("MobiSeg")
spark_conf.set("spark.executor.heartbeatInterval","3600s")
spark_conf.set("spark.network.timeout","7200s")
spark_conf.set("spark.sql.files.ignoreCorruptFiles","true")
spark_conf.set("spark.driver.memory", "56g")
spark_conf.set("spark.driver.maxResultSize", "0")
spark_conf.set("spark.executor.memory","8g")
spark_conf.set("spark.memory.fraction", "0.6")
spark_conf.set("spark.sql.session.timeZone", "UTC")
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
java_version = spark._jvm.System.getProperty("java.version")
print(f"Java version used by PySpark: {java_version}")
print('Web UI:', spark.sparkContext.uiWebUrl)

Java version used by PySpark: 1.8.0_401
Web UI: http://C19YUEI.net.chalmers.se:4040


In [4]:
# Data location
user = workers.keys_manager['database']['user']
password = workers.keys_manager['database']['password']
port = workers.keys_manager['database']['port']
db_name = workers.keys_manager['database']['name']
engine = sqlalchemy.create_engine(f'postgresql://{user}:{password}@localhost:{port}/{db_name}?gssencmode=disable')

## 1. Device filtering

In [15]:
df_d = pd.read_sql("""SELECT device_aid, "2019", "2022", "2023", grid_1km FROM home_g;""", con=engine)

In [16]:
# Share of devices that only appear in one year
for year in ('2019', '2022', '2023'):
    print(f'Share of devices with only year {year}', len(df_d.loc[df_d[year] == 100, :]) / len(df_d))

Share of devices with only year 2019 0.14504647621383096
Share of devices with only year 2022 0.1471900843238135
Share of devices with only year 2023 0.372127896291974


In [17]:
print(f'Share of devices with 2022 and 2019', len(df_d.loc[(df_d['2019'] > 0) & (df_d['2022'] > 0), :]) / len(df_d))

Share of devices with 2022 and 2019 0.016036688410012042


In [18]:
print(f'Share of devices with 2022 and 2023', len(df_d.loc[(df_d['2023'] > 0) & (df_d['2022'] > 0), :]) / len(df_d))

Share of devices with 2022 and 2023 0.3192819005186756


In [19]:
print(f'Share of devices with all years', len(df_d.loc[(df_d['2019'] > 0) & (df_d['2022'] > 0) & (df_d['2023'] > 0), :]) / len(df_d))

Share of devices with all years 0.011414662610191844


### 2.1 Devices sharing same home grids for three years

In [20]:
def year_coverage(data):
    y1, y2, y3 = data['2019'].sum(), data['2022'].sum(), data['2023'].sum()
    if (y1 > 0) & (y2 > 0) & (y3 > 0):
        return pd.Series(dict(yr_c=1))
    else:
        return pd.Series(dict(yr_c=0))

tqdm.pandas()
df_d_yc = df_d.groupby('grid_1km').progress_apply(year_coverage, include_groups=False).reset_index()

100%|██████████| 156450/156450 [00:54<00:00, 2893.09it/s]


In [21]:
shared_grids = df_d_yc.loc[df_d_yc.yr_c == 1, 'grid_1km'].unique()
devices2keep = df_d.loc[df_d['grid_1km'].isin(shared_grids), 'device_aid'].unique()
len(devices2keep)

22209798

## 2. Find the devices having records Mar-May 2022 and 2023

In [7]:
data_folder = os.path.join('dbs/combined_hex2visits_day/')
paths2stops = {int(x.split('_')[-1].split('.')[0]): os.path.join(data_folder, x)\
               for x in list(os.walk(data_folder))[0][2]}
paths2stops_list = list(paths2stops.values())
print(paths2stops_list[:2])

['dbs/combined_hex2visits_day/stops_0.parquet', 'dbs/combined_hex2visits_day/stops_1.parquet']


### 2.1 Overall filtering

In [8]:
def devices_period_stats(file_path=None):
    import pandas as pd
    def period_stats(data):
        if len(data) > 0:
            return pd.Series(dict(no_active_days=data['date'].nunique(),
                                  no_rec=len(data),
                                  no_hex=data['h3_id'].nunique()))
        else:
            return pd.Series(dict(no_active_days=0,
                                  no_rec=0,
                                  no_hex=0))
    df_g = pd.read_parquet(file_path)
    df_g['date'] = pd.to_datetime(df_g['date'])
    # Define filtering condition (Month: 3 to 5, Year: 2022 or 2023)
    filtered_df = df_g[
        ((df_g['date'].dt.month >= 3) & (df_g['date'].dt.month <= 5)) &  # Months: March to May
        (df_g['date'].dt.year.isin([2022, 2023]))                     # Years: 2022 & 2023
    ]
    return filtered_df.groupby('device_aid').apply(period_stats).reset_index()

# Use p_map for parallel processing with progress bar
df_indi_list = p_map(devices_period_stats, paths2stops_list, num_cpus=18)
df_indi = pd.concat(df_indi_list)
print("No. of individual devices covered", len(df_indi))

In [None]:
df_indi.to_sql('stops_hex_indi_dt', engine, schema='data_desc', index=False, method='multi', if_exists='replace', chunksize=10000)

In [11]:
df_indi[['no_active_days', 'no_rec', 'no_hex']].describe()

No. of individual devices covered 15372651


### 2.2 Refine the coverage to make sure the devices
1) Cover 2022-03 - 2022-04 and 2022-05
2) Cover 2023-03 - 2023-04 and 2023-05

In [24]:
def devices_period_stats_stringent(file_path=None):
    import pandas as pd
    def period_stats(data):
        if len(data) > 0:
            return pd.Series(dict(no_active_days=data['date'].nunique(),
                                  no_rec=len(data),
                                  no_hex=data['h3_id'].nunique()))
        else:
            return pd.Series(dict(no_active_days=0,
                                  no_rec=0,
                                  no_hex=0))
    df_g = pd.read_parquet(file_path)
    df_g['date'] = pd.to_datetime(df_g['date'])
    # Define filtering condition (Month: 3 to 5, Year: 2022 or 2023)
    filtered_df = df_g[
        ((df_g['date'].dt.month >= 3) & (df_g['date'].dt.month <= 5)) &  # Months: March to May
        (df_g['date'].dt.year.isin([2022, 2023]))                     # Years: 2022 & 2023
    ].copy()
    filtered_df.loc[:, 'period'] = filtered_df['date'].dt.month.apply(lambda x: 1 if x == 5 else 0)
    filtered_df.loc[:, 'ym'] = filtered_df['date'].dt.year.astype(str) + filtered_df['period'].astype(str)
    return filtered_df.groupby(['device_aid', 'ym']).apply(period_stats, include_groups=False).reset_index()

In [25]:
ind_f = devices_period_stats_stringent(file_path=paths2stops_list[0])

In [26]:
def comp_check(data):
    if len(data) == 4:
        return pd.Series(dict(comp=1))
    return pd.Series(dict(comp=0))
ind_f = ind_f.groupby('device_aid').apply(lambda x: comp_check(x), include_groups=False).reset_index()
print(len(ind_f[ind_f['comp'] == 1]) / len(ind_f))

0.05384554984099564


In [None]:
# Use p_map for parallel processing with progress bar
df_indi_list = p_map(devices_period_stats_stringent, paths2stops_list, num_cpus=18)
df_indi = pd.concat(df_indi_list)

In [33]:
df_indi = df_indi[df_indi['no_active_days'] > 0]
print("No. of individual devices covered", df_indi['device_aid'].nunique())

No. of individual devices covered 15372651


In [36]:
# Optimized function
def comp_check_fast(df):
    # Count the number of rows per group
    group_sizes = df.groupby('device_aid').size()
    # Create a Series where 1 indicates groups with 4 rows, 0 otherwise
    result = (group_sizes == 4).astype(int).rename('comp')
    return result.reset_index()

In [37]:
# Apply the optimized function
result = comp_check_fast(df_indi)
print(result.head())

                             device_aid  comp
0  000000a1-0008-6027-1edb-1c247d2ac927     0
1  0000014d-784a-48b6-bf3c-85ae60d4ee79     0
2  000001c2-847c-64e6-a0f0-bfb4c048bcef     0
3  000001f1-c486-6900-b677-7f8c23697770     0
4  0000020d-060b-6d27-b77d-6867f12f73b1     0


In [38]:
print(len(result[result['comp'] == 1]) / len(result), len(result[result['comp'] == 1]))

0.054013260302338224 830327


In [39]:
devices2keep = result[result['comp'] == 1]['device_aid'].unique()

In [44]:
# Save the list to a file
with open('dbs/devices2keep.pkl', 'wb') as f:
    pickle.dump(devices2keep, f)

print("List saved to 'devices2keep.pkl'")

List saved to 'devices2keep.pkl'


## 3. Individual weight
We only use individuals with over 15 nights at home.
### 3.1 Focus on a subset of individual devices


In [9]:
#devices2keep = pd.read_sql("""SELECT device_aid, no_active_days FROM data_desc.stops_hex_indi_dt;""", con=engine)
#devices2keep = devices2keep['device_aid'].to_list()

In [41]:
df_home = pd.read_sql("""SELECT device_aid, grid_1km, pop_1km FROM home_g;""", con=engine)
df_h = pd.read_sql("""SELECT device_aid, count FROM home WHERE count > 14;""", con=engine)
df_home = pd.merge(df_home, df_h, on='device_aid', how='left')
df_home.drop(columns=['count'], inplace=True)

# Generate weights only for individuals covered in the analysis time period
df_home = df_home.loc[df_home['device_aid'].isin(devices2keep), :]
tqdm.pandas()
df_home_s = df_home.groupby('grid_1km').progress_apply(lambda x: pd.Series(dict(count=len(x))), include_groups=False).reset_index()
df_home_s = pd.merge(df_home, df_home_s, on='grid_1km', how='left')
df_home_s.loc[:, 'wt_p'] = df_home_s.loc[:, 'pop_1km'] / df_home_s.loc[:, 'count']

100%|██████████| 70259/70259 [00:08<00:00, 7986.71it/s] 


In [42]:
w0 = ((np.std(df_home_s.loc[:, 'wt_p']) / np.mean(df_home_s.loc[:, 'wt_p'])) ** 2 + 1) ** 0.5 * 3.5 * np.median(df_home_s.loc[:, 'wt_p'])
df_home_s.loc[df_home_s['wt_p'] > w0, 'wt_p'] = w0

In [47]:
df_home_s[['device_aid', 'wt_p']].to_sql('weight_dt', engine, schema='public', index=False,
                                    method='multi', if_exists='replace')

KeyboardInterrupt: 

In [48]:
df_home_s[['device_aid', 'wt_p']].to_parquet("dbs/weight_dt.parquet", index=False)

## 4. Device filtering for h3 grids

In [49]:
data_folder = os.path.join('dbs/combined_hex2visits_day/')
paths2stops_list = [data_folder + x for x in os.listdir(data_folder)]
paths2stops_list[:2]

['dbs/combined_hex2visits_day/stops_0.parquet',
 'dbs/combined_hex2visits_day/stops_1.parquet']

In [50]:
paths2stops = {v.split('/')[2].split('.')[0].split('_')[1]: v for v in paths2stops_list}

In [51]:
df_indi = pd.read_sql("""SELECT device_aid, grdi, net_rent_100m FROM home_g;""", con=engine)
df_indi.loc[df_indi['grdi'] < 0, 'grdi'] = 0

# Only consider DT-analysis covered devices
# df_home_s = pd.read_sql("""SELECT * FROM weight_dt;""", con=engine)
df_indi = pd.merge(df_home_s[['device_aid', 'wt_p']], df_indi, on='device_aid', how='left')
grdi = df_indi['grdi'].median()
net_rent_100m = df_indi['net_rent_100m'].median()
df_indi['grdi'] = df_indi['grdi'].fillna(grdi)
df_indi['net_rent_100m'] = df_indi['net_rent_100m'].fillna(net_rent_100m)
print(f"No. of devices with grdi and net_rent_100m: {len(df_indi)}")

No. of devices with grdi and net_rent_100m: 838208


In [52]:
df_indi.dropna(inplace=True)
print(f"No. of devices with grdi and net_rent_100m: {len(df_indi)}")

No. of devices with grdi and net_rent_100m: 826050


In [53]:
h3_id_list = []
for k, v in tqdm(paths2stops.items(), desc='Adding individual group/weight to devices'):
    df = pd.read_parquet(v)
    if 'wt_p' not in df.columns:
        df = pd.merge(df, df_indi[['device_aid', 'grdi', 'net_rent_100m', 'wt_p']], on='device_aid', how='left')
        df.dropna(inplace=True)
        h3_id_list += list(df['h3_id'].unique())
        h3_id_list = list(set(h3_id_list))
        df.to_parquet(f'dbs/combined_hex2visits_day_sg_dt/stops_{k}.parquet', index=False)

Adding individual group/weight to devices: 100%|██████████| 300/300 [15:40<00:00,  3.13s/it]


In [54]:
df_h3 = pd.DataFrame(h3_id_list, columns=['h3_id'])
upper_reso = 3
tqdm.pandas()
df_h3.loc[:, f'h3_parent_{upper_reso}'] = df_h3['h3_id'].progress_apply(lambda x: h3.cell_to_parent(x, upper_reso))
print(df_h3[f'h3_parent_{upper_reso}'].nunique())
df_h3.to_parquet('dbs/combined_hex2visits_day_sg_dt_h3_batches.parquet', index=False)

100%|██████████| 209709/209709 [00:00<00:00, 429352.79it/s]


49
