# Flows Estimation with KDE

Estimate hourly inward and outward flows with KDE sampling for each zone.

In [60]:
import itertools as it
import numpy as np
import os
import pandas as pd
import pathlib
import pickle

from sklearn.base import BaseEstimator
from sklearn.metrics import mean_squared_error as mse
from sklearn.neighbors import KernelDensity

from typing import Any
from typing import Dict
from typing import List
from typing import Union

In [2]:
Confs = {
    'CITY': 'Louisville',
}

In [3]:
root_dir = pathlib.Path(os.path.abspath('')) \
                  .absolute().parent

data_model_dir = root_dir                 \
                     / 'odysseus'         \
                     / 'demand_modelling' \
                     / 'demand_models'    \
                     / Confs['CITY']

In [4]:
bookings_path = data_model_dir / 'bookings_train.csv'

In [5]:
df_bookings = pd.read_csv(str(bookings_path))

df_bookings.head()

Unnamed: 0.1,Unnamed: 0,start_time,end_time,duration,start_latitude,start_longitude,end_latitude,end_longitude,driving_distance,start_year,...,random_seconds_pos,ia_timeout,avg_speed,avg_speed_kmh,origin_id,destination_id,origin_i,origin_j,destination_i,destination_j
0,2,2018-12-31 23:49:17.581132-05:00,2019-01-01 00:05:36.900355-05:00,979.319223,38.22,-85.76,38.22,-85.763,434.43,2019,...,336.896992,192.829797,0.443604,1.596975,456,417,27.0,11.0,27.0,10.0
1,6,2019-01-01 01:33:14.162158-05:00,2019-01-01 01:51:05.576168-05:00,1071.41401,38.218,-85.765,38.217,-85.761,1415.92,2019,...,384.622233,521.035049,1.321543,4.757556,379,418,28.0,9.0,28.0,10.0
2,8,2019-01-01 02:44:21.372951-05:00,2019-01-01 02:58:16.793564-05:00,835.420613,38.22,-85.763,38.218,-85.763,160.9,2019,...,202.99289,507.752422,0.192598,0.693351,417,418,27.0,10.0,28.0,10.0
3,13,2019-01-01 04:13:02.633378-05:00,2019-01-01 04:21:15.675303-05:00,493.041925,38.235,-85.76,38.246,-85.756,1560.73,2019,...,389.884107,698.793951,3.165512,11.395842,447,519,18.0,11.0,12.0,13.0
4,18,2019-01-01 05:22:05.078119-05:00,2019-01-01 05:31:56.247410-05:00,591.169291,38.252,-85.756,38.243,-85.755,1110.21,2019,...,84.395055,2878.150709,1.87799,6.760764,516,521,9.0,13.0,14.0,13.0


Extract all the zone Ids

In [6]:
dest_zone_ids = set(df_bookings['destination_id'].unique())
orig_zone_ids = set(df_bookings['origin_id'].unique())

zone_ids = dest_zone_ids.union(orig_zone_ids)
zone_ids = list(zone_ids)

In [7]:
sorted(list(df_bookings.columns))

['Unnamed: 0',
 'avg_speed',
 'avg_speed_kmh',
 'city',
 'date',
 'day',
 'daytype',
 'destination_i',
 'destination_id',
 'destination_j',
 'driving_distance',
 'duration',
 'end_day',
 'end_daytype',
 'end_hour',
 'end_latitude',
 'end_longitude',
 'end_month',
 'end_time',
 'end_weekday',
 'end_year',
 'euclidean_distance',
 'geometry',
 'hour',
 'ia_timeout',
 'month',
 'origin_i',
 'origin_id',
 'origin_j',
 'random_seconds_end',
 'random_seconds_pos',
 'random_seconds_start',
 'start_day',
 'start_daytype',
 'start_hour',
 'start_latitude',
 'start_longitude',
 'start_month',
 'start_time',
 'start_weekday',
 'start_year',
 'year']

Extract the outward flows for each daytype for each hour for each zone Id

