# Uber Driver Data Analysis

(New version)

Works with SAR and Portal data.

Note that this notebook is not final, and more documentation is being added.

#### [Optional] Installing the required libraries with pip

In [1]:
! pip install numpy pandas portion



In [2]:
import datetime
import glob
import os
from pathlib import Path
from typing import Callable, Optional, Tuple

import numpy as np
import pandas as pd
import portion as P

In [3]:
class Table(pd.DataFrame):
    """
    A ``pd.DataFrame`` subclass that exists only to allow self-documenting type hints such as
    ``df: Table['begin': datetime, 'end': datetime]``.

    Subscripting the class does not build a specialised DataFrame type: whatever is written
    between the brackets is ignored and ``list[str]`` is returned as a placeholder.
    """

    def __class_getitem__(cls, _):
        # The subscript only documents the expected columns for the reader; it is discarded.
        placeholder = list[str]
        return placeholder

In [4]:
# Alias used in the notebook's type hints for a point in time.
Timestamp = datetime.datetime

In [5]:
# Type-hint alias documenting a table that has 'begin' and 'end' timestamp columns.
PeriodTable = Table['begin': Timestamp, 'end': Timestamp]

In [6]:
# Root of all inputs and results: the 'data' directory inside the current working directory.
data_folder = Path.cwd() / 'data'

In [7]:
def find_file(pattern: str, folder: Path) -> Path:
    """
    Looks for a file matching the given pattern inside the given folder
    :param pattern: a glob file pattern
    :param folder: a path leading to a folder where the pattern should be applied
    :return: a path to the first file matching the pattern, or a value error if none.
    """
    matches = glob.glob(pattern, root_dir=folder)
    if len(matches) == 0:
        raise ValueError(f'Could not find file {pattern} in {folder}')
    elif len(matches) > 1:
        print(f'Found many matches for {pattern} in {folder}. Using the first.')
    return folder / matches[0]

In [8]:
def find_table(pattern: str, folder: Path, usecols: Optional[list[str]] = None) -> Table:
    """
    Locates a csv-like file inside ``folder`` whose name matches ``pattern`` and reads it.

    :param pattern: a glob file pattern
    :param folder: a path leading to a folder where the pattern should be applied
    :param usecols: an optional list of the columns to read; when given, the returned table
        has its columns in this order
    :return: a Table (a.k.a. pandas DataFrame) with the requested columns, plus a 'file' column
        holding the name of the file that was read
    """
    path = find_file(pattern, folder)
    frame = pd.read_csv(path, usecols=usecols)
    # Re-select the columns so that they come out in the order the caller asked for.
    ordered = frame if usecols is None else frame[usecols]
    return ordered.assign(file=path.name)

In [9]:
def find_date_range(pattern: str, folder: Path, date_cols: list[str]) -> Tuple[Timestamp, Timestamp]:
    """
    Finds the earliest and the latest date appearing in the given date columns of a csv-like file.

    :param pattern: a glob file pattern
    :param folder: a path leading to a folder where the pattern should be applied
    :param date_cols: a list of column names that are dates
    :return: the minimum and maximum observed dates, taken over all of date_cols together
    Usage:
    find_date_range('*Driver Trip Status.csv', data_folder / 'Brice' / 'raw' / 'SAR',
                    ['begin_timestamp_local', 'end_timestamp_local'])
    """
    df = find_table(pattern, folder, usecols=date_cols)
    # Parse every requested column; the extra 'file' column added by find_table is left out.
    dates = df[date_cols].apply(pd.to_datetime)
    # First reduce each column, then reduce across the columns.
    return dates.min().min(), dates.max().max()

In [10]:
def df_to_interval(df: PeriodTable) -> P.interval:
    """
    Builds a single Portion interval out of the rows of a DataFrame.

    Each row contributes the closed interval [begin, end]; all of them are passed to
    ``P.Interval``, so entries that overlap end up merged.
    """
    atomic_intervals = []
    for begin, end in zip(df['begin'], df['end']):
        atomic_intervals.append(P.closed(begin, end))
    return P.Interval(*atomic_intervals)


