# Taxi trips ~ feature engineering

## Peeking at the data

In [2]:
!ls ~/river_data/Taxis


train.csv


In [3]:
!head ~/river_data/Taxis/train.csv


id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
id0190469,2,2016-01-01 00:00:17,2016-01-01 00:14:26,5,-73.98174285888672,40.71915817260742,-73.93882751464845,40.82918167114258,N,849
id1665586,1,2016-01-01 00:00:53,2016-01-01 00:22:27,1,-73.98508453369139,40.74716567993164,-73.95803833007811,40.71749114990234,N,1294
id1210365,2,2016-01-01 00:01:01,2016-01-01 00:07:49,5,-73.9652786254883,40.80104064941406,-73.94747924804686,40.81517028808594,N,408
id3888279,1,2016-01-01 00:01:14,2016-01-01 00:05:54,1,-73.98229217529298,40.751331329345696,-73.99134063720702,40.75033950805664,N,280
id0924227,1,2016-01-01 00:01:20,2016-01-01 00:13:36,1,-73.97010803222656,40.75979995727539,-73.9893569946289,40.742988586425774,N,736
id2294362,2,2016-01-01 00:01:33,2016-01-01 00:13:25,1,-73.98499298095702,40.77389144897461,-73.93649291992188,40.84777069091797,N,712
id1078247,2,2016-01-01 00:01:37,

In [1]:
import duckdb

db = duckdb.connect('taxi-trips.db')
db.execute('''
CREATE OR REPLACE TABLE trips AS (
    SELECT *
    FROM read_csv_auto('~/river_data/Taxis/train.csv')
    WHERE 40.64150619506836 < pickup_latitude AND pickup_latitude < 40.84248242950439
    AND -74.01721954345702 < pickup_longitude AND pickup_longitude < -73.7766876220703
)
''')


<duckdb.duckdb.DuckDBPyConnection at 0x11e7ccbf0>

In [5]:
db.table('trips').show()


┌───────────┬───────────┬─────────────────────┬───┬────────────────────┬────────────────────┬───────────────┐
│    id     │ vendor_id │   pickup_datetime   │ … │  dropoff_latitude  │ store_and_fwd_flag │ trip_duration │
│  varchar  │   int64   │      timestamp      │   │       double       │      varchar       │     int64     │
├───────────┼───────────┼─────────────────────┼───┼────────────────────┼────────────────────┼───────────────┤
│ id0190469 │         2 │ 2016-01-01 00:00:17 │ … │  40.82918167114258 │ N                  │           849 │
│ id1665586 │         1 │ 2016-01-01 00:00:53 │ … │  40.71749114990234 │ N                  │          1294 │
│ id1210365 │         2 │ 2016-01-01 00:01:01 │ … │  40.81517028808594 │ N                  │           408 │
│ id3888279 │         1 │ 2016-01-01 00:01:14 │ … │  40.75033950805664 │ N                  │           280 │
│ id0924227 │         1 │ 2016-01-01 00:01:20 │ … │ 40.742988586425774 │ N                  │           736 │
│ id229436

In [147]:
query = '''
SELECT
    MIN(pickup_datetime),
    MAX(pickup_datetime),
    COUNT(*),
    AVG(trip_duration),
    MAX(trip_duration),
    MIN(trip_duration),
    QUANTILE(trip_duration, 0.99)
FROM trips
'''
job = db.execute(query)
job.fetch_df()


Unnamed: 0,min(pickup_datetime),max(pickup_datetime),count_star(),avg(trip_duration),max(trip_duration),min(trip_duration),"quantile(trip_duration, 0.99)"
0,2016-01-01 00:00:17,2016-06-30 23:59:39,1453068,957.844934,3526282,1,3423


Let's verify the data is balanced across months.

In [148]:
query = '''
SELECT
    EXTRACT(MONTH FROM pickup_datetime),
    COUNT(*)
FROM trips
GROUP BY 1
'''
job = db.execute(query)
job.fetch_df()


Unnamed: 0,"main.date_part('month', pickup_datetime)",count_star()
0,1,228849
1,2,237445
2,3,255235
3,4,250674
4,5,247405
5,6,233460


## Define the target

In [153]:
db.execute('''
CREATE OR REPLACE TABLE targets AS (
    SELECT
        id,
        IF(trip_duration > 3600, 3600, trip_duration) AS trip_duration
    FROM trips
)
''')
targets = db.execute('SELECT * FROM targets').fetch_df()
targets.head()