In [8]:
hourly_zone_out_flows = {}

In [9]:
for daytype, daytype_df in df_bookings.groupby('start_daytype'):
    hourly_zone_out_flows[daytype] = {}
    
    for hour, hour_df in daytype_df.groupby('start_hour'):
        hourly_zone_out_flows[daytype][hour] = {}
        
        for zone, zone_df in hour_df.groupby('origin_id'):
            out_flows = zone_df[['start_day', 'start_time']] \
                        .groupby('start_day').count()        \
                        .rename(columns={'start_time': 'out_flow_vec'})
            
            hourly_zone_out_flows[daytype][hour][zone] = out_flows['out_flow_vec'] \
                                                        .values.tolist()

Extract the inward flows for each daytype for each hour for each zone Id

In [10]:
hourly_zone_in_flows = {}

In [11]:
for daytype, daytype_df in df_bookings.groupby('end_daytype'):
    hourly_zone_in_flows[daytype] = {}
    
    for hour, hour_df in daytype_df.groupby('end_hour'):
        hourly_zone_in_flows[daytype][hour] = {}
        
        for zone, zone_df in hour_df.groupby('destination_id'):
            in_flows = zone_df[['end_day', 'end_time']] \
                       .groupby('end_day').count()      \
                       .rename(columns={'end_time': 'in_flow_vec'})
            
            hourly_zone_in_flows[daytype][hour][zone] = in_flows['in_flow_vec'] \
                                                        .values.tolist()

Fill N/A with the appropriate data

In [12]:
for daytype in ['weekday', 'weekend']:
    if daytype not in hourly_zone_out_flows:
        hourly_zone_out_flows[daytype] = {}
        
    if daytype not in hourly_zone_in_flows:
        hourly_zone_in_flows[daytype]  = {}
            
    for hour in range(24):
        if hour not in hourly_zone_out_flows[daytype]:
            hourly_zone_out_flows[daytype][hour] = {}
            
        if hour not in hourly_zone_in_flows[daytype]:
            hourly_zone_in_flows[daytype][hour]  = {}
                
        # Fill in each leftover zone Id with
        # an irrelevant [0] entry
        for zone in zone_ids:
            if zone not in hourly_zone_out_flows[daytype][hour]:
                hourly_zone_out_flows[daytype][hour][zone] = [0]
                
            if zone not in hourly_zone_in_flows[daytype][hour]:
                hourly_zone_in_flows[daytype][hour][zone]  = [0]

Aggregate inward and outward flows together for each daytype for each hour for each zone Id. Ideally we would like to use KDE to estimate each pair of inward and outward flows with a normal density, but in order to do so we should make sure that we have a reasonable sample size $N$ for each function to let KDE reliably estimate a standard multivariate normal density with a small MSE as desired.

So, how can we choose $N$? We decided to follow the guidelines presented in the chapter *"Required sample size for given accuracy"* of the book "Density Estimation for Statistics and Data Analysis, B. W. Silverman, 1986". Silverman established that the sample size $N$ required to ensure that the relative MSE at zero is less than $0.1$ (accurate to about 3 significant figures) when estimating a standard multivariate normal density is the following:

![KDE required sample size](images/KDE-sample-size.jpg)

We want to estimate a 2-dimensional multivariate density (inward and outward flow), therefore we fix $N$ to the reported $19$. Since we don't expect all the distributions, or rather not even most of them, to have at least $N$ samples, what should we do about those? A simple workoaround could be to fall back to the baseline average if a distribution doesn't satisfy the $N$-sample requirement, otherwise let KDE estimate a proper density.

**What is KDE?**

Kernel Density Estimation (KDE) is a non-parametric method for estimating the probability density function of a given random variable. It is also referred to by its traditional name, the *Parzen-Rosenblatt* method, after its discoverers.

Given a sample $\hat{X}$ of independent, identically distributed (i.i.d) observations $(x_1, x_2, ..., x_n)$ of a random variable drawn from an unknown source distribution, the kernel density estimate is given by the formula:

$$
p(x) = \frac{1}{nh} \sum_{j=1}^{n} K \left( \frac{x - x_j}{h} \right)
$$


where $n$ is the sample size, $K(a)$ is the kernel function and $h$ is the smoothing parameter, also called the *bandwidth*.

In [13]:
N = 19

In [101]:
class AverageDensity(BaseEstimator):
    """
    Dummy method for probability density function estimation that always samples the distribution average.
    """
    def __init__(self):
        self.avg_X = None
    
    def fit(self, X: Union[List, np.ndarray]):
        if type(X) == list:
            X = np.asarray(X)
            
        self.avg_X = X.mean(axis=0)
        return self
    
    def sample(self):
        return np.asarray([self.avg_X])

In [102]:
def enough_samples(distributions: List[List],
                   N: int) -> bool:
    """
    Check whether all the submitted distributions have a big enough sample size.
    """
    for dist in distributions:
        if len(dist) < N:
            return False
        
    return True

In [103]:
def take(distribution: List, N: int,
         _sorted: bool = True) -> List:
    X = np.asarray(distribution)
    
    # Extract N indices
    I = np.random.choice(len(X), size=N,
                         replace=False)
    
    if _sorted:
        I = sorted(I)

    return X[I].tolist()

In [104]:
def get_kde_model(distributions: List[List],
                  N: int) -> BaseEstimator:
    out_f_X, in_f_X = distributions
    
    # Downsample all the distribution to N size
    # by randomly drawing N samples from each of them
    downs_out_f_X = take(out_f_X, N)
    downs_in_f_X  = take(in_f_X, N)
    
    X = list(it.zip_longest(downs_out_f_X, downs_in_f_X,
                            fillvalue=0))
    
    return KernelDensity().fit(X)

In [105]:
def get_avg_flows_model(distributions: List[List]) \
                       -> BaseEstimator:
    out_f_X, in_f_X = distributions
    
    X = list(it.zip_longest(out_f_X, in_f_X,
                            fillvalue=0))
    
    return AverageDensity().fit(X)

In [19]:
hourly_zone_flows = {}

for daytype in ['weekday', 'weekend']:
    hourly_zone_flows[daytype] = {}
            
    for hour in range(24):
        hourly_zone_flows[daytype][hour] = {}
                
        for zone in zone_ids:
            hourly_out_flow = hourly_zone_out_flows[daytype][hour][zone]
            hourly_in_flow  = hourly_zone_in_flows[daytype][hour][zone]
            
            distributions = [
                hourly_out_flow,
                hourly_in_flow
            ]
            
            if enough_samples(distributions, N):
                flows_model = get_kde_model(distributions, N)
            else:
                flows_model = get_avg_flows_model(distributions)
                
            hourly_zone_flows[daytype][hour][zone] = flows_model

In [20]:
hourly_zone_flows

