In [None]:
import os
import ast
import sys
import pickle
import importlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from pandas.api.types import is_string_dtype
from pathlib import Path
from uuid import UUID
from collections import defaultdict

# Show up to 100 columns when displaying wide DataFrames.
pd.set_option("display.max_columns", 100)

# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
# Set to True to keep accounts whose user_email contains '_test_' in the analysis.
INCLUDE_TEST_USERS: bool = False

In [None]:
# Path to the e-mission server. The two lines below are commented out because these
# notebooks normally run inside the server, where its packages are already importable.
# When running locally, uncomment them and point them at your e-mission-server checkout.
# emission_path = Path(os.getcwd()).parent.parent.parent / 'my_emission_server' / 'e-mission-server'
# sys.path.append(str(emission_path))

# Also make the helper modules in the home directory (viz_scripts) importable.
VIZ_SCRIPTS_DIR = '../viz_scripts'
sys.path.append(VIZ_SCRIPTS_DIR)

In [None]:
import emission.core.get_database as edb
import emission.storage.timeseries.abstract_timeseries as esta

In [None]:
# Databases this notebook can analyse.
# - Stage_database does NOT have composite trips, BUT has section modes and distances.
# - Every openpath_prod_* database has composite trips.
DB_SOURCE = [
    "Stage_database",
    "openpath_prod_durham",
    "openpath_prod_mm_masscec",
    "openpath_prod_ride2own",
    "openpath_prod_uprm_nicr",
]

In [None]:
# Database to analyse, chosen by its position in DB_SOURCE (index 0 is "Stage_database").
CURRENT_DB = DB_SOURCE[0]

# Guards against CURRENT_DB being hand-edited to a name that is not in DB_SOURCE.
assert CURRENT_DB in DB_SOURCE

In [None]:
# Per-database mapping from raw replaced-mode labels to coarse categories.
# Every label maps into one shared category set:
#   no_trip, unknown, transit, car, p_micro, s_car, walk, s_micro, ridehail
REPLACED_MODE_DICT = {
    "Stage_database": {
        'no_trip': 'no_trip',
        'no_travel': 'no_trip',
        'Unknown': 'unknown',
        'unknown': 'unknown',
        'bus': 'transit',
        'drove_alone': 'car',
        'bike': 'p_micro',
        'shared_ride': 's_car',
        'walk': 'walk',
        'train': 'transit',
        'bikeshare': 's_micro',
        'not_a trip': 'no_trip',
        'pilot_ebike': 'p_micro',
        'electric_car': 'car',
        'taxi': 'ridehail',
        'not_a_trip': 'no_trip',
        'run': 'walk',
        'scootershare': 's_micro',
        'tramway': 'transit',
        'free_shuttle': 'transit',
        'e-bike': 'p_micro',
        'rental_car': 'car',
        'train_+ bus': 'transit',
        'skateboard': 'p_micro',
        'snowboarding': 'p_micro',
        'e_bike': 'p_micro',
        'golf_cart': 'unknown',
        'emergency_vehicle with others': 's_car',
        'call_friend': 's_car',
        # Previously mapped to 'no_travel', which is not one of the categories above;
        # 'no_travel' itself maps to 'no_trip'.
        'no_replacement': 'no_trip',
        'doing_nothing': 'no_trip',
        'na': 'no_trip',
        'ebike': 'p_micro',
        'hiking': 'walk',
        'n/a': 'no_trip',
        'testing': 'unknown',
        'home': 'no_trip',
        'must_walk 3-5 mi a day for back': 'walk',
        'family': 's_car',
        'car': 'car',
        'pilot_e-bike': 'p_micro',
        'pilot_bike': 'p_micro',
        'time_spent on the clock at amazon': 'no_trip',
        'working': 'no_trip',
        'walk_at work': 'walk',
        'sitting_on my butt doing nothing': 'no_trip',
        'nothing._delivered food for work': 'no_trip',
        'train,_bus and walk': 'transit',
        'work_vehicle': 'car',
        'friend_picked me up': 's_car',
        'ski': 'p_micro',
        'not_accurate': 'unknown',
        'stolen_ebike': 'p_micro'
    },
    "openpath_prod_durham": {
        'Unknown': 'unknown',
        'bike': 'p_micro',
        'shared_ride': 's_car',
        'drove_alone': 'car',
        'bus': 'transit',
        'no_travel': 'no_trip',
        'scootershare': 's_micro',
        'walk': 'walk',
        'taxi': 'ridehail',
        'e_car_drove_alone': 'car',
        'bikeshare': 's_micro',
        'ebike': 'p_micro',
        'train': 'transit',
        'e_car_shared_ride': 's_car'
    },
    "openpath_prod_mm_masscec": {
        'Unknown': 'unknown',
        'drove_alone': 'car',
        'walk': 'walk',
        'shared_ride': 's_car',
        'bike': 'p_micro',
        'bikeshare': 's_micro',
        'no_travel': 'no_trip',
        'taxi': 'ridehail',
        'bus': 'transit',
        'scootershare': 's_micro',
        'train': 'transit',
        'walking': 'walk',
        'e_car_drove_alone': 'car'
    },
    "openpath_prod_ride2own": {
        'Unknown': 'unknown',
        'drove_alone': 'car',
        'walk': 'walk',
        'shared_ride': 's_car',
        'bike': 'p_micro',
        'no_travel': 'no_trip',
        'taxi': 'ridehail',
        'bus': 'transit',
        'train': 'transit',
        'e_car_drove_alone': 'car',
        'e_car_shared_ride': 's_car'
    },
    "openpath_prod_uprm_nicr": {
        'Unknown': 'unknown',
        'walk': 'walk',
        'drove_alone': 'car'
    }
}

# Per-database column renames for the demographic survey:
# raw column name (question text or form field) -> short snake_case feature name.

# The four OpenPATH deployments (Durham, MassCEC, Ride2Own, UPRM NICR) use the same
# survey, so they share a single mapping. Each database gets its own copy below.
# Retrieved from: e-mission-phone/survey-resources/data-xls/demo-survey-v1.xlsx
_OPENPATH_SURVEY_COLUMNS = {
    "At_your_primary_job_do_you_ha": "is_primary_job_flexible",
    "Which_best_describes_your_prim": "primary_job_description",
    "Do_you_work_full_time_or_part_": "primary_job_type",
    "Do_you_have_the_option_of_work": "primary_job_can_wfh",
    "Please_describe_your_primary_job": "primary_job_description_2",
    "Do_you_have_more_than_one_job": "has_multiple_jobs",
    # Two columns: how many days/week do you work & what days of the week do you work.
    # Earlier notes: the latter has only 4 NA values, the former has 45 NA values.
    "What_days_of_the_week_do_you_t": "wfh_days",
    "How_many_days_do_you_usually_w_001": "n_wfh_days",
    # Earlier note: "All these are NAs." (scope unclear - re-check against the data).
    "Which_one_below_describe_you_b": "description",
    "What_is_your_race_ethnicity": "race_or_ethnicity",
    "Are_you_a_student": "is_student",
    "What_is_the_highest_grade_or_d": "highest_education",
    "do_you_consider_yourself_to_be": "is_transgender",
    "What_is_your_gender": "gender",
    "How_old_are_you": "age",
    "Are_you_a_paid_worker": "is_paid",
    "Do_you_have_a_driver_license": "has_drivers_license",
    "How_long_you_had_this_conditio": "medical_condition_duration",
    "Including_yourself_how_many_w_001": "n_residents_u18",
    "Including_yourself_how_many_p": "n_residence_members",
    "Do_you_own_or_rent_your_home": "residence_ownership_type",
    "Please_identify_which_category": "income_category",
    "If_you_were_unable_to_use_your": "available_modes",
    "Including_yourself_how_many_p_001": "n_residents_with_license",
    "Including_yourself_how_many_w": "n_working_residents",
    "What_is_your_home_type": "residence_type",
    "How_many_motor_vehicles_are_ow": "n_motor_vehicles",
    "Do_you_have_a_condition_or_han": "has_medical_condition",
}