def interval_to_df(interval: P.Interval) -> PeriodTable:
    """
    Converts a Portion interval into a dataframe with columns 'begin' and 'end'.

    One row is produced per atomic interval. An empty interval yields an empty dataframe that
    still has the 'begin' and 'end' columns, so downstream code can rely on them being present.

    :param interval: the Portion interval to convert
    :return: a dataframe with one ('begin', 'end') row per atomic interval
    """
    if interval.empty:
        # Avoid exporting a placeholder bound pair for an interval that contains nothing.
        rows = []
    else:
        rows = [{'begin': begin, 'end': end} for _, begin, end, _ in P.to_data(interval)]
    return pd.DataFrame(rows, columns=['begin', 'end'])

In [11]:
def time_tuples_to_periods(
        table: Table['t1': Timestamp, 't2': Timestamp, 't3': Timestamp],
        columns: list[str],
        extra_info: list[Callable[[pd.Series], dict]]
) -> pd.DataFrame:
    """
    Takes a dataframe where each row has N timestamps corresponding to instants of status changes,
    and converts each row into N-1 rows of periods in the corresponding status.

    :param table: a table having a number N > 1 of time-columns and L entries.
    :param columns: the list of the N time-column names present in {table}, in chronological order.
    :param extra_info: a list of N-1 functions taking a row of {table} and outputting a dictionary of
        additional information for the corresponding period. Cannot have keys 'begin' and 'end'.
    :return: periods: a table having L * (N-1) entries, each with a 'begin' and 'end' timestamp and
        the associated information as specified by extra_info. An empty {table} yields an empty
        table that still has the 'begin' and 'end' columns.
    Usage:
    df = pd.DataFrame([{'request_ts': '3:47 PM', 'begintrip_ts': '4:00 PM', 'dropoff_ts': '4:13 PM'}])
    columns = ['request_ts', 'begintrip_ts', 'dropoff_ts']
    extra_info = [lambda r: {'status': 'P2'}, lambda r: {'status': 'P3'}]
    time_tuples_to_periods(df, columns, extra_info)
    > begin    end      status
    > 3:47 PM  4:00 PM  P2
    > 4:00 PM  4:13 PM  P3
    """
    assert len(columns) == len(extra_info) + 1, \
        'The length of additional information should correspond to the number of generated periods (N-1).'
    # Consecutive column pairs delimit one period each; pair them with their info function.
    transitions = list(zip(columns, columns[1:], extra_info))
    records = [
        {'begin': row[begin_col], 'end': row[end_col], **info(row)}
        for _, row in table.iterrows()
        for begin_col, end_col, info in transitions
    ]
    if records:
        periods = pd.DataFrame(records)
    else:
        # No input rows: still expose the columns every caller relies on.
        periods = pd.DataFrame(columns=['begin', 'end'])
    for col in ['begin', 'end']:
        periods[col] = pd.to_datetime(periods[col])
    return periods

In [12]:
def make_status_intervals(df: PeriodTable) -> dict[str, P.interval]:
    """
    Builds one Portion interval per distinct value of the 'status' column.

    :param df: a table with 'begin', 'end' and 'status' columns
    :return: a dictionary mapping each status to the interval made of that status' rows
    """
    intervals = {}
    for status in df.status.unique():
        rows_with_status = df[df.status == status]
        intervals[status] = df_to_interval(rows_with_status)
    return intervals

