# Request Generation Utility Calculation Notebook

This notebook contains the implementation of utility calculations based on origin-destination travel data.

The full implementation is in `helpers/utility_calculation.py`.

In [1]:
import os
import ast
import numpy as np
import pandas as pd
from typing import Tuple
# Set working directory: climb towards the filesystem root until the current
# directory is the project folder, so the relative paths used below resolve.
while os.path.basename(os.getcwd()).lower() != 'carsharingmodelcasestudy':
    parent_dir = os.path.dirname(os.getcwd())
    if parent_dir == os.getcwd():
        # At the filesystem root a further chdir('..') is a no-op, so without
        # this guard the loop would spin forever when the folder is not found.
        raise FileNotFoundError(
            "Could not find a 'carsharingmodelcasestudy' directory above the starting directory")
    os.chdir(parent_dir)
assert os.path.basename(os.getcwd()).lower() == 'carsharingmodelcasestudy', os.getcwd()

In [2]:
# Load preprocessed data (semicolon-separated CSV, path relative to the working directory)
preprocessed_data = pd.read_csv('./requests/od_travel_data.csv', sep=';', encoding='utf-8')

# Parameters
L = [0, 1, 2, 5] # pricing levels, euro
P_CS = 0.5  # Per-minute car-sharing fee (multiplied by the driving time in minutes in utility_kij)
M = ['Walk', 'Bike', 'PublicTransit'] # alternative modes of transport compared against car-sharing

# Mode choice model read by single_user_choice_simulation: "MNL" or "max"
mode_choice_model = "MNL"

# Sensitivity coefficients used in the utility functions
beta_C_k = -1  # Cost sensitivity
beta_V_k = -0.3  # Vehicle time sensitivity
beta_P_k = -0.35  # Public transit time sensitivity
beta_A_k = -0.4  # Access time sensitivity
beta_W_k = -0.6  # Walk time sensitivity
beta_B_k = -0.5  # Bicycle time sensitivity

# Speeds are passed to travel_time(), which expects km/h
walking_speed = 5 # Assume walking speed of 5 km/h
biking_speed = 15 # Assume biking speed of 15 km/h
biking_cost_per_km = 0.05 # Assume cost per km for bike ownership
driving_speed = 45 # Assume driving speed of 45 km/h

def T_A_bike_k():
    """Draw a random time for customer k to access a bike, uniform on [0, 2).

    The value comes from NumPy's global random state, so every call returns a
    new draw. Presumably expressed in minutes like the other access times --
    TODO confirm.
    """
    bike_access_time = np.random.uniform(low=0, high=2)
    return bike_access_time

In [3]:
def travel_time(distance_in_meters, travel_speed_kmh) -> float:
    """Convert a distance and a constant speed into a travel time.

    Parameters
    ----------
    distance_in_meters : distance to cover, in meters.
    travel_speed_kmh : travel speed, in km/h.

    Returns
    -------
    Travel time in minutes.
    """
    distance_in_km = distance_in_meters / 1000
    time_in_hours = distance_in_km / travel_speed_kmh
    return time_in_hours * 60