SURVEY_DATA_DICT = {
    # The Stage survey export uses the full question text as its column names.
    "Stage_database": {
        "Unique User ID (auto-filled, do not edit)": "user_id",
        "In which year were you born?": "birth_year",
        "What is your gender?": "gender",
        "Do you have a valid driver's license?": "has_drivers_license",
        "Are you a student?": "is_student",
        "What is the highest grade or degree that you have completed?": "highest_education",
        "Do you work for either pay or profit?": "is_paid",
        "Do you have more than one job?": "has_multiple_jobs",
        "Do you work full-time or part-time at your primary job?": "primary_job_type",
        "Which best describes your primary job?": "primary_job_description",
        "How did you usually get to your primary job last week? ": "primary_job_commute_mode",
        "Thinking about your daily commute to work last week, how many minutes did it usually take to get from home to the primary job/work place?": "primary_job_commute_time",
        "At your primary job, do you have the ability to set or change your own start time?": "is_primary_job_flexible",
        "Do you have the option of working from home or an alternate location instead of going into your primary work place?": "primary_job_can_wfh",
        "How many days per week do you usually work from home or an alternate location?": "wfh_days",
        "Do you own or rent your place of residence?": "residence_ownership_type",
        "What is your home type?": "residence_type",
        "Please identify which category represents your total household income, before taxes, for last year.": "income_category",
        "Including yourself, how many people live in your home?": "n_residence_members",
        "How many children under age 18 live in your home?": "n_residents_u18",
        "Including yourself, how many people have a driver's license in your household?": "n_residents_with_license",
        "How many motor vehicles are owned, leased, or available for regular use by the people who currently live in your household?": "n_motor_vehicles",
        "If you were unable to use your household vehicle(s), which of the following options would be available to you to get you from place to place?": "available_modes",
        "Do you have a medical condition that makes it difficult to travel outside of the home?": "has_medical_condition",
        "How long have you had this condition?": "medical_condition_duration"
    },
    "openpath_prod_durham": dict(_OPENPATH_SURVEY_COLUMNS),
    "openpath_prod_mm_masscec": dict(_OPENPATH_SURVEY_COLUMNS),
    "openpath_prod_ride2own": dict(_OPENPATH_SURVEY_COLUMNS),
    "openpath_prod_uprm_nicr": dict(_OPENPATH_SURVEY_COLUMNS),
}

In [None]:
## Source: db_utils.py in op-admin-dashboard.

# Identifier columns that the demographics loader converts to `str` after loading.
BINARY_DEMOGRAPHICS_COLS = ['user_id', '_id']

# Columns removed from each demographic-survey frame after loading
# (presumably form payload / metadata fields, judging by their names).
EXCLUDED_DEMOGRAPHICS_COLS = [
    'data.xmlResponse', 'data.name', 'data.version', 'data.label',
    'xmlns:jr', 'xmlns:orx',
    'id', 'start', 'end',
    'attrxmlns:jr', 'attrxmlns:orx', 'attrid',
    '__version__', 'attrversion', 'instanceID',
]

In [None]:
## Source: scaffolding.py

def expand_userinputs(labeled_ct):
    '''
    Spread each trip's ``user_input`` dict into top-level columns.

    param: labeled_ct: a dataframe of confirmed trips, some of which have labels.
        Each row's ``user_input`` holds that trip's labels.
    returns: a new dataframe with ``labeled_ct``'s columns followed by one column
        per label key, in the same row order. An empty input is returned unchanged
        (the very same object).

    The number of label columns is inferred from the data; it is not passed in.
    Asserts that expanding neither adds nor loses rows and adds exactly one
    column per label key.
    '''
    # CASE 1 of https://github.com/e-mission/em-public-dashboard/issues/69#issuecomment-1256835867
    if labeled_ct.shape[0] == 0:
        return labeled_ct

    label_frame = pd.DataFrame(labeled_ct.user_input.tolist(), index=labeled_ct.index)
    n_label_cols = label_frame.shape[1]
    print("Found %s columns of length %d" % (label_frame.columns, n_label_cols))

    widened = pd.concat([labeled_ct, label_frame], axis=1)

    rows_before, rows_after = len(labeled_ct), len(widened)
    assert rows_after == rows_before, \
        ("Mismatch after expanding labels, expanded_ct.rows = %s != labeled_ct.rows %s" %
            (rows_after, rows_before))

    cols_before, cols_after = len(labeled_ct.columns), len(widened.columns)
    print("After expanding, columns went from %s -> %s" % (cols_before, cols_after))
    assert cols_after == cols_before + n_label_cols, \
        ("Mismatch after expanding labels, expanded_ct.columns = %s != labeled_ct.columns %s" %
            (cols_after, cols_before))
    return widened

In [None]:
## Source: scaffolding.py

def data_quality_check(expanded_ct):
    '''Clean up pilot-era labelling artifacts in the replaced mode.

    1. Delete rows where mode_confirm is pilot_ebike and replaced_mode is pilot_ebike.
    2. Delete rows where mode_confirm is pilot_ebike and replaced_mode is same_mode.
    3. Replace the remaining same_mode values with the trip's mode_confirm, for the
       energy impact calculation.

    param: expanded_ct: trips with label columns (e.g. the output of expand_userinputs).
    returns: if both ``replaced_mode`` and ``mode_confirm`` are present, a new,
        cleaned dataframe (the input is not modified); otherwise the input itself,
        unchanged.
    '''
    # TODO: This is only really required for the initial data collection around the minipilot.
    # In subsequent deployments, "same_mode" and "pilot_ebike" were removed from the options,
    # so the dataset does not contain these data quality issues.

    # All three steps need both label columns. Previously, a frame with replaced_mode
    # but no mode_confirm raised KeyError.
    if 'replaced_mode' not in expanded_ct.columns or 'mode_confirm' not in expanded_ct.columns:
        return expanded_ct

    # Select by boolean mask instead of dropping by index label: with duplicate index
    # labels, a label-based drop also removed unrelated rows sharing the label.
    is_pilot_ebike = expanded_ct['mode_confirm'] == 'pilot_ebike'
    bad_replacement = expanded_ct['replaced_mode'].isin(['pilot_ebike', 'same_mode'])
    cleaned = expanded_ct.loc[~(is_pilot_ebike & bad_replacement)].copy()

    # Positional replacement (np.where) so no index alignment is involved.
    cleaned['replaced_mode'] = np.where(
        cleaned['replaced_mode'] == 'same_mode',
        cleaned['mode_confirm'],
        cleaned['replaced_mode'],
    )
    return cleaned