In [13]:
def interval_merge_logic(lt: dict[str, P.interval], oo: dict[str, P.interval]) -> dict[str, P.interval]:
    """
    Merges the per-status intervals of two sources with the priority P3 > P2 > P1.

    P3 is the union of both sources' P3. P2 is the union of both sources' P2 minus P3.
    P1 is taken from ``oo`` only, minus everything already attributed to P2 or P3.

    :param lt: intervals per status for the first source; must have keys 'P2' and 'P3'
        (presumably the lifetime-trips data -- confirm against the caller)
    :param oo: intervals per status for the second source; must have keys 'P1', 'P2' and 'P3'
        (presumably the online/offline data -- confirm against the caller)
    :return: a dictionary with the merged, mutually exclusive 'P1', 'P2' and 'P3' intervals
    """
    highest = lt['P3'] | oo['P3']
    middle = (lt['P2'] | oo['P2']) - highest
    lowest = oo['P1'] - (middle | highest)
    return {'P1': lowest, 'P2': middle, 'P3': highest}

In [14]:
def main_interval_logic(lt: dict[str, P.Interval], oo: dict[str, P.Interval], P0_has_priority=False) -> Table:
    """
    Makes the statuses of two sources mutually exclusive and merges them into a single table.

    Consider the following ordering of priorities: P3 > P2 > P1. P0_has_priority determines if P0
    is on the left or right of inequalities: when True, the time covered by oo['P0'] is first
    removed from every other status of both sources.

    The dictionaries passed in are not modified.

    :param lt: intervals per status for the first source; must have keys 'P2' and 'P3'
    :param oo: intervals per status for the second source; must have keys 'P1', 'P2' and 'P3'
        (and 'P0' when P0_has_priority is True)
    :param P0_has_priority: whether oo['P0'] takes precedence over all other statuses
    :return: a table with 'begin', 'end' and 'status' columns, where status is
        'P1 consistent', 'P2 consistent' or 'P3 consistent'
    """
    # Shallow copies: entries are re-assigned below, which must not leak into the caller's dicts.
    lt, oo = dict(lt), dict(oo)
    if P0_has_priority:
        offline = oo['P0']
        for intervals in (lt, oo):
            for status in intervals:
                if status != 'P0':
                    intervals[status] = intervals[status] - offline
    # Enforce P3 > P2 > P1 inside each source before merging the sources together.
    lt['P2'] = lt['P2'] - lt['P3']
    oo['P2'] = oo['P2'] - oo['P3']
    oo['P1'] = oo['P1'] - (oo['P2'] | oo['P3'])
    merged = interval_merge_logic(lt, oo)
    return pd.concat([interval_to_df(i).assign(status=f'{s} consistent') for s, i in merged.items()])

In [15]:
def merge_overlapping_intervals(table: PeriodTable, agg_dict: Optional[dict] = None) -> PeriodTable:
    """
    Groups the given table by status, then sorts all intervals by begin datetime and merges
    overlapping entries in a single pass over the sorted intervals.

    :param table: the table whose intervals should be merged; needs 'begin', 'end' and 'status' columns
    :param agg_dict: maps each extra column to the operation that should be applied to combine its
        values when intervals are merged. Defaults to summing every column other than
        'begin', 'end' and 'status'.
    :return: a table with merged intervals per status, with columns 'status', 'begin', 'end'
        and the aggregated columns. An empty input yields an empty table with those columns.
    """
    agg_dict = agg_dict or {c: 'sum' for c in table.columns if c not in ['begin', 'end', 'status']}

    if table.empty:
        # Nothing to merge; the scan below needs at least one row per group.
        return pd.DataFrame(columns=['status', 'begin', 'end', *agg_dict])

    def handle_groups(status_table: PeriodTable) -> PeriodTable:
        intervals = status_table[['begin', 'end']].sort_values(['begin', 'end'])
        group = 0
        group_end = intervals['end'].iloc[0]
        group_ids = []
        # Sweep over the sorted intervals: an interval starting after the running end of the
        # current group opens a new group, otherwise it extends the current one.
        for begin, end in zip(intervals['begin'], intervals['end']):
            if begin <= group_end:
                group_end = max(end, group_end)
            else:
                group += 1
                group_end = end
            group_ids.append(group)
        # Index the labels like the sorted rows so that groupby aligns them with status_table.
        groups = pd.Series(group_ids, index=intervals.index)
        return status_table.groupby(groups).agg({'begin': 'min', 'end': 'max', **agg_dict}).reset_index(drop=True)

    return table.groupby('status').apply(handle_groups).reset_index(level=1, drop=True).reset_index()