In [4]:
# Utility of alternative transportation methods
def utility_given_mode(mode: str, mode_data: Tuple, verbose=False) -> float:
    """Return the utility of one alternative (non-car-sharing) transport mode.

    Parameters
    ----------
    mode : str
        One of "Walk", "Bike" or "PublicTransit".
    mode_data
        Mode-specific input:
        - "Walk" / "Bike": the walking distance in meters (a single number,
          not a tuple); the bike trip is assumed to cover the same distance.
        - "PublicTransit": a 4-tuple
          (price, travel time, access time, transfer time).
    verbose : bool
        When True, print the intermediate utility components.

    Returns
    -------
    float
        Sum of the sensitivity-weighted cost and time components of the mode.

    Raises
    ------
    ValueError
        If `mode` is not one of the three supported modes.
    """
    if mode == "Walk":
        walking_time = travel_time(mode_data, walking_speed)
        return beta_W_k * walking_time
    elif mode == "Bike":
        distance_in_meters = mode_data
        biking_time = travel_time(distance_in_meters, biking_speed)
        biking_cost = biking_cost_per_km * distance_in_meters * 10**(-3)  # meters -> km
        # Two independent random access-time draws (presumably one per trip end -- TODO confirm)
        access_time_1 = T_A_bike_k()
        access_time_2 = T_A_bike_k()
        if verbose:
            print(f"bike access times {(access_time_1, access_time_2)}")
        utility_bicycle_access_time_sensitivity = beta_A_k * (access_time_1 + access_time_2)
        utility_bicycle_cost_sensitivity = beta_C_k * biking_cost
        utility_bicycle_time_sensitivity = beta_B_k * biking_time
        if verbose:
            print(f"bike utility: {(utility_bicycle_access_time_sensitivity, utility_bicycle_cost_sensitivity, utility_bicycle_time_sensitivity)}")
        return utility_bicycle_time_sensitivity + utility_bicycle_cost_sensitivity + utility_bicycle_access_time_sensitivity
    elif mode == "PublicTransit":
        public_travel_price, public_travel_time, public_travel_access_time, public_travel_transfer_time = mode_data
        utility_cost_sensitivity = beta_C_k * public_travel_price
        utility_time_sensitivity = beta_P_k * public_travel_time
        # Weight access AND transfer time with the access-time sensitivity.
        # Without the parentheses the transfer time was added unweighted and
        # with a positive sign, so longer transfers increased the utility.
        utility_access_time_sensitivity = beta_A_k * (public_travel_access_time + public_travel_transfer_time)
        if verbose:
            print((utility_cost_sensitivity, utility_time_sensitivity, utility_access_time_sensitivity))
        return utility_cost_sensitivity + utility_time_sensitivity + utility_access_time_sensitivity
    else:
        raise ValueError("Invalid mode")

def calculate_utility_for_modes(alternative_trans_data, verbose = False):
    """Compute the utility of every alternative mode in M for one customer request.

    `alternative_trans_data` is indexed by the keys 'walking_distance_value',
    'public_travel_price', 'public_travel_time', 'public_travel_access_time'
    and 'public_travel_transfer_time'.

    Returns a dict {mode: utility}; modes in M without a known input mapping
    are left out of the result.
    """
    walking_distance_value = alternative_trans_data['walking_distance_value']
    transit_inputs = (
        alternative_trans_data['public_travel_price'],
        alternative_trans_data['public_travel_time'],
        alternative_trans_data['public_travel_access_time'],
        alternative_trans_data['public_travel_transfer_time'],
    )
    if verbose:
        print(f"calculating utilities for model using walking distance {walking_distance_value}")

    # Input handed to utility_given_mode for each supported mode;
    # walking and biking both use the walking distance.
    inputs_by_mode = {
        'PublicTransit': transit_inputs,
        'Bike': walking_distance_value,
        'Walk': walking_distance_value,
    }

    U_bar_mk = {}  # utility per mode
    for mode in M:
        if mode in inputs_by_mode:
            U_bar_mk[mode] = utility_given_mode(mode, inputs_by_mode[mode], verbose)
        if verbose:
            print(f"mode {mode}, utility {U_bar_mk.get(mode, None)}")
    return U_bar_mk

In [5]:
def utility_kij(travel_data: Tuple[float, float, float]) -> Tuple[float, float, float]:
    """Utility components of a car-sharing trip, before any pricing-level fee.

    `travel_data` is (walking distance origin -> pick-up station,
    driving distance, walking distance drop-off station -> destination),
    all passed to travel_time() as meters.

    Returns (access-time utility, driving-time utility, cost utility).
    """
    origin_walk_dist, drive_dist, destination_walk_dist = travel_data

    driving_minutes = travel_time(drive_dist, driving_speed)
    walking_minutes = (
        travel_time(origin_walk_dist, walking_speed)
        + travel_time(destination_walk_dist, walking_speed)
    )

    cost_component = beta_C_k * P_CS * driving_minutes  # per-minute fee times minutes driven
    time_component = beta_V_k * driving_minutes
    access_component = beta_A_k * walking_minutes
    return (access_component, time_component, cost_component)