In [None]:
## Source: scaffolding.py

uuid_df = pd.json_normalize(list(edb.get_uuid_db().find()))

if not INCLUDE_TEST_USERS:
    # na=False: an account with a missing user_email counts as a non-test user.
    # Without it, str.contains yields NaN for such rows and `~` raises TypeError.
    # regex=False: '_test_' is matched as a literal substring.
    is_test_user = uuid_df.user_email.str.contains('_test_', na=False, regex=False)
    uuid_df = uuid_df.loc[~is_test_user, :]

filtered = uuid_df.uuid.unique()

agg = esta.TimeSeries.get_aggregate_time_series()
all_ct = agg.get_data_df("analysis/confirmed_trip", None)

print(f"Before filtering, length={len(all_ct)}")
# .copy() so later in-place edits operate on an independent frame rather than a slice of all_ct.
participant_ct_df = all_ct.loc[all_ct.user_id.isin(filtered), :].copy()
print(f"After filtering, length={len(participant_ct_df)}")

expanded_ct = expand_userinputs(participant_ct_df)
expanded_ct = data_quality_check(expanded_ct)
print(expanded_ct.columns.tolist())
# Trips without a replaced-mode label get the 'Unknown' placeholder.
expanded_ct['replaced_mode'] = expanded_ct['replaced_mode'].fillna('Unknown')

In [None]:
# Additional preprocessing for replaced mode (if any)

# Labels that occur exactly once are treated as one-off answers and dropped.
mode_counts = expanded_ct['replaced_mode'].value_counts()
drop_modes = mode_counts[mode_counts == 1].index.tolist()

# Additional modes to drop: air, boat, or weird answers.
extra_drop_modes = [
    'houseboat', 'gondola', 'airline_flight', 'aircraft', 'zoo', 'air',
    'airplane', 'boat', 'flight', 'plane', 'meal', 'lunch'
]

# Filter with a boolean mask rather than dropping by index label, so duplicate
# index labels cannot take unrelated rows with them.
keep_rows = ~expanded_ct.replaced_mode.isin(drop_modes + extra_drop_modes)
expanded_ct = expanded_ct.loc[keep_rows].copy()

# Collapse raw labels into this database's coarse categories. Report every
# unmapped label at once instead of failing on the first one inside apply().
mode_map = REPLACED_MODE_DICT[CURRENT_DB]
unmapped = [m for m in expanded_ct.replaced_mode.unique() if m not in mode_map]
if unmapped:
    raise KeyError(
        f"replaced_mode labels missing from REPLACED_MODE_DICT[{CURRENT_DB!r}]: "
        f"{sorted(map(str, unmapped))}"
    )
expanded_ct['replaced_mode'] = expanded_ct['replaced_mode'].map(mode_map)

# Demographic pre-processing

In [None]:
# Demographics

if CURRENT_DB != "Stage_database":

    decoded_uuids = [str(x) for x in filtered]
    # Set copy for O(1) membership checks in the entry loop (was a list scan per entry).
    decoded_uuid_set = set(decoded_uuids)

    ## Source: query_demographics() in op-admin-dashboard.
    ts = esta.TimeSeries.get_aggregate_time_series()
    entries = list(ts.find_entries(["manual/demographic_survey"]))

    # Group survey entries by survey key (the first key of jsonDocResponse). Every key
    # seen gets a bucket, even when none of its responses pass the user filter.
    available_key = {}
    for entry in entries:
        survey_key = list(entry['data']['jsonDocResponse'].keys())[0]
        responses = available_key.setdefault(survey_key, [])

        # Minor modification: Added user_id check to filter users.
        if str(entry['user_id']) in decoded_uuid_set:
            responses.append(entry)

    dataframes = {key: pd.json_normalize(json_object) for key, json_object in available_key.items()}

    for key, df in dataframes.items():
        if not df.empty:
            for col in BINARY_DEMOGRAPHICS_COLS:
                if col in df.columns:
                    df[col] = df[col].apply(str)
            columns_to_drop = [col for col in df.columns if col.startswith("metadata")]
            df.drop(columns=columns_to_drop, inplace=True)
            # Survey answers keep only the last dotted component of their column name.
            df.columns = [
                col.rsplit('.', 1)[-1] if col.startswith('data.jsonDocResponse.') else col
                for col in df.columns
            ]
            for col in EXCLUDED_DEMOGRAPHICS_COLS:
                if col in df.columns:
                    df.drop(columns=[col], inplace=True)

    # Concatenate once instead of growing a frame inside a loop (quadratic copying).
    survey_frames = list(dataframes.values())
    survey_data = (
        pd.concat(survey_frames, axis=0, ignore_index=True) if survey_frames else pd.DataFrame()
    )
else:
    # Read the demographics.
    # Ensure that you have access to this survey file and that it is placed in the given destination.
    survey_data = pd.read_csv('../viz_scripts/Can Do Colorado eBike Program - en.csv')
    survey_data.rename(columns={'Unique User ID (auto-filled, do not edit)': 'user_id'}, inplace=True)

In [None]:
def get_section_durations(confirmed_trips: pd.DataFrame):
    """
    Add a ``section_durations`` column holding each trip's section durations.

    For every trip, the durations of its 'analysis/inferred_section' entries are
    collected. If there are none, the 'analysis/cleaned_section' entries are used
    instead. If neither exists, an empty list is stored.

    Rows are processed in parallel with pandarallel. ``confirmed_trips`` is modified
    in place (the new column is added to it) and also returned.

    :param confirmed_trips: trips with ``user_id`` and ``cleaned_trip`` columns.
    :return: the same DataFrame, now with a ``section_durations`` column.
    """
    # pandarallel's documented entry point is the `pandarallel` object exported by the
    # package. `import pandarallel; pandarallel.initialize(...)` raises AttributeError.
    from pandarallel import pandarallel
    # `esdt` was referenced but never imported; get_sections_for_trip is provided by
    # e-mission-server's trip_queries decorations module.
    import emission.storage.decorations.trip_queries as esdt

    # Initialize the parallel processing.
    pandarallel.initialize(progress_bar=False)

    # The inner function has access to these variables.
    primary_key = 'analysis/inferred_section'
    fallback_key = 'analysis/cleaned_section'

    def get_durations(user_id, trip_id):
        inferred_sections = esdt.get_sections_for_trip(
            key=primary_key, user_id=user_id, trip_id=trip_id
        )
        if inferred_sections:
            return [x.data.duration for x in inferred_sections]

        # The fallback reads cleaned sections (not confirmed trips, as the old message said).
        print("Falling back to cleaned sections...")

        cleaned_sections = esdt.get_sections_for_trip(
            key=fallback_key, user_id=user_id, trip_id=trip_id
        )
        if cleaned_sections:
            return [x.data.duration for x in cleaned_sections]

        return []

    confirmed_trips['section_durations'] = confirmed_trips.parallel_apply(
        lambda x: get_durations(x.user_id, x.cleaned_trip), axis=1
    )

    return confirmed_trips