In [16]:
def mile2km(n_miles: float) -> float:
    """Converts a distance expressed in miles into kilometres."""
    km_per_mile = 1.609344
    return n_miles * km_per_mile

#### SAR preprocessing logic

In [17]:
def load_lifetime_trips(folder: Path, pattern: str = '*Driver Lifetime Trips.csv') -> PeriodTable:
    """
    Loads the lifetime-trips file found in ``folder`` and turns every completed trip into two periods.

    Only trips whose status is 'completed' or 'fare_split' are kept. Each of them yields a 'P2'
    period (request -> begin of trip) and a 'P3' period (begin of trip -> dropoff).

    :param folder: the folder in which the file is searched
    :param pattern: a glob pattern matching the name of the lifetime-trips file
    :return: a table with 'begin', 'end', 'status', 'distance_km', 'file' and,
        for P3 periods, 'uber_paid'
    """
    time_cols = ['request_timestamp_local', 'begintrip_timestamp_local', 'dropoff_timestamp_local']
    numeric_cols = ['request_to_begin_distance_miles', 'trip_distance_miles', 'original_fare_local']
    table = find_table(pattern, folder, [*time_cols, 'status', *numeric_cols])
    table = table[table.status.isin(['completed', 'fare_split'])].drop(columns='status')
    # The export writes missing values as the literal string '\N'.
    table = table.replace({r'\N': np.nan})
    # Cast every numeric column (including trip_distance_miles): a column containing '\N' is read
    # as text, and text cannot be converted to kilometres below.
    table = table.astype({col: float for col in numeric_cols})
    return time_tuples_to_periods(
        table,
        columns=time_cols,
        extra_info=[lambda r: {'status': 'P2',
                               'distance_km': mile2km(r['request_to_begin_distance_miles']),
                               'file': r['file']},
                    lambda r: {'status': 'P3',
                               'distance_km': mile2km(r['trip_distance_miles']),
                               'uber_paid': r['original_fare_local'],
                               'file': r['file']}])

In [18]:
def load_on_off(folder: Path, pattern: str = '*Driver Online Offline.csv') -> PeriodTable:
    """
    Loads the online/offline file found in ``folder`` as a table of periods.

    The timestamp columns are renamed to 'begin' / 'end' and 'earner_state' to 'status';
    the states 'ontrip', 'enroute', 'open' and 'offline' are recoded as 'P3', 'P2', 'P1' and 'P0'.
    Rows with any missing value (including the literal '\\N') are dropped.

    :param folder: the folder in which the file is searched
    :param pattern: a glob pattern matching the name of the online/offline file
    :return: a table with 'begin', 'end', 'status', the begin/end coordinates and 'file'
    """
    wanted = ['begin_timestamp_local', 'end_timestamp_local', 'earner_state',
              'begin_lat', 'begin_lng', 'end_lat', 'end_lng']
    new_names = {'begin_timestamp_local': 'begin', 'end_timestamp_local': 'end', 'earner_state': 'status'}
    recoding = {r'\N': np.nan, 'ontrip': 'P3', 'enroute': 'P2', 'open': 'P1', 'offline': 'P0'}

    periods = find_table(pattern, folder, wanted).rename(columns=new_names).replace(recoding)
    for col in ('begin', 'end'):
        periods[col] = pd.to_datetime(periods[col])
    return periods.dropna()

In [19]:
def load_dispatches(path: str | Path) -> Table:
    """
    Reads the dispatches csv file located at ``path``, keeping only the columns of interest.

    TODO (not finished): the table is returned as read, without any further processing.
    """
    wanted_columns = [
        'start_timestamp_local', 'end_timestamp_local',
        'dispatches', 'completed_trips', 'accepts', 'rejects', 'expireds',
        'driver_cancellations', 'rider_cancellations',
        'minutes_online', 'minutes_on_trip', 'trip_fares',
    ]
    return pd.read_csv(path, usecols=wanted_columns)

