## Sociodemographic Imputation for Mobile Device Location Data
### This is processed on AWS EMR

In [None]:
sc.install_pypi_package('pyarrow==0.10.0')
sc.install_pypi_package('matplotlib')
sc.install_pypi_package('geopandas')
sc.install_pypi_package('descartes')
sc.install_pypi_package('shapely')
sc.install_pypi_package('sklearn')
sc.install_pypi_package('spark_sklearn')
sc.install_pypi_package('imblearn')
sc.install_pypi_package('requests')
sc.install_pypi_package('hashids')
sc.install_pypi_package('pandas==1.0.0')
sc.install_pypi_package('scipy')
sc.install_pypi_package('numpy')
sc.install_pypi_package('geopy')
sc.install_pypi_package('pytz')
sc.install_pypi_package('s3fs')
sc.install_pypi_package('ipfn')
sc.install_pypi_package('boto3')
sc.install_pypi_package('pygeohash')

In [2]:
import sys
import os
import pytz
import pickle
#import pygeohash as pgh
from functools import reduce
from dateutil import tz
import datetime
from hashids import Hashids
# from s3fs.core import S3FileSystem
import shapely.geometry as sg
from shapely.geometry import Point, Polygon, shape
from shapely import wkt
import shapely.speedups
from geopy.distance import geodesic
import geopandas as gpd
import pandas as pd
import numpy as np
import time
import cgitb
from itertools import islice
from pyspark import Row, SQLContext
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType, StructType, StructField
from pyspark.sql import SparkSession, SQLContext, DataFrame, Window,WindowSpec
from pyspark.sql.functions import mean as _mean
from pyspark.sql.functions import coalesce,explode, pandas_udf, PandasUDFType, udf, struct, lag, col, when, lit, first, sha1, concat, lpad, substring, regexp_replace, countDistinct, isnull, isnan
import pyspark.sql.functions as SparkFunc
from pyspark.sql.window import Window
from pyspark.ml.feature import Bucketizer
from pyspark import SparkContext, SparkConf
from scipy.spatial import cKDTree as cKDTree
from sklearn.neighbors import KDTree,BallTree
from s3fs.core import S3FileSystem
import boto3
from pyspark.ml.classification import RandomForestClassifier,RandomForestClassificationModel,DecisionTreeClassifier,DecisionTreeClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import PipelineModel
from imblearn.over_sampling import SMOTE
from ipfn import ipfn
from pyspark.sql.functions import regexp_replace
from pyspark import SparkContext
import pygeohash as pgh

sql_context = SQLContext(sc)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Read device list, modify worker column

In [3]:
year = 2022
month = 1

if month == 2:
    num_days = 28
elif month in [4,6,9,11]:
    num_days = 30
else:
    num_days = 31
num_days

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

31

In [None]:
# read national device list with home and work info
National_device_list = spark.read.parquet("%02d/All8/AllFilter_HWProfile" % month)

# read device list of workers without non-fixed work location
nf_worker = spark.read.parquet("/Nonfixed_Worker/")

nf_worker = nf_worker.withColumn('nf_worker',lit(1))
National_device_list = National_device_list.join(nf_worker,'device_id',how='left')

# if worker
National_device_list = National_device_list.withColumn(
    'WORKER',when(((col('w_geohash')).isNotNull())|(col('nf_worker')==1),1).otherwise(0))

National_device_list.count()

# Add BGFIPS based on geohash7

In [None]:
# Read CBG shapefile
CBG = spark.read.format("csv").option("header", "True").option("delimiter", ",").load(
    "/Supplement Input/National_CBG_Bounds_2021.csv").coalesce(1).toPandas()
CBG.set_index('BGFIPS',inplace=True)
CBG = CBG.dropna()
CBG['min_lon'] = CBG['min_lon'].astype('float')
CBG['min_lat'] = CBG['min_lat'].astype('float')
CBG['max_lon'] = CBG['max_lon'].astype('float')
CBG['max_lat'] = CBG['max_lat'].astype('float')
sc.broadcast(CBG)
    
def efficient_spatial_join(geo_hash7):
    point = pgh.decode_exactly(geo_hash7)
    Latitude = point[0]
    Longitude = point[1]
    CBG_1 = CBG.loc[(CBG['min_lat'] <= Latitude) 
                    & (CBG['min_lon'] <= Longitude) 
                    & (CBG['max_lat'] >= Latitude) 
                    & (CBG['max_lon'] >= Longitude)]
    if len(CBG_1) == 0:
        BGFIPS = 'OutofUSA'
    elif len(CBG_1) == 1:
        BGFIPS = CBG_1.index[0]
    elif len(CBG_1['CTFIPS'].unique()) == 1:
        BGFIPS = CBG_1.index[0]
    else:
        dist = []
        for i in range(len(CBG_1)):
            dist.append(geodesic(CBG_1[['Latitude','Longitude']].iloc[i],(Latitude, Longitude)).meters)
        min_idx=np.argmin(dist)
        BGFIPS = CBG_1.index[min_idx]
            
    return BGFIPS

find_OD_udf = udf(efficient_spatial_join, StringType())

National_device_list = National_device_list.withColumn("BGFIPS",find_OD_udf(col('h_geohash')))

National_device_list.count()

# Add ACS variables

In [None]:
ACS_vars = spark.read.format("csv").option("header", "True").option(
    "delimiter", ",").option("inferSchema", "True").load(
    "/ACS_20215Y_BG_age5_inc5_pov_edu.csv").withColumn("BGFIPS", lpad("BGFIPS", 12, '0'))