In [None]:
if CURRENT_DB == "Stage_database":

    ALLCEO_CACHE_PATH = './data/cached_allceo_data.csv'

    if os.path.exists(ALLCEO_CACHE_PATH):
        # Replace current instance of dataframe with the cached dataframe.
        # NOTE(review): this discards the filtering/mapping done above and relies on the
        # cache having been produced by the same pipeline. Columns are presumably read
        # back as CSV text (e.g. user_id, section_durations) rather than original types.
        expanded_ct = pd.read_csv(ALLCEO_CACHE_PATH)
    else:
        ## NOTE: This branch runs only if the cached CSV is not already available. It will take a LOT of time.
        ## Benchmark timing: ~12 hours on a MacBook Pro (2017 model) with pandarallel, 4 workers.
        expanded_ct = get_section_durations(expanded_ct)
        # Create ./data before writing, so the write cannot fail after hours of computation.
        os.makedirs(os.path.dirname(ALLCEO_CACHE_PATH), exist_ok=True)
        expanded_ct.to_csv(ALLCEO_CACHE_PATH, index=False)

    # Normalise 'no_travel' to 'no_trip' on both paths. Previously only the cached path
    # did this, so a fresh run and a cached run produced different categories.
    expanded_ct.loc[expanded_ct.replaced_mode == 'no_travel', 'replaced_mode'] = 'no_trip'

In [None]:
# Distinct users in the survey vs. in the trip data, before joining the two.
n_survey_users = len(survey_data.user_id.unique())
n_trip_users = len(expanded_ct.user_id.unique())
print(n_survey_users, n_trip_users)

In [None]:
# Map the raw survey column names to short feature names for the current database.
survey_data = survey_data.rename(columns=SURVEY_DATA_DICT[CURRENT_DB])

### Demographic data preprocessing

In [None]:
def _yes_to_flag(value):
    """Return 1 if ``value`` reads as "yes" in any letter case, else 0 (NaN included)."""
    return 1 if str(value).lower() == 'yes' else 0


# gtg
survey_data['ft_job'] = survey_data.primary_job_type.apply(
    lambda x: 1 if str(x).lower() == 'full_time' else 0
)

# gtg
survey_data['multiple_jobs'] = survey_data.has_multiple_jobs.apply(_yes_to_flag)

# gtg: "prefer not to say" counts as 0 vehicles; the open-ended top answers count as 4.
survey_data.loc[
    survey_data.n_motor_vehicles.isin(
        ['prefer_not_to_say', 'Prefer not to say / Prefiero no decir.']
    ), 'n_motor_vehicles'] = 0

survey_data.loc[survey_data.n_motor_vehicles.isin(
    ['more_than_3', '4+', 'more_than_4']
), 'n_motor_vehicles'] = 4
survey_data.n_motor_vehicles = survey_data.n_motor_vehicles.astype(int)

# gtg
survey_data.has_drivers_license = survey_data.has_drivers_license.apply(_yes_to_flag)

survey_data.loc[survey_data.n_residents_u18 == 'prefer_not_to_say', 'n_residents_u18'] = 0
survey_data.n_residents_u18 = survey_data.n_residents_u18.astype(int)

survey_data.loc[survey_data.n_residence_members == 'prefer_not_to_say', 'n_residence_members'] = 0
survey_data.n_residence_members = survey_data.n_residence_members.astype(int)

# Reset only this column. Without the column selector, `.loc[mask] = 0` overwrote every
# field of the matching rows (user_id, age, ...) with 0.
survey_data.loc[
    survey_data.n_residents_with_license == 'prefer_not_to_say', 'n_residents_with_license'
] = 0
survey_data.loc[survey_data.n_residents_with_license == 'more_than_4', 'n_residents_with_license'] = 4
survey_data.n_residents_with_license = survey_data.n_residents_with_license.astype(int)

# Handle abnormal inputs: drop households reporting more licensed drivers, or more
# minors, than total members.
survey_data = survey_data[
    (survey_data.n_residence_members - survey_data.n_residents_with_license >= 0) &
    (survey_data.n_residence_members - survey_data.n_residents_u18 >= 0)
].reset_index(drop=True)

# gtg
if CURRENT_DB != "Stage_database":
    survey_data.n_working_residents = survey_data.n_working_residents.apply(
        lambda x: 0 if x == 'prefer_not_to_say' else int(x)
    )
else:
    # The Stage survey has no such column; approximate it as the number of members aged 18+.
    survey_data['n_working_residents'] = survey_data['n_residence_members'] - survey_data['n_residents_u18']

survey_data = survey_data[survey_data.n_working_residents >= 0].reset_index(drop=True)

# gtg: case-insensitive like the other yes/no columns. OpenPATH answers appear to be
# lowercase keys (cf. 'full_time', 'prefer_not_to_say'), so an exact match on 'Yes'
# would presumably have set is_paid to 0 for every OpenPATH respondent.
survey_data.is_paid = survey_data.is_paid.apply(_yes_to_flag)

# gtg
survey_data.has_medical_condition = survey_data.has_medical_condition.apply(_yes_to_flag)

## gtg
# Assign the result instead of calling replace(inplace=True) on the column attribute:
# that chained in-place call may act on a temporary copy under pandas copy-on-write.
survey_data['is_student'] = survey_data['is_student'].replace({
    'Not a student': 0,
    'Yes - Full Time College/University': 1,
    'Yes - Vocation/Technical/Trade School': 1,
    'Yes - K-12th Grade including GED': 1,
    'Work': 0,
    'No': 0,
    'Prefer not to say': 0,
    'Yes - Part-Time College/University': 1,
    'Taking prerequisites missing for grad program ': 1,
    'Graduate': 1,
    'Custodian': 0,
    'Work at csu': 0,
    'not_a_student': 0,
    'yes___vocation_technical_trade_school': 1,
    'yes___part_time_college_university': 1,
    'prefer_not_to_say': 0,
    'yes___k_12th_grade_including_ged': 1,
    'yes___full_time_college_university': 1
})

### Additional Demographic Data Preprocessing

In [None]:
if CURRENT_DB == "Stage_database":
    # Values above 100 are treated as birth years and converted to an age in 2024;
    # smaller values are taken to already be ages.
    age = survey_data.birth_year.apply(
        lambda x: 2024 - int(x) if int(x) > 100 else int(x)
    )

    # 5-year bins labelled like '26___30_years_old', i.e. each bin spans 5k+1..5k+5.
    # The bin start is derived from (age - 1). Deriving it from `age` put ages that
    # are multiples of 5 into the next bin (e.g. 25 -> '26___30_years_old').
    bin_start = (age - 1) - ((age - 1) % 5) + 1
    bin_end = bin_start + 4
    survey_data['age'] = bin_start.astype(str) + '___' + bin_end.astype(str) + '_years_old'

    # Everyone older than 65 goes into the open-ended top bin. The previous explicit
    # label list missed '71___75_years_old' and anything above 85.
    survey_data.loc[age > 65, 'age'] = '__65_years_old'

    survey_data.drop(columns=['birth_year'], inplace=True)