In [20]:
def load_trip_status(folder: Path, pattern: str = '*Driver Trip Status.csv') -> PeriodTable:
    """
    Loads the trip-status file found in ``folder`` as a table of periods.

    :param folder: the folder in which the file is searched
    :param pattern: a glob pattern matching the name of the trip-status file
    :return: a table with 'begin' and 'end' (parsed as datetimes), 'status', 'end_reason' and 'file'
    """
    raw = find_table(pattern, folder, ['begin_timestamp_local', 'end_timestamp_local', 'status', 'end_reason'])
    # find_table returns the columns in the requested order, so the two timestamps come first.
    periods = raw.rename(columns={'begin_timestamp_local': 'begin', 'end_timestamp_local': 'end'})
    return periods.assign(begin=pd.to_datetime(periods['begin']), end=pd.to_datetime(periods['end']))

In [21]:
def load_distance_traveled(folder: Path) -> pd.DataFrame:
    """
    Placeholder for loading distance-traveled data from ``folder``.

    Not implemented yet: the body is empty, so calling it currently returns ``None``
    rather than a DataFrame.
    """
    pass  # TODO

In [22]:
def load_sar(folder: Path) -> Table:
    """
    Loads the SAR data found in ``folder``.

    :param folder: the folder holding the lifetime-trips and online/offline files
    :return: the lifetime-trips periods followed by the online/offline periods,
        stacked in one table with a fresh index
    """
    sources = [load_lifetime_trips(folder), load_on_off(folder)]
    return pd.concat(sources).reset_index(drop=True)

#### Portal preprocessing

In [23]:
def load_portal(folder: Path):
    """
    Loads the Portal lifetime-trips file found in ``folder`` and turns every completed trip
    into two periods.

    Only trips whose 'Status' is 'completed' or 'fare_split' are kept. Each of them yields a 'P2'
    period (request -> begin of trip) and a 'P3' period (begin of trip -> dropoff); only the
    P3 period carries 'distance_km' and 'uber_paid'.

    :param folder: the folder in which the '*driver_lifetime_trips*.csv' file is searched
    :return: a table with 'begin', 'end', 'status', 'distance_km' and 'uber_paid' columns
    """
    timestamp_cols = ['Local Request Timestamp', 'Begin Trip Local Timestamp', 'Local Dropoff Timestamp']
    trips = find_table('*driver_lifetime_trips*.csv', folder,
                       ['Status', *timestamp_cols,
                        'Trip Distance (miles)', 'Duration (Seconds)', 'Local Original Fare'])
    finished = trips[trips['Status'].isin(['completed', 'fare_split'])]

    def before_trip_info(r):
        return {'status': 'P2'}

    def during_trip_info(r):
        return {'status': 'P3',
                'distance_km': mile2km(r['Trip Distance (miles)']),
                'uber_paid': r['Local Original Fare']}

    return time_tuples_to_periods(finished, columns=timestamp_cols,
                                  extra_info=[before_trip_info, during_trip_info])

#### Guillaume-specific logic