National_device_list = National_device_list.join(ACS_vars,on='BGFIPS')

National_device_list.count()

# Add trip characteristics

In [11]:
file_lst = S3FileSystem(anon=False).ls(
    '' % (year, month))
file_lst = ['s3://'+ tripfile for tripfile in file_lst]
#trip rate
trip_rosters = spark.read.parquet(*file_lst).select('device_id')

CNTYTPS = trip_rosters.groupby('device_id').count().withColumn('CNTTDTR', col('count')/num_days).select('device_id','CNTTDTR')
df = spark.read.format("csv").option("header", "True").option("delimiter", ",").option("inferschema", "true").load(
       's3://mti-nextgen-temp/SocioDemoDeviceWeight/socialdemo_input_55.csv').coalesce(1).toPandas().dropna()
#inflation factor of number of trips from NHTS and MDLD
trip_factor = df['CNTTDTR'].mean()/CNTYTPS.groupBy().avg("CNTTDTR").take(1)[0][0]
CNTYTPS = CNTYTPS.withColumn('CNTTDTR', col('CNTTDTR')*trip_factor)

CNTYTPS.write.parquet("/%s/%02d/TripCount" % (year,month))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
CNTYTPS = spark.read.parquet("/%s/%02d/TripCount" % (year,month))

National_device_list = National_device_list.join(CNTYTPS,on='device_id', how='left').fillna(0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Impute sociodemo

In [19]:
# if sampling size are too small. to avoid the socio demo of the entire population falling into a certain category, random draw can be applied
import random
def random_draw(prob_array):
    proba = np.random.choice([0,1,2,3,4], 1, p=prob_array.tolist())[0]
    return float(proba)

random_draw_udf = SparkFunc.udf(lambda x: random_draw(x), StringType())

age_model = DecisionTreeClassificationModel.load("/age_model")
features = National_device_list.select('device_id','cbg_age_pct_under_17', 'cbg_age_pct_18_24', 'cbg_age_pct_25_34',
                                       'cbg_age_pct_35_54', 'cbg_age_pct_55_64', 'cbg_age_pct_65_over',
                                       'cbg_inc_pct_less_25k','cbg_inc_pct_25k_50k', 'cbg_inc_pct_50k_75k',
                                       'cbg_inc_pct_75k_125k', 'cbg_inc_pct_125k_more',
                                       'ct_pov_rate', 'ct_edu_pct_less_HS','ct_edu_pct_SCND_AD', 'ct_edu_pct_BD_more','CNTTDTR','WORKER')

features_assembler = VectorAssembler(inputCols=['cbg_age_pct_under_17', 'cbg_age_pct_18_24', 'cbg_age_pct_25_34',
                                       'cbg_age_pct_35_54', 'cbg_age_pct_55_64', 'cbg_age_pct_65_over',
                                       'cbg_inc_pct_less_25k','cbg_inc_pct_25k_50k', 'cbg_inc_pct_50k_75k',
                                       'cbg_inc_pct_75k_125k', 'cbg_inc_pct_125k_more',
                                       'ct_pov_rate', 'ct_edu_pct_less_HS','ct_edu_pct_SCND_AD', 'ct_edu_pct_BD_more','CNTTDTR','WORKER'],outputCol='features')
features = features_assembler.transform(features)
features = features.select('device_id','features')
age_predictions = age_model.transform(features)
age_predictions = age_predictions.withColumn('age', random_draw_udf(col('probability'))).select('device_id','age')

income_model = DecisionTreeClassificationModel.load("/income_model")
features = National_device_list.select('device_id','cbg_age_pct_under_17', 'cbg_age_pct_18_24', 'cbg_age_pct_25_34',
                                       'cbg_age_pct_35_54', 'cbg_age_pct_55_64', 'cbg_age_pct_65_over',
                                       'cbg_inc_pct_less_25k','cbg_inc_pct_25k_50k', 'cbg_inc_pct_50k_75k',
                                       'cbg_inc_pct_75k_125k', 'cbg_inc_pct_125k_more',
                                       'ct_pov_rate', 'ct_edu_pct_less_HS','ct_edu_pct_SCND_AD', 'ct_edu_pct_BD_more','CNTTDTR','WORKER')

features_assembler = VectorAssembler(inputCols=['cbg_age_pct_under_17', 'cbg_age_pct_18_24', 'cbg_age_pct_25_34',
                                       'cbg_age_pct_35_54', 'cbg_age_pct_55_64', 'cbg_age_pct_65_over',
                                       'cbg_inc_pct_less_25k','cbg_inc_pct_25k_50k', 'cbg_inc_pct_50k_75k',
                                       'cbg_inc_pct_75k_125k', 'cbg_inc_pct_125k_more',
                                       'ct_pov_rate', 'ct_edu_pct_less_HS','ct_edu_pct_SCND_AD', 'ct_edu_pct_BD_more','CNTTDTR','WORKER'],outputCol='features')
features = features_assembler.transform(features)
features = features.select('device_id','features')
income_predictions = income_model.transform(features)
income_predictions = income_predictions.withColumn('income', random_draw_udf(col('probability'))).select('device_id','income')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
National_device_list = National_device_list.join(age_predictions,on='device_id',how='left')
National_device_list = National_device_list.join(income_predictions,on='device_id',how='left')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
National_device_list.write.parquet("%s/%02d/SocioDemo" % (year, month))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…