else:
    # Drop rows whose age is 0. NOTE(review): OpenPATH ages are category strings, so the
    # intent of this filter is unclear from here — confirm it is still needed.
    survey_data = survey_data[survey_data.age != 0].reset_index(drop=True)

# Drop the commute columns if present. The previous guard,
# `survey_data.columns.isin([...]).all()`, asked whether *every* column was one of these
# two, so it was effectively never true and nothing was dropped.
survey_data.drop(
    columns=['primary_job_commute_mode', 'primary_job_commute_time'],
    errors='ignore', inplace=True
)

In [None]:
def normalize_job_descriptions(db_name, df):
    """
    Collapse ``df.primary_job_description`` into a small set of job categories.

    For OpenPATH databases the column holds survey choice keys, which are mapped to
    readable category names. For 'Stage_database' it holds free text: the text is
    stripped and grouped into coarse categories, and unrecognised text is kept as-is.
    In both cases missing descriptions become 'Other'.

    :param db_name: name of the source database (an entry of DB_SOURCE).
    :param df: survey DataFrame with a ``primary_job_description`` column;
        the column is modified in place.
    :return: ``df``.
    :raises KeyError: for an OpenPATH database, if a non-missing value is not a known
        survey choice key (so new survey options are noticed rather than hidden).
    """
    column = 'primary_job_description'

    if db_name != 'Stage_database':
        PRIMARY_JOB_DESCRIPTION_DICT = {
            "sales_or_service": "Sales or service",
            "other": "Other",
            "": "Other",
            # Label spelling fixed (was "Manegerial").
            "professional__managerial__or_technical": "Professional, Managerial, or Technical",
            "manufacturing__construction__maintenance": "Manufacturing, construction, maintenance, or farming",
            "clerical_or_administrative_support": "Clerical or administrative support",
            "prefer_not_to_say": "Prefer not to say"
        }

        # Missing answers become 'Other' (matching the Stage branch) instead of raising KeyError.
        df[column] = df[column].apply(
            lambda x: 'Other' if pd.isna(x) else PRIMARY_JOB_DESCRIPTION_DICT[x]
        )
    else:
        # Normalize the job description. Inspired from the 'e-bike trips by occupation'
        # plot in the CanBikeCo full pilot paper. Raw answers (typos included) are
        # matched verbatim after stripping surrounding whitespace.
        STAGE_JOB_GROUPS = {
            'Education': [
                'Paraprofessional', 'Education', 'education/early childhood', 'Teacher',
                'Education non-profit manager', 'Scientific research', 'Research',
                'Preschool Tracher'
            ],
            'Custodial': [
                'Custodian', 'Custodial', 'Csu custodian', 'Janitorial',
                'Custodial Maintanace'
            ],
            'Clerical or administrative support': [
                'Inbound cs', 'Accounting Technician', 'Clerical'
            ],
            'Sales or service': [
                'Restaurant manager', 'Transportaion Services'
            ],
            'Food service': [
                'Pastry chef and line cook', 'Cook', 'Chef', 'Dining Services',
                'Food Service', 'Cooking', 'Residential Dining Services', 'Line Cook'
            ],
            'Medical/healthcare': [
                'CNA', 'Caregiver/ Qmap', 'Health care', 'Nurse',
                'Healthcare', 'Medical', 'Medical field', 'Family support'
            ],
            'Other': [
                'Amazon', 'Hockey rink', 'Caregiver', 'Security', 'Nonprofit social work',
                'Therapeutic', 'Driver'
            ],
            'Manufacturing, construction, maintenance, or farming': [
                'Hospital laundry', 'Matreal handler', 'Maintenance', 'Co op laundry'
            ],
        }
        # Each raw answer belongs to exactly one group, so a single lookup pass is enough.
        raw_to_group = {raw: group for group, raws in STAGE_JOB_GROUPS.items() for raw in raws}

        stripped = df[column].str.strip()
        df[column] = stripped.map(lambda x: raw_to_group.get(x, x))
        df.loc[df[column].isna(), column] = 'Other'

    return df

In [None]:
# Ordinal codes for household income: 0 = prefer not to say, then increasing with income.
INCOME_DICT = {
    'Stage_database': {
        'Prefer not to say': 0,
        'Less than $24,999': 1,
        '$25,000-$49,999': 2,
        '$50,000-$99,999': 3,
        '$100,000 -$149,999': 4,
        # The key '$150,000-$199,999' was previously listed twice (5, then 6). The later
        # entry silently won, so this bracket got a different code than '$150,000' and
        # than the OpenPATH coding below. The codes are now contiguous.
        '$150,000-$199,999': 5,
        # Presumably a truncated form of the same bracket.
        '$150,000': 5,
        '$200,000 or more': 6
    },
    'Others': {
        'prefer_not_to_say': 0,
        'less_than__24_999': 1,
        '_25_000_to__49_999': 2,
        '_50_000_to__99_999': 3,
        '_100_000_to__149_999': 4,
        '_150_000_to__199_999': 5
        # NOTE(review): no code for $200,000+. A respondent choosing such an option would
        # raise KeyError when encoded — confirm the survey's choice key and add it.
    }
}

In [None]:
# Collapse job descriptions into shared categories for the current database.
survey_data = normalize_job_descriptions(db_name=CURRENT_DB, df=survey_data)

In [None]:
# Ordinal-encode household income. Every non-Stage database uses the 'Others' coding;
# an unknown category raises KeyError.
income_key = 'Stage_database' if CURRENT_DB == 'Stage_database' else 'Others'
income_codes = INCOME_DICT[income_key]
survey_data.income_category = survey_data.income_category.apply(
    lambda label: income_codes[label]
)

In [None]:
from sklearn.preprocessing import OneHotEncoder

def generate_ohe_features(df, feature_name):
    """One-hot encode a single column of `df`.

    Parameters
    ----------
    df : DataFrame holding the column to encode.
    feature_name : name of the categorical column.

    Returns
    -------
    (encoded, encoder): `encoded` is a dense DataFrame indexed like `df` whose columns
    are the encoder's output feature names; `encoder` is the fitted OneHotEncoder.
    """
    encoder = OneHotEncoder()
    column = df[[feature_name]]
    # fit() returns the estimator itself, so fitting and transforming can be chained.
    sparse_codes = encoder.fit(column).transform(column)
    encoded = pd.DataFrame(
        sparse_codes.todense(),
        columns=encoder.get_feature_names_out(),
        index=df.index,
    )
    return encoded, encoder

In [None]:
survey_data = survey_data.reset_index(drop=True)

ohe_features = ['highest_education', 'primary_job_description', 'gender', 'age']

# Append one indicator column per category of each categorical feature
# (index-aligned merge, so the fresh RangeIndex above matters).
for feature in ohe_features:
    encoded, _ = generate_ohe_features(survey_data, feature)
    survey_data = survey_data.merge(right=encoded, left_index=True, right_index=True)