Unnamed: 0,id,trip_duration
0,id0190469,849
1,id1665586,1294
2,id1210365,408
3,id3888279,280
4,id0924227,736


## Distance features

We're looking to predict a duration. The most obvious feature is the distance between the pickup and dropoff locations. We'll use the Euclidean distance, which is the distance as the crow flies. We'll also add the Manhattan distance, which is the distance between two points if you can only travel on a rectangular grid.

In [158]:
db.execute('''
CREATE OR REPLACE VIEW distances AS (
    SELECT
        id,
        (
            ABS(pickup_longitude - dropoff_longitude) +
            ABS(pickup_latitude - dropoff_latitude)
        ) AS l1_distance,
        POW(
            POW(pickup_longitude - dropoff_longitude, 2) +
            POW(pickup_latitude - dropoff_latitude, 2),
            0.5
        ) AS l2_distance
    FROM trips
)
''')
distances = db.execute('SELECT * FROM distances').fetch_df()
distances.head()


Unnamed: 0,id,l1_distance,l2_distance
0,id0190469,0.152939,0.118097
1,id1665586,0.056721,0.040151
2,id1210365,0.031929,0.022726
3,id3888279,0.01004,0.009103
4,id0924227,0.03606,0.025557


## Basic model to evaluate feature importance

In [159]:
dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    ''')
    .fetch_df()
)
dataset.head()


Unnamed: 0,id,pickup_datetime,trip_duration,l1_distance,l2_distance
0,id0190469,2016-01-01 00:00:17,849,0.152939,0.118097
1,id1665586,2016-01-01 00:00:53,1294,0.056721,0.040151
2,id1210365,2016-01-01 00:01:01,408,0.031929,0.022726
3,id3888279,2016-01-01 00:01:14,280,0.01004,0.009103
4,id0924227,2016-01-01 00:01:20,736,0.03606,0.025557


In [160]:
import datetime as dt
import lightgbm as lgb

def train_and_test(dataset):

    cv = model_selection.TimeSeriesSplit(n_splits=5)
    dataset = dataset.sort_values('pickup_datetime')
    X = dataset.drop(columns=['id', 'trip_duration', 'pickup_datetime'])
    y = dataset['trip_duration']

    for col in X.columns[X.dtypes == 'object']:
        X[col] = X[col].astype('category')

    model = lgb.LGBMRegressor(
        n_estimators=30,
        max_depth=5,
        random_state=42
    )

    scores = model_selection.cross_val_score(
        model, X, y, scoring='neg_mean_absolute_error', cv=cv,
    )
    mae = -scores.mean()
    print(f'[MAE] {dt.timedelta(seconds=mae)}')

    model.fit(X, y)
    feature_importances = sorted(
        zip(X.columns, model.feature_importances_),
        key=lambda x: x[1],
        reverse=True
    )
    for feauture_name, importance in feature_importances:
        print(f'{feauture_name}: {importance}')


In [161]:
train_and_test(dataset)


[MAE] 0:04:22.751583
l2_distance: 543
l1_distance: 357


## Time features

In [162]:
db.execute('''
CREATE OR REPLACE VIEW time_features AS (
    SELECT
        id,
        EXTRACT(HOUR FROM pickup_datetime) AS pickup_hour,
        EXTRACT(WEEKDAY FROM pickup_datetime) AS pickup_weekday,
    FROM trips
)
''')

dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    ''')
    .fetch_df()
)

train_and_test(dataset)


[MAE] 0:03:56.103040
pickup_hour: 430
l2_distance: 223
pickup_weekday: 204
l1_distance: 43


Does changing the type of these features help?

In [163]:
db.execute('''
CREATE OR REPLACE VIEW time_features AS (
    SELECT
        id,
        CAST(EXTRACT(HOUR FROM pickup_datetime) AS STRING) AS hour,
        CAST(EXTRACT(WEEKDAY FROM pickup_datetime) AS STRING) AS weekday,
    FROM trips
)
''')

dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    ''')
    .fetch_df()
)

train_and_test(dataset)


[MAE] 0:03:52.233418
hour: 360
l2_distance: 267
weekday: 224
l1_distance: 49


Answer: a little bit.

## Average duration per day of week and hour

In [164]:
db.execute('''
CREATE OR REPLACE VIEW time_aggs AS (

    WITH per_hour AS (
        SELECT
            EXTRACT(HOUR FROM pickup_datetime) AS hour,
            AVG(trip_duration) AS avg_duration_per_hour
        FROM trips
        GROUP BY 1
    ),

    per_weekday AS (
        SELECT
            EXTRACT(WEEKDAY FROM pickup_datetime) AS weekday,
            AVG(trip_duration) AS avg_duration_per_weekday
        FROM trips
        GROUP BY 1
    ),

    overall AS (
        SELECT
            AVG(trip_duration) AS avg_duration
        FROM trips
    )

    SELECT
        trips.id,
        per_hour.avg_duration_per_hour,
        per_weekday.avg_duration_per_weekday,
        overall.avg_duration
    FROM trips
    LEFT JOIN per_hour ON
        EXTRACT(HOUR FROM pickup_datetime) = per_hour.hour
    LEFT JOIN per_weekday ON
        EXTRACT(WEEKDAY FROM pickup_datetime) = per_weekday.weekday
    LEFT JOIN overall ON TRUE

)
''')
db.execute('SELECT * FROM time_aggs').fetch_df().head()


Unnamed: 0,id,avg_duration_per_hour,avg_duration_per_weekday,avg_duration
0,id0190469,936.750604,988.769519,957.844934
1,id1665586,936.750604,988.769519,957.844934
2,id1210365,936.750604,988.769519,957.844934
3,id3888279,936.750604,988.769519,957.844934
4,id0924227,936.750604,988.769519,957.844934


In [165]:
dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id),
        time_aggs.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    LEFT JOIN time_aggs USING (id)

    ''')
    .fetch_df()
)
dataset.head()


Unnamed: 0,id,pickup_datetime,trip_duration,l1_distance,l2_distance,hour,weekday,avg_duration_per_hour,avg_duration_per_weekday,avg_duration
0,id0190469,2016-01-01 00:00:17,849,0.152939,0.118097,0,5,936.750604,988.769519,957.844934
1,id1665586,2016-01-01 00:00:53,1294,0.056721,0.040151,0,5,936.750604,988.769519,957.844934
2,id1210365,2016-01-01 00:01:01,408,0.031929,0.022726,0,5,936.750604,988.769519,957.844934
3,id3888279,2016-01-01 00:01:14,280,0.01004,0.009103,0,5,936.750604,988.769519,957.844934
4,id0924227,2016-01-01 00:01:20,736,0.03606,0.025557,0,5,936.750604,988.769519,957.844934


In [166]:
train_and_test(dataset)


[MAE] 0:03:52.097779
hour: 349
l2_distance: 264
weekday: 154
avg_duration_per_weekday: 68
l1_distance: 50
avg_duration_per_hour: 15
avg_duration: 0


These features don't bring much because they're convey the same information as the day of week and hour features.

Before we go further, there's a big issue to spot: we're leaking the target variable into the features. We're using the average duration per day of week and hour to predict the duration. But the duration is used to compute the average duration per day of week and hour. That's cheating. We need to fix that.

## Rolling average duration per day of week and hour

In [167]:
db.execute('''
CREATE OR REPLACE VIEW rolling_time_aggs AS (

    SELECT
        trips.id,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(HOUR FROM pickup_datetime)
            ORDER BY pickup_datetime
            ROWS BETWEEN UNBOUNDED PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_hour,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(WEEKDAY FROM pickup_datetime)
            ORDER BY pickup_datetime
            ROWS BETWEEN UNBOUNDED PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_weekday,

        AVG(trip_duration) OVER (
            ORDER BY pickup_datetime
            ROWS BETWEEN UNBOUNDED PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration

    FROM trips

)
''')
db.execute('SELECT * FROM rolling_time_aggs').fetch_df().head()


Unnamed: 0,id,avg_duration_per_hour,avg_duration_per_weekday,avg_duration
0,id0190469,,,
1,id1665586,849.0,849.0,849.0
2,id1210365,1071.5,1071.5,1071.5
3,id3888279,850.333333,850.333333,850.333333
4,id0924227,707.75,707.75,707.75