In [24]:
def guillaume_filtering_logic(
        daily: Table,
        percentage_df_path: Optional[str | Path] = data_folder / 'Guillaume' / 'percentage.csv'
) -> Table:
    """
    Applies the Guillaume-specific corrections to the P1 entries of the daily table.

    :param daily: the daily table built by the pipeline; needs a datetime 'date' column and the
        'status', 'duration', 'time_of_day' and 'day_type' columns
    :param percentage_df_path: path to a csv file with 'year', 'month' and 'Uber' columns, 'Uber'
        being the percentage of the month that Guillaume worked for Uber
    :return: a copy of the P1 rows of {daily}, with weighted durations and without the removed mornings
    """
    # First, weight P1 times based on the percentage that Guillaume was working for Uber on that month
    percentage = pd.read_csv(percentage_df_path)
    percentage['Uber'] /= 100
    P1 = daily.loc[daily.status.str.contains('P1')].copy()
    for i, row in percentage.iterrows():
        in_month = (P1.date.dt.year == row.year) & (P1.date.dt.month == row.month)
        if row['Uber'] == 0 and P1[in_month].duration.sum() > datetime.timedelta(0):
            print(
                f'bad specification for {row.year}/{row.month}: activity found even though specified percentage was 0')
        P1['duration'] = np.where(in_month, P1['duration'] * row['Uber'], P1['duration'])

    # Second, remove all morning weekday entries when Guillaume was working for IMAD, except for the
    # specific dates below (inclusive ranges).
    kept_ranges = [(datetime.date(2020, 11, 26), datetime.date(2020, 11, 26)),
                   (datetime.date(2020, 12, 21), datetime.date(2020, 12, 25)),
                   (datetime.date(2021, 2, 1), datetime.date(2021, 2, 12)),
                   (datetime.date(2021, 8, 16), datetime.date(2021, 8, 28)),
                   (datetime.date(2021, 9, 20), datetime.date(2021, 10, 3)),
                   (datetime.date(2021, 11, 25), datetime.date(2021, 12, 12)),
                   (datetime.date(2022, 4, 25), datetime.date(2022, 5, 13))]
    # Expand every range into plain datetime.date objects, the same type the rows are compared
    # with below; mixing in numpy datetime64 values would make the membership test unreliable.
    dates_to_keep = [day for first, last in kept_ranges for day in pd.date_range(first, last).date]
    removed = ((P1.time_of_day == 'AM') &
               (P1.day_type == 'week day') &
               ~P1.date.dt.date.isin(dates_to_keep))
    return P1[~removed]

In [25]:
def split_AM_PM(table: PeriodTable) -> PeriodTable:
    """
    Splits intervals spanning many days or many morning/afternoon periods into one piece per
    half-day (00:00:00-11:59:59 and 12:00:00-23:59:59).
    If the interval is associated to numerical values (like distance or money), these values are
    transferred to the new intervals but are weighted according to the new intervals' duration.

    A piece ends one second before the next half-day starts, so that its 'end' still belongs to
    the same date and half-day as its 'begin'. Intervals that fit in a single half-day, that have
    a missing bound, or whose 'end' is not after their 'begin' are returned unchanged.

    NOTE(review): timestamps are assumed to be naive local times -- confirm against the loaders.

    :param table: the table whose intervals should be split; needs 'begin' and 'end' columns
    :return: a table with no intervals spanning over AM and PM
    """
    half_day = datetime.timedelta(hours=12)
    one_second = datetime.timedelta(seconds=1)

    def half_day_start(ts: Timestamp) -> Timestamp:
        """Returns the start (00:00:00 or 12:00:00) of the half-day containing ts."""
        fields = {'hour': 0 if ts.hour < 12 else 12, 'minute': 0, 'second': 0, 'microsecond': 0}
        if isinstance(ts, pd.Timestamp):
            # pandas timestamps also carry nanoseconds, which must be cleared as well.
            fields['nanosecond'] = 0
        return ts.replace(**fields)

    def scaled_interval(begin: Timestamp, end: Timestamp, attributes: dict, og_duration) -> dict:
        """Creates an interval (a dict with begin, end and other attributes) whose float attributes
        are the original ones weighted by the share of the original duration it covers."""
        weight = (end - begin) / og_duration
        return {'begin': begin, 'end': end,
                **{k: v * weight if isinstance(v, float) else v for k, v in attributes.items()}}

    def split(begin: Timestamp, end: Timestamp, **rest) -> list[dict]:
        unchanged = [{'begin': begin, 'end': end, **rest}]
        if pd.isna(begin) or pd.isna(end) or end <= begin:
            return unchanged
        # Walk from one half-day boundary (noon or midnight) to the next until reaching `end`.
        pieces = []
        cursor = begin
        while True:
            boundary = half_day_start(cursor) + half_day
            if end < boundary:
                pieces.append((cursor, end))
                break
            # max() protects against a begin lying in the last second before the boundary.
            pieces.append((cursor, max(cursor, boundary - one_second)))
            cursor = boundary
        if len(pieces) == 1:
            return unchanged
        og_duration = end - begin
        return [scaled_interval(piece_begin, piece_end, rest, og_duration) for piece_begin, piece_end in pieces]

    return pd.DataFrame([e for d in table.to_dict('records') for e in split(**d)])