In [None]:
# Survey columns that are not used as model features: raw answers already one-hot
# encoded above (gender, highest_education, primary_job_description, age), ids and
# timestamps, and other free-form survey fields. Each label is listed once.
to_drop = [
    'Timestamp', 'gender', 'highest_education', 'primary_job_type', 'primary_job_description',
    'primary_job_commute_mode', 'primary_job_commute_time', 'is_primary_job_flexible',
    'primary_job_can_wfh', 'wfh_days', 'Which one below describe you best?', 'residence_ownership_type',
    'residence_type', 'medical_condition_duration', 'has_multiple_jobs', 'age', '_id', 'data.ts',
    'primary_job_description_2', 'n_wfh_days', 'description', 'race_or_ethnicity',
    'is_transgender'
]

# errors='ignore': not every column exists for every CURRENT_DB.
survey_data = survey_data.drop(columns=to_drop, errors='ignore')

## Merge sensed data and demographics

In [None]:
# Additional preprocessing to filter unwanted users from sensed trips data.
def _join_key(user_id):
    """Dash-free string form of a user id.

    Presumably needed because the two frames store ids in different forms
    (UUID objects vs. hex strings) — both sides are normalized the same way.
    """
    return str(user_id).replace('-', '')


expanded_ct['user_id_join'] = expanded_ct['user_id'].apply(_join_key)
survey_data['user_id_join'] = survey_data['user_id'].apply(_join_key)

survey_data = survey_data.rename(columns={'user_id': 'survey_user_id'})

# Keep only users who have both sensed trips and a survey response.
common = set(expanded_ct.user_id_join.unique()) & set(survey_data.user_id_join.unique())

filtered_trips = expanded_ct[expanded_ct.user_id_join.isin(common)].reset_index(drop=True)
filtered_survey = survey_data[survey_data.user_id_join.isin(common)].reset_index(drop=True)

In [None]:
# Sanity check: both frames should now cover the same set of users.
n_trip_users = len(filtered_trips.user_id.unique())
n_survey_users = len(filtered_survey.survey_user_id.unique())
print(n_trip_users, n_survey_users)

In [None]:
# Compute the section_*_argmax.

def _summary_argmax(sections):
    """Return (mode, distance, duration) for the mode with the largest distance in an
    inferred_section_summary dict, or NaNs when the summary is missing or empty."""
    if pd.isna(sections) or len(sections) == 0 or len(sections['distance']) == 0:
        return np.nan, np.nan, np.nan
    # max() keeps the first mode on ties, just like a stable reverse sort would.
    mode = max(sections['distance'].items(), key=lambda item: item[1])[0]
    return mode, sections['distance'][mode], sections['duration'][mode]


def _section_lists_argmax(row):
    """Return (mode, distance, duration) of the longest section, parsed from the
    stringified section_distances / section_durations / section_modes lists of `row`."""
    distances = ast.literal_eval(row['section_distances'])
    durations = ast.literal_eval(row['section_durations'])
    modes = ast.literal_eval(row['section_modes'])
    argmax = np.argmax(distances)
    return modes[argmax], distances[argmax], durations[argmax]


def compute_argmax(db: str, row):
    """Attach the mode, distance and duration of a trip's longest section to `row`.

    Parameters
    ----------
    db : source database name. 'Stage_database' rows carry stringified section lists
        (section_distances, section_durations, section_modes); other databases carry an
        'inferred_section_summary' dict with 'distance' and 'duration' keyed by mode.
    row : one trip row (as passed by DataFrame.apply(axis=1)); it is modified in place.

    Returns
    -------
    `row` with 'section_mode_argmax', 'section_distance_argmax' and
    'section_duration_argmax' always set — to NaN when the section data is missing or
    malformed (rows with a NaN mode are dropped later in this notebook).
    """
    try:
        if db != 'Stage_database':
            mode, distance, duration = _summary_argmax(row['inferred_section_summary'])
        else:
            mode, distance, duration = _section_lists_argmax(row)
    except Exception:
        # Deliberate best-effort for bad section data, but unlike a bare `except` with
        # `return` in `finally`, this no longer swallows KeyboardInterrupt/SystemExit.
        mode = distance = duration = np.nan

    row['section_mode_argmax'] = mode
    row['section_distance_argmax'] = distance
    row['section_duration_argmax'] = duration
    return row

In [None]:
# Guarantee a clean RangeIndex before the row-wise apply that follows.
filtered_trips = filtered_trips.reset_index(drop=True)

### Mode-availability feature generation

In [None]:
# Map raw "available modes" survey answers to the normalized mode labels.

# Labels as they appear in the Stage_database (AllCEO) survey. Both spellings of
# 'Do not have vehicle' (with and without a trailing space) are kept on purpose.
_AVAILABLE_ALLCEO = {
    'Bicycle': 'p_micro',
    'Do not have vehicle': 'unknown',
    'Do not have vehicle ': 'unknown',
    'Get a ride from a friend or family member': 's_car',
    'None': 'no_trip',
    'Public transportation (bus, subway, light rail, etc.)': 'transit',
    'Rental car (including Zipcar/ Car2Go)': 'car',
    'Shared bicycle or scooter': 's_micro',
    'Skateboard': 'p_micro',
    'Taxi (regular taxi, Uber, Lyft, etc)': 'ridehail',
    'Walk/roll': 'walk',
    'Prefer not to say': 'unknown',
}

# Slugified labels used by the other databases.
_AVAILABLE_OTHERS = {
    'public_transportation__bus__subway__ligh': 'transit',
    'get_a_ride_from_a_friend_or_family_membe': 's_car',
    'bicycle': 'p_micro',
    'walk': 'walk',
    'taxi__regular_taxi__uber__lyft__etc': 'ridehail',
    'rental_car__including_zipcar__car2go': 'car',
    'prefer_not_to_say': 'unknown',
}

available = {**_AVAILABLE_ALLCEO, **_AVAILABLE_OTHERS}

# Sensed (argmax) modes also mark modes as available, so that user input errors
# (e.g. car not ticked as available even though the trip was sensed as car) are covered.
section_mode_mapping = dict(
    bicycling=['p_micro', 's_micro'],
    car=['s_car', 'car', 'ridehail'],
    no_sensed=['unknown'],
    walking=['walk'],
    unknown=['unknown'],
    transit=['transit'],
)

In [None]:
filtered_trips = filtered_trips.apply(lambda x: compute_argmax(CURRENT_DB, x), axis=1)

# Drop all rows where the argmax mode is air travel.
is_air = filtered_trips.section_mode_argmax.isin(['AIR_OR_HSR', 'air_or_hsr'])
filtered_trips = filtered_trips.loc[~is_air].copy()

# Normalize sensed-mode labels to the keys of section_mode_mapping.
# The result is assigned back explicitly: `df.col.replace(..., inplace=True)` is chained
# assignment, which under pandas Copy-on-Write only modifies a temporary and leaves
# filtered_trips unchanged.
filtered_trips['section_mode_argmax'] = filtered_trips['section_mode_argmax'].replace({
    'subway': 'transit',
    'no_sensed': 'unknown',
    'train': 'transit',
    'TRAM': 'transit',
    'LIGHT_RAIL': 'transit',
    'CAR': 'car',
    'WALKING': 'walking',
    'BICYCLING': 'bicycling',
    'UNKNOWN': 'unknown',
    'TRAIN': 'transit',
    'SUBWAY': 'transit',
    'BUS': 'transit',
    'bus': 'transit'
})