In [168]:
dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    LEFT JOIN rolling_time_aggs USING (id)
    ''')
    .fetch_df()
)

train_and_test(dataset)


[MAE] 0:03:52.060319
hour: 359
l2_distance: 269
weekday: 228
l1_distance: 44


The performance got slightly worse, because it's more realistic now. We're not cheating anymore: we're only using information that happens before each row.

The nice thing is that we now have a practical way to manipulate windows, using `OVER`.

## Looking at recent windows

In [170]:
db.execute('''
CREATE OR REPLACE VIEW recent_time_aggs AS (

    SELECT
        trips.id,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(HOUR FROM pickup_datetime)
            ORDER BY pickup_datetime
            ROWS BETWEEN 100 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_hour_recent,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(WEEKDAY FROM pickup_datetime)
            ORDER BY pickup_datetime
            ROWS BETWEEN 100 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_weekday_recent,

        AVG(trip_duration) OVER (
            ORDER BY pickup_datetime
            ROWS BETWEEN 100 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_recent

    FROM trips

)
''')

dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id),
        rolling_time_aggs.* EXCLUDE (id),
        recent_time_aggs.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    LEFT JOIN rolling_time_aggs USING (id)
    LEFT JOIN recent_time_aggs USING (id)
    ''')
    .fetch_df()
)

train_and_test(dataset)


[MAE] 0:03:52.018493
hour: 307
l2_distance: 241
weekday: 115
avg_duration_recent: 66
avg_duration_per_weekday_recent: 54
avg_duration_per_weekday: 36
l1_distance: 33
avg_duration_per_hour_recent: 28
avg_duration: 18
avg_duration_per_hour: 2


## Including geography

The averages we calculate are over the whole dataset. But we know that the duration depends on the pickup and dropoff locations. Let's include the geography in our averages.

In [171]:
grid = db.execute('''
CREATE OR REPLACE TABLE cells AS

    WITH min_max_coords AS (
        SELECT
            MIN(pickup_latitude) AS min_lat,
            MAX(pickup_latitude) AS max_lat,
            MIN(pickup_longitude) AS min_lon,
            MAX(pickup_longitude) AS max_lon
        FROM trips
    ),

    grid AS (
        SELECT x, y
        FROM (
            SELECT (ROW_NUMBER() OVER () - 1) AS x
            FROM main.range(1, 21)
        )
        CROSS JOIN (
            SELECT (ROW_NUMBER() OVER () - 1) AS y
            FROM main.range(1, 21)
        )
    )

    SELECT
        FORMAT('{}-{}', x, y) AS cell_id,
        min_lat + (y * lat_interval) AS cell_min_lat,
        min_lat + ((y + 1) * lat_interval) AS cell_max_lat,
        min_lon + (x * lon_interval) AS cell_min_lon,
        min_lon + ((x + 1) * lon_interval) AS cell_max_lon
    FROM grid
    CROSS JOIN (
        SELECT
            *,
            (max_lat - min_lat) / 20 AS lat_interval,
            (max_lon - min_lon) / 20 AS lon_interval
        FROM min_max_coords
    )
''')

cells = db.execute('SELECT * FROM cells').fetch_df()
cells.head()


Unnamed: 0,cell_id,cell_min_lat,cell_max_lat,cell_min_lon,cell_max_lon
0,0-0,40.64151,40.651558,-74.017212,-74.005186
1,1-0,40.64151,40.651558,-74.005186,-73.99316
2,2-0,40.64151,40.651558,-73.99316,-73.981134
3,3-0,40.64151,40.651558,-73.981134,-73.969109
4,4-0,40.64151,40.651558,-73.969109,-73.957083


In [172]:
import folium
import json

min_lat = cells['cell_min_lat'].min()
max_lat = cells['cell_max_lat'].max()
min_lon = cells['cell_min_lon'].min()
max_lon = cells['cell_max_lon'].max()
m = folium.Map(location=[(min_lat + max_lat) / 2, (min_lon + max_lon) / 2], zoom_start=11)

folium.GeoJson(
    data={
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "properties": {
                    "name": f"Cell {cell['cell_id']}"
                },
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [
                        [
                            [cell['cell_min_lon'], cell['cell_min_lat']],
                            [cell['cell_max_lon'], cell['cell_min_lat']],
                            [cell['cell_max_lon'], cell['cell_max_lat']],
                            [cell['cell_min_lon'], cell['cell_max_lat']],
                        ]
                    ]
                }
            }
            for cell in cells.to_dict(orient='records')
        ]
    },
    name="Grid"
).add_to(m)
m


## Count-based encoding