In [26]:
def find_week_limits(date: Timestamp) -> str:
    """
    Describes the Monday-to-Sunday week that contains the given date.

    :param date: any timestamp
    :return: a string of the form 'YYYY/MM/DD - YYYY/MM/DD' (Monday - Sunday)
    """
    monday = date - datetime.timedelta(days=date.weekday())
    sunday = monday + datetime.timedelta(days=6)
    first, last = (str(day.date()).replace('-', '/') for day in (monday, sunday))
    return f'{first} - {last}'

### Running the pipeline

In [27]:
def pipeline(
        table: PeriodTable,
        interval_logic: Optional[Callable[[PeriodTable], PeriodTable]] = None,
        splitting_logic: Optional[Callable[[PeriodTable], PeriodTable]] = None,
        filtering_logic: Optional[Callable[[PeriodTable], PeriodTable]] = None,
        name: str = 'analysis',
        facets: Tuple[str, ...] = ('duration', 'distance_km', 'uber_paid'),
        save_at: Path = data_folder / 'results'
) -> Tuple[pd.DataFrame, ...]:
    """
    Aggregates a table of periods per day, week, month and year and writes the results to disk.

    :param table: a table with 'begin', 'end' and 'status' columns, plus the columns named in
        {facets} other than 'duration' (which is computed here). It is not modified.
    :param interval_logic: optional function applied first, e.g. to merge overlapping intervals
    :param splitting_logic: optional function applied next, expected to make every interval fit
        within a single 12-hour period (AM or PM)
    :param filtering_logic: optional function applied to the daily table; the rows it returns are
        appended to the daily table with '|filtered' added to their status
    :param name: prefix of the csv files that are written
    :param facets: the columns that are summed in every aggregation
    :param save_at: the folder (created if needed) in which the csv files are written
    :return: the daily, weekly, yearly and total tables (the monthly table is only written to disk)
    """
    # Apply interval logic if specified
    if interval_logic is not None:
        table = interval_logic(table)

    # Make sure that each interval is contained within a single 12-hour time period (AM or PM) by splitting when necessary
    # Note: this logic is specific to each driver
    if splitting_logic is not None:
        table = splitting_logic(table)

    # Work on a copy: the columns added below must not appear in the DataFrame owned by the caller.
    table = table.copy()

    # Now we can compute all of these time properties since they will be the same for begin and end
    end = table.end
    table['date'] = end.dt.date
    table['day_of_week'] = end.dt.day_name()
    table['day_type'] = np.where(end.dt.weekday < 5, 'week day', 'weekend')
    table['time_of_day'] = np.where(end.dt.hour < 12, 'AM', 'PM')
    table['week'] = end.apply(find_week_limits)
    table['month'] = end.apply(lambda d: f'{d.month:02d}. {d.month_name()}')
    table['year'] = end.dt.year
    table['duration'] = table.end - table.begin

    sum_facets = {f: 'sum' for f in facets}

    # Group entries by day and time period
    daily = table.groupby(['date', 'year', 'month', 'week', 'day_of_week',
                           'day_type', 'time_of_day', 'status']).agg(sum_facets).reset_index()
    daily['date'] = pd.to_datetime(daily.date)

    # Filter the data if a filtering function is specified
    if filtering_logic is not None:
        filtered = filtering_logic(daily)
        filtered['status'] = filtered.status + '|filtered'
        daily = pd.concat([daily, filtered])

    monthly = daily.groupby(['year', 'month', 'status']).agg(sum_facets).reset_index()
    weekly = daily.groupby(['week', 'status']).agg(sum_facets).reset_index()
    yearly = daily.groupby(['year', 'status']).agg(sum_facets).reset_index()
    total = yearly.groupby(['status']).agg(sum_facets).reset_index()

    # Writing tables to disk
    save_at.mkdir(parents=True, exist_ok=True)
    outputs = {'original_splitted': table, 'daily': daily, 'weekly': weekly,
               'monthly': monthly, 'yearly': yearly, 'total': total}
    for suffix, frame in outputs.items():
        frame.to_csv(save_at / f'{name}_{suffix}.csv', index=False)

    return daily, weekly, yearly, total