# Trips without usable section data carry a NaN mode (see compute_argmax).
filtered_trips = filtered_trips.dropna(subset=['section_mode_argmax'])

In [None]:
METERS_TO_MILES = 0.000621371
SECONDS_PER_MINUTE = 60.

# Meters -> miles, for both the longest section and the whole trip.
for col in ('section_distance_argmax', 'distance'):
    filtered_trips[col] *= METERS_TO_MILES

# Seconds -> minutes, for both the longest section and the whole trip.
for col in ('section_duration_argmax', 'duration'):
    filtered_trips[col] /= SECONDS_PER_MINUTE

In [None]:
# Attach each trip's demographics; the inner join keeps users present in both frames.
filtered_trips = filtered_trips.merge(
    filtered_survey,
    how='inner',
    left_on='user_id_join',
    right_on='user_id_join',
)

## Update availability indicators

In [None]:
import itertools

# Sorted so the av_* column order is reproducible: the iteration order of a set of
# strings changes between interpreter runs (hash randomization).
new_cols = sorted(set(available.values()))
filtered_trips[new_cols] = 0

# Stage_database stores the multi-select answer ';'-separated; the other databases are
# split on whitespace (str.split(None)).
av_mode_sep = ';' if CURRENT_DB == "Stage_database" else None

# The per-user update below addresses rows by index label, so labels must be unique
# (the merge above returns a fresh RangeIndex).
assert filtered_trips.index.is_unique

for _, user_trips in filtered_trips.groupby('user_id'):
    # Self-reported available modes (demographics); unanswered rows are skipped instead
    # of failing the lookup in `available` with KeyError(nan).
    all_av_modes = user_trips['available_modes'].str.split(av_mode_sep).explode().dropna()

    # Map sensed modes and self-reported modes to the common normalized labels.
    mapped_sections = set(itertools.chain.from_iterable(
        section_mode_mapping[x] for x in user_trips['section_mode_argmax'].unique()
    ))
    mapped_demo_av = {available[x] for x in all_av_modes.unique()}

    # A mode is available if either source says so.
    combined = list(mapped_sections | mapped_demo_av)

    # Address this user's rows through the group's own index instead of re-scanning
    # the whole frame with a boolean mask on every iteration.
    filtered_trips.loc[user_trips.index, combined] = 1

filtered_trips = filtered_trips.rename(columns={c: 'av_' + c for c in new_cols})

### Cost estimation

In [None]:
# Per-mile cost in dollars for each normalized mode.
# All values are taken from VTPI.
# https://www.vtpi.org/tca/tca0501.pdf
mode_cost_per_mile = {
    # Personal bicycle/skateboard.
    'p_micro': 0.,
    'no_trip': 0.,
    # Shared car is half the cost of regular car, which is $0.6/mile.
    's_car': 0.3,
    # Car: driven alone, electric, or rental.
    'car': 0.6,
    # Average of bus and train taken.
    'transit': 0.5,
    # Shared bicycle or scooter - values taken from https://nacto.org/shared-micromobility-2020-2021/ and
    # https://www.mckinsey.com/industries/automotive-and-assembly/our-insights/how-sharing-the-road-is-likely-to-transform-american-mobility
    's_micro': 0.3,
    # uber/taxi/lyft
    'ridehail': 2.,
    'walk': 0.,
    'unknown': 0.
}

# Fixed per-trip cost in dollars, added on top of the per-mile cost (assumptions).
# Modes with 0 here are costed purely per mile above.
mode_init_cost = {
    'p_micro': 0.,
    'no_trip': 0.,
    's_car': 0.,
    'car': 0.,
    'transit': 0.,
    # $1 unlocking cost.
    's_micro': 1.,
    # uber/taxi/lyft
    'ridehail': 1.5,
    'walk': 0.,
    'unknown': 0.
}

In [None]:
def compute_cost_estimates(df: pd.DataFrame, init_cost=None, cost_per_mile=None) -> pd.DataFrame:
    """Add a ``cost_<mode>`` column for every ``av_<mode>`` availability column of `df`.

    cost_<mode> = av_<mode> * (init_cost[mode] + cost_per_mile[mode] * section_distance_argmax)

    so a mode that is not available (av_<mode> == 0) costs 0. Per the earlier
    conversion cell, section_distance_argmax is in miles.

    Parameters
    ----------
    df : DataFrame with a 'section_distance_argmax' column and av_<mode> flag columns.
        It is not modified.
    init_cost : dict mode -> fixed per-trip cost. Defaults to the module-level
        `mode_init_cost`.
    cost_per_mile : dict mode -> per-mile cost. Defaults to the module-level
        `mode_cost_per_mile`.

    Returns
    -------
    A new DataFrame: all columns of `df`, followed by the cost_<mode> columns in
    av_<mode> column order, with a fresh RangeIndex.

    Raises
    ------
    KeyError if an av_<mode> column has no entry in either cost table.
    """
    if init_cost is None:
        init_cost = mode_init_cost
    if cost_per_mile is None:
        cost_per_mile = mode_cost_per_mile

    # Only columns that *start* with 'av_' are availability flags; a substring test
    # would also match unrelated columns that merely contain 'av_'.
    modes = [c[len('av_'):] for c in df.columns if c.startswith('av_')]
    distance = df['section_distance_argmax']

    # Vectorized per mode instead of iterating over rows.
    costs = {
        f'cost_{mode}': df[f'av_{mode}'] * (init_cost[mode] + cost_per_mile[mode] * distance)
        for mode in modes
    }
    return df.assign(**costs).reset_index(drop=True)

In [None]:
# Append one cost_<mode> estimate per available mode.
filtered_trips = compute_cost_estimates(df=filtered_trips)

### Outlier removal

In [None]:
n_rows_before = len(filtered_trips)
print(f"For {CURRENT_DB=}, before outlier removal, n_rows = {n_rows_before}")

In [None]:
# Drop instances where duration/distance is unusable (non-positive); such rows would
# also yield zero or infinite speeds in the mph computation further down.
invalid_extent = (
    (filtered_trips.section_distance_argmax <= 0)
    | (filtered_trips.section_duration_argmax <= 0)
)
# Reassign the result: the previous `drop(..., inplace=False).reset_index(..., inplace=True)`
# reset a discarded temporary, so no rows were ever removed.
filtered_trips = filtered_trips.loc[~invalid_extent].reset_index(drop=True)