def utility_kijl(utility_kij: Tuple[float, float, float], _pricing_level: float) -> float:
    """Total car-sharing utility at a given pricing level.

    input:
    - utility_kij: tuple (access-time utility, driving-time utility, cost utility)
    - _pricing_level: fee of the pricing level, weighted by the cost
      sensitivity and added to the cost utility
    """
    access_component, time_component, cost_component = utility_kij
    priced_cost_component = cost_component + beta_C_k * (_pricing_level)
    return sum((access_component, time_component, priced_cost_component))

In [6]:
def maximum_utility_for_modes(U_bar_mk, verbose = False):
    """Pick the alternative mode with the highest utility.

    Returns a tuple (best mode, its utility); on ties the mode that comes
    first in the dict wins.
    """
    if verbose:
        print(U_bar_mk)
    best_mode, best_utility = max(U_bar_mk.items(), key=lambda mode_and_utility: mode_and_utility[1])
    return best_mode, best_utility

In [7]:
def calculate_maximum_utilities_with_carsharing(
    i_stations: list, j_stations: list, walking_dist_1: list, driving_dist: list, walking_dist_2: list):
    """Find, for every pricing level in L, the station pair with the highest car-sharing utility.

    Parameters
    ----------
    i_stations, j_stations : candidate pick-up / drop-off stations; element [1]
        of each station entry is copied to the output (its coordinates).
    walking_dist_1, driving_dist, walking_dist_2 : distances per station pair,
        ordered like the nested loop over i_stations (outer) and j_stations
        (inner), i.e. entry k belongs to the k-th (i, j) combination.

    Returns
    -------
    dict {pricing level: [utility, i_station[1], j_station[1]]}; empty when
    there are no station pairs.

    Raises
    ------
    IndexError
        If a distance list is shorter than the number of station pairs.

    The input lists are only read. They were previously consumed with pop(0),
    which emptied the caller's lists and broke any second run on the same data.
    """
    u_kijl = {}
    station_pairs = ((i_station, j_station) for i_station in i_stations for j_station in j_stations)
    for pair_idx, (i_station, j_station) in enumerate(station_pairs):
        CS_rij_travel_data = (walking_dist_1[pair_idx], driving_dist[pair_idx], walking_dist_2[pair_idx])
        # The fee-independent components are the same for every pricing level
        base_utility = utility_kij(CS_rij_travel_data)
        for l in L:
            trip_utility = [utility_kijl(base_utility, l), i_station[1], j_station[1]] # put coordinates in output
            if u_kijl.get(l) is None or trip_utility[0] > u_kijl[l][0]:
                u_kijl[l] = trip_utility
    return u_kijl

In [8]:
def calculate_MNL_probabilities(utility_dict):
    """Multinomial-logit choice probabilities: exp(u_m) / sum_n exp(u_n).

    Parameters
    ----------
    utility_dict : dict {alternative: utility}

    Returns
    -------
    dict {alternative: probability} with the same keys and order; an empty
    dict for empty input.

    The largest utility is subtracted before exponentiating. This leaves the
    probabilities mathematically unchanged but prevents exp() from
    underflowing to 0 for every alternative (0/0 -> NaN) or overflowing to inf.
    """
    if not utility_dict:
        return {}
    utilities = np.asarray(list(utility_dict.values()), dtype=float)
    exp_utilities = np.exp(utilities - np.max(utilities))
    sum_exp_utilities = np.sum(exp_utilities)
    probabilities = {
        mode: exp_u / sum_exp_utilities
        for mode, exp_u in zip(utility_dict.keys(), exp_utilities)
    }
    return probabilities