SAR samples can be found on our KDrive at:
- old: `hestiaai /Common documents/HestiaLabs/PDIO- Data/Driver Data/Guillaume data/Lemoine Guillaume/Lemoine_SAR_06.08.2022.zip`.
- new: `PersonalData.IO /Lemoine_12102022-20221110T164542Z-001.zip`

In [28]:
# Guillaume, SAR data: overlapping intervals are merged (summing 'uber_paid' and 'distance_km'),
# intervals are split at AM/PM limits and the Guillaume-specific filtering is applied.
# The csv files are written to data/Guillaume/results with the 'sar' prefix.
_ = pipeline(load_sar(data_folder / 'Guillaume' / 'raw' / 'SAR (new)'),
             interval_logic=lambda t: merge_overlapping_intervals(t, {c: 'sum' for c in ['uber_paid', 'distance_km']}),
             splitting_logic=split_AM_PM,
             filtering_logic=guillaume_filtering_logic,
             name='sar',
             save_at=data_folder / 'Guillaume' / 'results')

In [29]:
# Kidane, SAR data: overlapping intervals are merged (summing 'uber_paid' and 'distance_km');
# no splitting or filtering is applied. The csv files are written to data/Kidane/results
# with the 'sar' prefix.
_ = pipeline(load_sar(data_folder / 'Kidane' / 'raw' / 'SAR'),
             interval_logic=lambda t: merge_overlapping_intervals(t, {c: 'sum' for c in ['uber_paid', 'distance_km']}),
             name='sar',
             save_at=data_folder / 'Kidane' / 'results')

In [30]:
# Brice, SAR data: overlapping intervals are merged (summing 'uber_paid' and 'distance_km');
# no splitting or filtering is applied. The csv files are written to data/Brice/results
# with the 'sar' prefix.
_ = pipeline(load_sar(data_folder / 'Brice' / 'raw' / 'SAR'),
             interval_logic=lambda t: merge_overlapping_intervals(t, {c: 'sum' for c in ['uber_paid', 'distance_km']}),
             name='sar',
             save_at=data_folder / 'Brice' / 'results')

A Portal sample can be found on our KDrive at `hestiaai /Common documents/HestiaLabs/PDIO- Data/Driver Data/Guillaume data/Lemoine Guillaume/202207/Uber Data F0699B53.zip`

In [31]:
# Guillaume, Portal data: intervals are split at AM/PM limits and the Guillaume-specific
# filtering is applied; no interval merging. The csv files are written to
# data/Guillaume/results with the 'portal' prefix.
_ = pipeline(load_portal(data_folder / 'Guillaume' / 'raw' / 'Portal' / 'Driver'),
             name='portal',
             splitting_logic=split_AM_PM,
             filtering_logic=guillaume_filtering_logic,
             save_at=data_folder / 'Guillaume' / 'results')

In [32]:
# Aria, Portal data: intervals are split at AM/PM limits; no interval merging or filtering.
# The csv files are written to data/Aria/results with the 'portal' prefix.
_ = pipeline(load_portal(data_folder / 'Aria' / 'raw' / 'Portal' / 'Driver'),
             name='portal',
             splitting_logic=split_AM_PM,
             save_at=data_folder / 'Aria' / 'results')