# Applied per section_mode_argmax (e.g. transit, car, walking, bicycling).
# split-apply-combine, expressed with groupby().transform masks.
def drop_outliers(df: pd.DataFrame, low=0.1, high=0.9) -> pd.DataFrame:
    """Keep trips whose longest-section distance AND duration both lie within the
    [low, high] quantile range of their own section_mode_argmax group.

    Parameters
    ----------
    df : DataFrame with 'section_mode_argmax', 'section_distance_argmax' and
        'section_duration_argmax' columns. It is not modified.
    low, high : quantile bounds (inclusive), computed per mode over the whole group.

    Returns
    -------
    The kept rows with all columns of `df`, ordered by mode (original row order within
    a mode, as groupby().apply produced), with a fresh RangeIndex. Rows with a missing
    mode belong to no group and are dropped.

    Uses transform instead of groupby().apply: apply() operating on the grouping column
    is deprecated since pandas 2.2, and newer pandas excludes that column from each
    group, which would drop section_mode_argmax from the result.
    """
    by_mode = df.groupby('section_mode_argmax')

    def _within_band(col):
        """Boolean mask: df[col] lies within its mode's [low, high] quantiles."""
        q_low = by_mode[col].transform(lambda s: s.quantile(low))
        q_high = by_mode[col].transform(lambda s: s.quantile(high))
        return (df[col] >= q_low) & (df[col] <= q_high)

    keep = _within_band('section_distance_argmax') & _within_band('section_duration_argmax')

    return (
        df.loc[keep]
        .sort_values('section_mode_argmax', kind='stable')
        .reset_index(drop=True)
    )

In [None]:
filtered_trips = drop_outliers(df=filtered_trips, low=0.01, high=0.99)

# Average speed of the longest section in miles per hour: distance is in miles and
# duration in minutes, hence the factor of 60 minutes/hour.
filtered_trips = filtered_trips.assign(
    mph=lambda trips: trips['section_distance_argmax'] * 60. / trips['section_duration_argmax']
)

In [None]:
preview_cols = ['section_mode_argmax', 'section_duration_argmax', 'section_distance_argmax', 'mph']
filtered_trips.loc[:, preview_cols].head(10)

In [None]:
def filter_mph(df: pd.DataFrame, low=0.1, high=0.9) -> pd.DataFrame:
    """Drop trips with implausible speeds, per section_mode_argmax.

    Modes listed in MPH_THRESHOLDS keep only rows with mph <= that fixed cap. All other
    modes keep rows whose mph lies within their own [low, high] quantile range.

    Parameters
    ----------
    df : DataFrame with 'section_mode_argmax' and 'mph' columns. It is not modified.
    low, high : quantile bounds (inclusive) for modes without a fixed cap.

    Returns
    -------
    The kept rows, ordered by mode (original order within a mode), with a fresh
    RangeIndex. Rows with a missing mode or a NaN mph are dropped.

    Uses transform instead of groupby().apply so the grouping column survives (apply()
    on grouping columns is deprecated since pandas 2.2).
    """
    MPH_THRESHOLDS = {
        # https://www.sciencedirect.com/science/article/pii/S2210670718304682
        'bicycling': 15.,
        # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7806575/
        # NOTE(review): if 2.93 mph is an *average* walking speed, using it as a hard
        # cap drops every faster-than-average walk — confirm this is intended.
        'walking': 2.93
    }

    modes = df['section_mode_argmax']
    mph = df['mph']

    # Per-mode quantile band, broadcast back onto every row of that mode.
    mph_by_mode = mph.groupby(modes)
    mph_low = mph_by_mode.transform(lambda s: s.quantile(low))
    mph_high = mph_by_mode.transform(lambda s: s.quantile(high))
    within_band = (mph >= mph_low) & (mph <= mph_high)

    # Capped modes ignore the quantile band and only enforce the upper limit.
    has_cap = modes.isin(list(MPH_THRESHOLDS))
    under_cap = mph <= modes.map(MPH_THRESHOLDS)
    keep = (has_cap & under_cap) | (~has_cap & within_band)

    return (
        df.loc[keep]
        .sort_values('section_mode_argmax', kind='stable')
        .reset_index(drop=True)
    )

In [None]:
# Remove implausible speeds (fixed caps for walking/bicycling, 1%-99% band otherwise).
filtered_trips = filter_mph(df=filtered_trips, high=0.99, low=0.01)

In [None]:
extent_cols = ['section_distance_argmax', 'section_duration_argmax']
filtered_trips.groupby('section_mode_argmax')[extent_cols].describe()

In [None]:
speed_by_mode = filtered_trips.groupby('section_mode_argmax')[['mph']]
speed_by_mode.describe()

In [None]:
n_rows_after = len(filtered_trips)
print(f"For {CURRENT_DB=}, After outlier removal, n_rows = {n_rows_after}")

In [None]:
# Trip/section bookkeeping columns that are not used downstream as features.
to_drop = [
    '_id', 'additions', 'cleaned_section_summary', 'cleaned_trip', 'confidence_threshold',
    'end_fmt_time', 'end_loc', 'end_local_dt_day', 'raw_trip', 'purpose_confirm',
    'end_local_dt_minute', 'end_local_dt_month', 'end_local_dt_second', 'end_local_dt_timezone',
    'end_local_dt_weekday', 'end_local_dt_year', 'end_place', 'end_ts', 'expectation', 'expected_trip',
    'inferred_labels', 'inferred_section_summary', 'inferred_trip', 'metadata_write_ts', 'mode_confirm',
    'section_durations', 'section_modes', 'source', 'start_fmt_time', 'start_loc', 'start_local_dt_day',
    'start_local_dt_minute', 'start_local_dt_month', 'start_local_dt_second',
    'start_local_dt_timezone', 'start_local_dt_weekday', 'start_local_dt_year', 'start_place',
    'start_ts', 'user_id_join', 'user_input', 'survey_user_id', 'section_distances',
    'data.local_dt.year', 'data.local_dt.month', 'data.local_dt.day', 'data.local_dt.hour',
    'data.local_dt.minute', 'data.local_dt.second', 'data.local_dt.weekday', 'data.local_dt.timezone',
    'data.fmt_time'
]

# Columns absent for the current database are silently skipped.
filtered_trips = filtered_trips.drop(columns=to_drop, errors='ignore')

In [None]:
# Pass the mapping as `columns=`: a positional mapper targets the index (axis=0), so the
# hour columns were never actually renamed.
filtered_trips = filtered_trips.rename(
    columns={'start_local_dt_hour': 'start:hour', 'end_local_dt_hour': 'end:hour'}
)

In [None]:
column_names = filtered_trips.columns.tolist()
print(column_names)

In [None]:
# Quick look at the final feature table.
display(filtered_trips.head(n=5))

In [None]:
n_users = len(filtered_trips.user_id.unique())
print(f"Done processing for {CURRENT_DB=}, Number of unique users: {n_users}")

In [None]:
targets = ['p_micro', 'no_trip', 's_car', 'transit', 'car', 's_micro', 'ridehail', 'walk', 'unknown']

# Encode the replaced mode as a 1-based class id, in the order of `targets`.
target_ids = {label: rank for rank, label in enumerate(targets, start=1)}
filtered_trips = (
    filtered_trips
    .rename(columns={'replaced_mode': 'target'})
    .replace({'target': target_ids})
)

In [None]:
savepath = Path('./data/filtered_data')

# parents=True: './data' itself may not exist yet; exist_ok=True keeps re-runs from failing
# (the previous exists()/mkdir() pair raised FileNotFoundError without './data').
savepath.mkdir(parents=True, exist_ok=True)

filtered_trips.to_csv(savepath / f'preprocessed_data_{CURRENT_DB}.csv', index=False)