In [9]:
def single_user_choices_MNL(u_kijl, U_bar_mk, m_star, verbose=False):
    """Calculate feasible pricing levels using the MNL model.

    The choice is tested at drop-off fee equal 0: one alternative is drawn at
    random from the MNL probabilities over the modes in M plus car-sharing at
    fee 0. If car-sharing is drawn, the user keeps choosing car-sharing at
    every other pricing level unless the best alternative is better.

    Parameters
    ----------
    u_kijl : dict {pricing level: [utility, ...]}; must contain level 0.
    U_bar_mk : dict {mode: utility} for the alternative modes in M.
    m_star : tuple (best alternative mode, its utility).
    verbose : print the drawn choice when True.

    Returns
    -------
    dict {pricing level: u_kijl entry} with the feasible levels, always
    including level 0 when car-sharing was drawn; an empty dict otherwise.
    """
    zero_fee_level = 0
    # Choice set: every alternative mode plus car-sharing at the zero fee.
    # Uses the module-level calculate_MNL_probabilities instead of a nested copy.
    comparison_dict = {mode: U_bar_mk[mode] for mode in M}
    comparison_dict["Carsharing"] = u_kijl[zero_fee_level][0]
    probabilities = calculate_MNL_probabilities(comparison_dict)
    initial_choice = np.random.choice(list(probabilities.keys()), p=list(probabilities.values()))
    if verbose:
        print(f"Initial choice: {initial_choice}")
    if initial_choice != "Carsharing":
        return {}
    # Level 0 is feasible by construction; higher fees only while car-sharing
    # still beats the best alternative.
    L_kil = {zero_fee_level: u_kijl[zero_fee_level]}
    for l in L:
        if u_kijl[l][0] > m_star[1]:
            L_kil[l] = u_kijl[l]
    return L_kil
                              
# logical loop for each possible user
def single_user_choice_simulation(request_row, verbose = False):
    """Simulate the mode choice of one request.

    `request_row` exposes the attributes alternative_transportation_data,
    i_stations, j_stations, walking_dist_1_list, driving_dist_list and
    walking_dist_2_list.

    Returns (L_kil, m_star): the feasible pricing levels with their best
    station pair, and the (mode, utility) tuple of the best alternative.
    Raises ValueError when mode_choice_model is neither 'max' nor 'MNL'.
    """
    alternative_trans = request_row.alternative_transportation_data

    # Best car-sharing utility and station pair at each pricing level
    u_kijl = calculate_maximum_utilities_with_carsharing(
        request_row.i_stations,
        request_row.j_stations,
        request_row.walking_dist_1_list,
        request_row.driving_dist_list,
        request_row.walking_dist_2_list,
    )
    # Utilities of the alternative modes and the best one among them
    U_bar_mk = calculate_utility_for_modes(alternative_trans, verbose)
    m_star = maximum_utility_for_modes(U_bar_mk, verbose)

    if mode_choice_model == 'MNL':
        # Probabilistic choice between the alternatives and car-sharing
        L_kil = single_user_choices_MNL(u_kijl, U_bar_mk, m_star, verbose)
    elif mode_choice_model == 'max':
        # Deterministic choice: a level is feasible when car-sharing beats the best alternative
        best_alternative_utility = m_star[1]
        L_kil = {l: u_kijl[l] for l in L if u_kijl[l][0] > best_alternative_utility}
    else:
        raise ValueError("Invalid mode choice model")

    if verbose:
        print(f"L_kil {L_kil}")
    return L_kil, m_star
    

In [10]:
def preprocess_requests(requests, verbose = False):
    """Run the choice simulation for every request row.

    Returns (B_r, m_star), both keyed by the positional index of the row:
    - B_r: feasible pricing-level combinations of each request
    - m_star: dict with origin, destination and the best alternative mode
      and utility of each request
    """
    B_r = {}
    m_star = {}
    for r_idx, row in enumerate(requests.itertuples()):
        feasible_levels, best_alternative = single_user_choice_simulation(row, verbose)
        B_r[r_idx] = feasible_levels
        m_star[r_idx] = {
            'origin': row.origin_node,
            'destination': row.destination_node,
            'best_alternative_mode': best_alternative[0],
            'best_alternative_utility': best_alternative[1]
        }
    return B_r, m_star