In [120]:
db.execute('''
CREATE OR REPLACE VIEW geo_aggs AS (
    SELECT
        id,
        pickup_cell_id,
        dropoff_cell_id,

        AVG(trip_duration) OVER (
            PARTITION BY pickup_cell_id, dropoff_cell_id
            ORDER BY pickup_datetime
            ROWS BETWEEN 1000 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_cell_pair,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(HOUR FROM pickup_datetime), pickup_cell_id, dropoff_cell_id
            ORDER BY pickup_datetime
            ROWS BETWEEN 1000 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_hour_per_cell_pair,

        AVG(trip_duration) OVER (
            PARTITION BY EXTRACT(WEEKDAY FROM pickup_datetime), pickup_cell_id, dropoff_cell_id
            ORDER BY pickup_datetime
            ROWS BETWEEN 1000 PRECEDING
            AND 1 PRECEDING
        ) AS avg_duration_per_weekday_per_cell_pair,

    FROM (
        SELECT
            trips.*,
            pickup_cells.cell_id AS pickup_cell_id,
            dropoff_cells.cell_id AS dropoff_cell_id
        FROM trips
        LEFT JOIN cells AS pickup_cells ON
            pickup_latitude BETWEEN pickup_cells.cell_min_lat AND pickup_cells.cell_max_lat
            AND pickup_longitude BETWEEN pickup_cells.cell_min_lon AND pickup_cells.cell_max_lon
        LEFT JOIN cells AS dropoff_cells ON
            dropoff_latitude BETWEEN dropoff_cells.cell_min_lat AND dropoff_cells.cell_max_lat
            AND dropoff_longitude BETWEEN dropoff_cells.cell_min_lon AND dropoff_cells.cell_max_lon
    )
)
''')
db.execute('SELECT * FROM geo_aggs').fetch_df().head()


Unnamed: 0,id,pickup_cell_id,dropoff_cell_id,avg_duration_per_cell_pair,avg_duration_per_hour_per_cell_pair,avg_duration_per_weekday_per_cell_pair
0,id1615460,0-0,2-2,368.0,,
1,id0847078,0-0,3-4,,,
2,id3967876,0-10,1-7,922.0,922.0,
3,id3073001,0-10,1-7,3865.5,874.666667,85330.0
4,id3819150,0-10,1-7,3411.454545,521.0,43521.0


In [179]:
dataset = (
    db.execute('''
    SELECT
        trips.id,
        trips.pickup_datetime,
        targets.trip_duration,
        distances.* EXCLUDE (id),
        time_features.* EXCLUDE (id),
        recent_time_aggs.* EXCLUDE (id),
        geo_aggs.* EXCLUDE (id)
    FROM trips
    LEFT JOIN targets USING (id)
    LEFT JOIN distances USING (id)
    LEFT JOIN time_features USING (id)
    LEFT JOIN rolling_time_aggs USING (id)
    LEFT JOIN recent_time_aggs USING (id)
    LEFT JOIN geo_aggs USING (id)
    ''')
    .fetch_df()
)

train_and_test(dataset)


[MAE] 0:03:33.154628
hour: 188
l2_distance: 145
dropoff_cell_id: 133
avg_duration_per_hour_per_cell_pair: 115
weekday: 91
avg_duration_recent: 54
pickup_cell_id: 43
avg_duration_per_weekday_per_cell_pair: 41
avg_duration_per_weekday_recent: 38
avg_duration_per_cell_pair: 27
avg_duration_per_hour_recent: 13
l1_distance: 12


In [180]:
dataset.groupby(['pickup_cell_id', 'dropoff_cell_id']).size().sort_values(ascending=False).head(10)


pickup_cell_id  dropoff_cell_id
3-11            2-10               11054
2-10            3-11               10205
2-11            3-11                8711
2-10            2-11                8132
2-11            2-10                8112
3-11            3-11                7559
2-10            2-10                7392
                2-9                 6715
3-11            2-11                6675
4-12            3-11                6625
dtype: int64

In [186]:
(
    dataset.query('pickup_cell_id == "3-11" and dropoff_cell_id == "2-10"')
    [['pickup_datetime', 'trip_duration', 'avg_duration_per_cell_pair']]
    .plot(x='pickup_datetime', y=['avg_duration_per_cell_pair'])
)


In [188]:
dataset.to_pickle('../../data/taxi_trip_dataset.pkl')