{'weekday': {0: {1029: AverageDensity(),
   1069: AverageDensity(),
   1109: AverageDensity(),
   1176: AverageDensity(),
   1187: AverageDensity(),
   1188: AverageDensity(),
   168: AverageDensity(),
   1228: AverageDensity(),
   244: AverageDensity(),
   245: AverageDensity(),
   264: AverageDensity(),
   269: AverageDensity(),
   279: AverageDensity(),
   1304: AverageDensity(),
   283: AverageDensity(),
   284: AverageDensity(),
   285: AverageDensity(),
   296: AverageDensity(),
   297: AverageDensity(),
   298: AverageDensity(),
   299: AverageDensity(),
   303: AverageDensity(),
   304: AverageDensity(),
   317: AverageDensity(),
   319: AverageDensity(),
   1343: AverageDensity(),
   322: AverageDensity(),
   323: AverageDensity(),
   324: AverageDensity(),
   328: AverageDensity(),
   329: AverageDensity(),
   330: AverageDensity(),
   337: AverageDensity(),
   338: AverageDensity(),
   339: AverageDensity(),
   341: AverageDensity(),
   342: AverageDensity(),
   343: Average

Serialize the flows models with Pickle

In [21]:
hourly_zone_flows_path = data_model_dir / 'hourly_zone_flows.pickle'

In [22]:
with open(str(hourly_zone_flows_path), 'wb') as pfn:
    pickle.dump(hourly_zone_flows, pfn)

Deserialize the flows models with Pickle

In [23]:
with open(str(hourly_zone_flows_path), 'rb') as pfn:
    dese_hourly_zone_flows = pickle.Unpickler(pfn).load()

In [24]:
daytype = 'weekend'
hour = 18
zone = 500

In [25]:
dese_hourly_zone_flows[daytype][hour][zone].sample()

array([0., 0.])

## Inward and outward baseline RMSE

In the following section we will compute for both the cities of Louisville and Austin the RMSE between the actuall inward and outward flows, $D(t, z)$ and $O(t, z)$ respectively, per zone, $z$, and per hour, $t$, and the predicted ones within the 08/2018 to 08/2019 timespan following three different strategies:

- Fully average baseline;
- Mixed average-KDE baseline with sample size, $n = 4$;
- Mixed average-KDE baseline with sample size, $n = 19$.

**Please note:** This sections assumes that ODySSEUS' demand modelling has been already executed for both cities.

In [215]:
CityFlow = Dict[str, Dict[int, Dict[int, Union[List[int], float, BaseEstimator]]]]

In [107]:
def load_df_bookings(city: str):
    """
    Load the bookings DataFrame for a given city.
    """
    root_dir = pathlib.Path(os.path.abspath('')) \
                      .absolute().parent

    data_model_dir = root_dir                 \
                         / 'odysseus'         \
                         / 'demand_modelling' \
                         / 'demand_models'    \
                         / city

    bookings_path = data_model_dir / 'bookings_train.csv'
    
    return pd.read_csv(str(bookings_path))

In [108]:
def extract_zone_ids(df_bookings: pd.DataFrame):
    """
    Extract all the zone Ids of a city given its bookings DataFrame.
    """
    dest_zone_ids = set(df_bookings['destination_id'].unique())
    orig_zone_ids = set(df_bookings['origin_id'].unique())

    zone_ids = dest_zone_ids.union(orig_zone_ids)
    
    return list(zone_ids)

In [109]:
def get_hourly_zone_in_flows(df_bookings: pd.DataFrame):
    """
    Compute the inward flows for each daytype for each hour for each zone Id.
    """
    hourly_zone_in_flows = {}
    
    for daytype, daytype_df in df_bookings.groupby('end_daytype'):
        hourly_zone_in_flows[daytype] = {}

        for hour, hour_df in daytype_df.groupby('end_hour'):
            hourly_zone_in_flows[daytype][hour] = {}

            for zone, zone_df in hour_df.groupby('destination_id'):
                in_flows = zone_df[['end_day', 'end_time']] \
                           .groupby('end_day').count()      \
                           .rename(columns={'end_time': 'in_flow_vec'})

                hourly_zone_in_flows[daytype][hour][zone] = in_flows['in_flow_vec'] \
                                                            .values.tolist()
                
    return hourly_zone_in_flows

In [110]:
def get_hourly_zone_out_flows(df_bookings: pd.DataFrame):
    """
    Compute the outward flows for each daytype for each hour for each zone Id.
    """
    hourly_zone_out_flows = {}
    
    for daytype, daytype_df in df_bookings.groupby('start_daytype'):
        hourly_zone_out_flows[daytype] = {}

        for hour, hour_df in daytype_df.groupby('start_hour'):
            hourly_zone_out_flows[daytype][hour] = {}

            for zone, zone_df in hour_df.groupby('origin_id'):
                out_flows = zone_df[['start_day', 'start_time']] \
                            .groupby('start_day').count()        \
                            .rename(columns={'start_time': 'out_flow_vec'})

                hourly_zone_out_flows[daytype][hour][zone] = out_flows['out_flow_vec'] \
                                                            .values.tolist()

    return hourly_zone_out_flows

In [111]:
def fillna_flows(in_flows: CityFlow, out_flows: CityFlow,
                 zone_ids: List[int], fillvalue: int = 0):
    for daytype in ['weekday', 'weekend']:
        if daytype not in out_flows:
            out_flows[daytype] = {}

        if daytype not in in_flows:
            in_flows[daytype]  = {}

        for hour in range(24):
            if hour not in out_flows[daytype]:
                out_flows[daytype][hour] = {}

            if hour not in in_flows[daytype]:
                in_flows[daytype][hour]  = {}

            # Fill in each leftover zone Id with
            # an irrelevant [0] entry
            for zone in zone_ids:
                if zone not in out_flows[daytype][hour]:
                    out_flows[daytype][hour][zone] = [fillvalue]

                if zone not in in_flows[daytype][hour]:
                    in_flows[daytype][hour][zone]  = [fillvalue]
                    
    return in_flows, out_flows

In [192]:
def get_hourly_zone_flows(city: str, return_df: bool = False):
    _df_bookings = load_df_bookings(city)
    _zone_ids = extract_zone_ids(_df_bookings)
    
    _in_flows  = get_hourly_zone_in_flows(_df_bookings)
    _out_flows = get_hourly_zone_out_flows(_df_bookings)
    
    _in_flows, _out_flows = fillna_flows(_in_flows,
                                         _out_flows,
                                         _zone_ids)
    
    def compute_flows(fullyavg: bool = False,
                      _N: int = 19):
        _hourly_zone_flows = {}

        for daytype in ['weekday', 'weekend']:
            _hourly_zone_flows[daytype] = {}

            for hour in range(24):
                _hourly_zone_flows[daytype][hour] = {}

                for zone in _zone_ids:
                    hourly_out_flow = _out_flows[daytype][hour][zone]
                    hourly_in_flow  = _in_flows[daytype][hour][zone]

                    distributions = [
                        hourly_out_flow,
                        hourly_in_flow
                    ]
                    
                    if fullyavg:
                        flows_model = get_avg_flows_model(distributions)
                    else:
                        if enough_samples(distributions, _N):
                            flows_model = get_kde_model(distributions, _N)
                        else:
                            flows_model = get_avg_flows_model(distributions)

                    _hourly_zone_flows[daytype][hour][zone] = flows_model
                    
        return _hourly_zone_flows
                
    if return_df:
        return compute_flows, _df_bookings
    else:
        return compute_flows, None

In [113]:
def get_flow_vecs(df_bookings: pd.DataFrame, fillvalue: Any = None):
    df_in_flow_vec = df_bookings[['end_year', 'end_month',
                        'end_day', 'end_hour',
                        'end_daytype', 'destination_id',
                        'driving_distance']]                    \
                      .rename(columns={
                          'end_year': 'year',
                          'end_month': 'month',
                          'end_day': 'day',
                          'end_hour': 'hour',
                          'end_daytype': 'daytype',
                          'destination_id': 'zone_id'
                                      })                        \
                      .groupby(['year', 'month', 'day', 'hour',
                                'daytype','zone_id']).count()   \
                      .rename(columns={
                          'driving_distance': 'in_flow_vec',
                                      })
    
    df_out_flow_vec = df_bookings[['start_year', 'start_month',
                        'start_day', 'start_hour',
                        'start_daytype', 'origin_id',
                        'driving_distance']]                    \
                      .rename(columns={
                          'start_year': 'year',
                          'start_month': 'month',
                          'start_day': 'day',
                          'start_hour': 'hour',
                          'start_daytype': 'daytype',
                          'origin_id': 'zone_id'
                                      })                        \
                      .groupby(['year', 'month', 'day', 'hour',
                                'daytype','zone_id']).count()   \
                      .rename(columns={
                          'driving_distance': 'out_flow_vec',
                                      })
    
    df_flow_vecs = df_in_flow_vec.merge(df_out_flow_vec,
                                        left_index=True,
                                        right_index=True,
                                        how='outer')
    
    return df_flow_vecs if fillvalue is None \
           else df_flow_vecs.fillna(fillvalue)

In [216]:
def get_flow_estimation_rmse(df_bookings: pd.DataFrame,
                             flows_generators: Dict[str, CityFlow],
                             fillvalue: Any = None):
    df_flow_vecs = get_flow_vecs(
        df_bookings, fillvalue)
    
    flow_preds = {k: ([], []) for k in flows_generators.keys()}
    flow_gt    = ([], [])
    
    for idx, row in df_flow_vecs.iterrows():
        daytype = idx[4]
        hour = idx[3]
        zone_id = idx[5]

        in_flow = row['in_flow_vec']
        out_flow = row['out_flow_vec']

        if not np.isnan(in_flow):
            flow_gt[0].append(in_flow)

        if not np.isnan(out_flow):
            flow_gt[1].append(out_flow)

        for k, flow_gene in flows_generators.items():
            pred_flows = flow_gene[daytype][hour][zone_id].sample()

            pred_out_flow = pred_flows[0][0]
            pred_in_flow  = pred_flows[0][1]

            if not np.isnan(in_flow):
                flow_preds[k][0].append(pred_in_flow)

            if not np.isnan(out_flow):
                flow_preds[k][1].append(pred_out_flow)
                
    flow_rmses = {}

    for i, tag in enumerate(['In-flow', 'Out-flow']):
        flow_rmses[tag] = {}

        for k, flow_pred in flow_preds.items():
            flow_rmses[tag][k] = mse(flow_gt[i], flow_pred[i],
                                     squared=False)
            
    return flow_rmses

### Louisville

In [193]:
data_louisville = get_hourly_zone_flows('Louisville', True)

flows_generator_louisville = data_louisville[0]
df_louisville_bookings     = data_louisville[1]

In [194]:
flows_louisville_fullyavg = flows_generator_louisville(True)

In [195]:
flows_louisville_mixed4   = flows_generator_louisville(False, 4)

In [196]:
flows_louisville_mixed19  = flows_generator_louisville(False, 19)

In [217]:
flows_gene_louisville = {
    'Fully-avg':    flows_louisville_fullyavg,
    'Mixed-KDE-4':  flows_louisville_mixed4,
    'Mixed-KDE-19': flows_louisville_mixed19
}

In [220]:
flows_rmses_louisville = get_flow_estimation_rmse(df_louisville_bookings,
                                                  flows_gene_louisville)

In [221]:
flows_rmses_louisville

{'In-flow': {'Fully-avg': 2.613591231655728,
  'Mixed-KDE-4': 3.30886076581088,
  'Mixed-KDE-19': 3.292170353878819},
 'Out-flow': {'Fully-avg': 3.4042255476280374,
  'Mixed-KDE-4': 4.213082720934414,
  'Mixed-KDE-19': 4.12692434857142}}

### Austin

In [222]:
data_austin = get_hourly_zone_flows('Austin', True)

flows_generator_austin = data_austin[0]
df_austin_bookings     = data_austin[1]

In [223]:
flows_austin_fullyavg = flows_generator_austin(True)

In [224]:
flows_austin_mixed4   = flows_generator_austin(False, 4)

In [225]:
flows_austin_mixed19  = flows_generator_austin(False, 19)

In [226]:
flows_gene_austin = {
    'Fully-avg':    flows_austin_fullyavg,
    'Mixed-KDE-4':  flows_austin_mixed4,
    'Mixed-KDE-19': flows_austin_mixed19
}

In [227]:
flows_rmses_austin = get_flow_estimation_rmse(df_austin_bookings,
                                              flows_gene_austin)

In [228]:
flows_rmses_austin

{'In-flow': {'Fully-avg': 20.29762184756747,
  'Mixed-KDE-4': 26.603687580221788,
  'Mixed-KDE-19': 25.362332599437188},
 'Out-flow': {'Fully-avg': 21.089393447525715,
  'Mixed-KDE-4': 27.589706566697444,
  'Mixed-KDE-19': 25.755512707995766}}