def format_Br_df(res_B_r):
    """Flatten the feasible combinations into a DataFrame, one row per (request, fee).

    `res_B_r` maps request id -> {fee: details}, where details[0] is the
    utility and details[1] / details[2] are (lat, lon) pairs of the start and
    end station. Requests without feasible fees contribute no rows.
    """
    def to_record(request_id, fee, details):
        utility = details[0]
        start_lat, start_lon = details[1]
        end_lat, end_lon = details[2]
        return {
            'id': request_id,
            'fee': fee,
            'utility': utility,
            'station_start_lat': start_lat,
            'station_start_lon': start_lon,
            'station_end_lat': end_lat,
            'station_end_lon': end_lon
        }

    rows = [
        to_record(request_id, fee, details)
        for request_id, fee_levels in res_B_r.items()
        for fee, details in fee_levels.items()
    ]
    return pd.DataFrame(rows)

In [11]:
# process requests: parse the stringified list/dict columns of a sample of the data
# (the sample itself is drawn further down, after the parsing helpers are defined)

import ast

def parse_to_list(value):
    """Turn a cell value into a list, falling back to [] for anything unusable.

    - str: parsed with ast.literal_eval; the result is validated like any
      other value, so a string that evaluates to a dict or number yields []
      instead of leaking a non-list to the caller.
    - list: returned unchanged.
    - tuple: converted to a list.
    - anything else (None, NaN, ...) or an unparsable string: [].
    """
    if isinstance(value, str):
        try:
            value = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # every error literal_eval is documented to raise on malformed input
            return []
    if isinstance(value, tuple):
        return list(value)
    return value if isinstance(value, list) else []

def parse_to_dict(value):
    """Turn a cell value into a dict, falling back to {} for anything unusable.

    - str: parsed with ast.literal_eval; the result is validated like any
      other value, so a string that evaluates to a list or number yields {}
      instead of leaking a non-dict to the caller.
    - dict: returned unchanged.
    - anything else (None, NaN, ...) or an unparsable string: {}.
    """
    if isinstance(value, str):
        try:
            value = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # every error literal_eval is documented to raise on malformed input
            return {}
    return value if isinstance(value, dict) else {}

# Columns stored in the CSV as the string representation of a Python list / dict
columns_that_are_lists = ['i_stations', 'j_stations', 'walking_dist_1_list', 'driving_dist_list', 'walking_dist_2_list']
columns_that_are_dicts = ['alternative_transportation_data']

# Draw up to 100 random requests; capped at the number of available rows
# because sample() raises ValueError when asked for more rows than exist.
preprocessed_data_subset = preprocessed_data.sample(min(100, len(preprocessed_data)))

# Convert the stringified columns back into Python objects
for col in columns_that_are_lists:
    preprocessed_data_subset[col] = preprocessed_data_subset[col].apply(parse_to_list)
for col in columns_that_are_dicts:
    preprocessed_data_subset[col] = preprocessed_data_subset[col].apply(parse_to_dict)

In [12]:
# test function
res_B_r, res_m_star = preprocess_requests(preprocessed_data_subset, verbose=False)
feasible_combinations = format_Br_df(res_B_r)
# Number of requests with at least one feasible car-sharing fee level.
# format_Br_df returns a frame without columns when no request is feasible,
# so check for the 'id' column explicitly instead of hiding every error
# behind a bare except.
if 'id' in feasible_combinations.columns:
    print(len(feasible_combinations['id'].unique()))
else:
    print(0)

In [None]:
# Display the feasible fee levels per request:
# {request index: {fee level: [utility, start station entry [1], end station entry [1]]}}
res_B_r

## Inspect individual steps below