# River workshop

## Dictionaries are great

In [2]:
from pprint import pprint

from river import datasets

# Load the taxi trips dataset; iterating over it yields (features, target) pairs.
dataset = datasets.Taxis()
first_sample = next(iter(dataset))

# Unpack the first sample and pretty-print its feature dictionary.
x, y = first_sample
pprint(x)


In [3]:
# Show which Python type backs each feature of the sample.
pprint({name: type(val).__name__ for name, val in x.items()})


In [5]:
import json

# Serialise the sample as indented JSON; values the encoder cannot handle
# natively are converted with str().
print(
    json.dumps(
        x,
        default=str,
        indent=4,
    )
)


{
    "vendor_id": "2",
    "pickup_datetime": "2016-01-01 00:00:17",
    "passenger_count": 5,
    "pickup_longitude": -73.98174285888672,
    "pickup_latitude": 40.71915817260742,
    "dropoff_longitude": -73.93882751464845,
    "dropoff_latitude": 40.82918167114258,
    "store_and_fwd_flag": "N"
}


In [6]:
import collections

# Dictionary-flavoured containers from the standard library.
# Only the last expression of the cell is displayed as its output.
collections.Counter
collections.ChainMap
collections.defaultdict


collections.defaultdict

## Manual progressive validation

In [13]:
from river import compose
from river import linear_model
from river import metrics
from river import preprocessing

def distances(x):
    """Compute pickup-to-dropoff distances from raw coordinates.

    Args:
        x: Mapping holding 'pickup_latitude', 'dropoff_latitude',
            'pickup_longitude' and 'dropoff_longitude'.

    Returns:
        A dict with two entries: 'L1' (sum of the absolute coordinate
        differences) and 'L2' (square root of the sum of their squares).
    """
    d_lat = x['pickup_latitude'] - x['dropoff_latitude']
    d_lon = x['pickup_longitude'] - x['dropoff_longitude']

    manhattan = abs(d_lat) + abs(d_lon)
    euclidean = (d_lat ** 2 + d_lon ** 2) ** 0.5

    return {'L1': manhattan, 'L2': euclidean}

# Chain the distance features, a scaler and a linear regression into one pipeline.
model = compose.Pipeline(
    distances,
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

# Mean absolute error between the true and predicted targets.
metric = metrics.MAE()

# Progressive validation by hand: predict each sample first, score the
# prediction, and only then let the model learn from that sample.
for x, y in dataset.take(10_000):
    y_pred = model.predict_one(x)
    metric.update(y, y_pred)
    model.learn_one(x, y)

metric


MAE: 541.162842

In [14]:
import datetime as dt

# Interpret the metric value as a number of seconds and print it as h:mm:ss.
print(dt.timedelta(seconds=metric.get()))


0:09:01.162842


## Progressive validation one-liner

In [22]:
from river import evaluate

# Same predict-then-learn loop as the manual version, run on fresh clones of
# the model and metric; the running score is printed every 1,000 samples.
# The result is bound to `_` so the cell does not display it a second time.
_ = evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model.clone(),
    metric=metric.clone(),
    print_every=1000
)


[1,000] MAE: 488.670795
[2,000] MAE: 590.143499
[3,000] MAE: 510.149582
[4,000] MAE: 501.422914
[5,000] MAE: 579.632231
[6,000] MAE: 592.055439
[7,000] MAE: 613.247012
[8,000] MAE: 579.196832
[9,000] MAE: 561.097898
[10,000] MAE: 541.162842


## Delayed progressive validation

In [19]:
import itertools
from river import stream

# Replay the dataset ordered by pickup time, with each target only becoming
# available once the trip duration (y, in seconds) has elapsed.
dataset_with_delays = stream.simulate_qa(
    dataset=dataset,
    moment='pickup_datetime',
    delay=lambda _, y: dt.timedelta(seconds=y)
)

# Peek at the first 40 events: an event without a target is a departure,
# an event carrying a target is the matching arrival.
for i, x, y in itertools.islice(dataset_with_delays, 40):
    if y is not None:
        arrival_date = x['pickup_datetime'] + dt.timedelta(seconds=y)
        print(f'{arrival_date} - trip #{i} arrives after {y} seconds')
        continue
    print(f'{x["pickup_datetime"]} - trip #{i} departs')


2016-01-01 00:00:17 - trip #0 departs
2016-01-01 00:00:53 - trip #1 departs
2016-01-01 00:01:01 - trip #2 departs
2016-01-01 00:01:14 - trip #3 departs
2016-01-01 00:01:20 - trip #4 departs
2016-01-01 00:01:33 - trip #5 departs
2016-01-01 00:01:37 - trip #6 departs
2016-01-01 00:01:47 - trip #7 departs
2016-01-01 00:02:06 - trip #8 departs
2016-01-01 00:02:45 - trip #9 departs
2016-01-01 00:03:02 - trip #10 departs
2016-01-01 00:03:31 - trip #6 arrives after 114 seconds
2016-01-01 00:03:31 - trip #11 departs
2016-01-01 00:03:35 - trip #12 departs
2016-01-01 00:04:42 - trip #13 departs
2016-01-01 00:04:57 - trip #14 departs
2016-01-01 00:05:07 - trip #15 departs
2016-01-01 00:05:08 - trip #16 departs
2016-01-01 00:05:18 - trip #17 departs
2016-01-01 00:05:35 - trip #18 departs
2016-01-01 00:05:39 - trip #19 departs
2016-01-01 00:05:54 - trip #3 arrives after 280 seconds
2016-01-01 00:06:04 - trip #20 departs
2016-01-01 00:06:12 - trip #21 departs
2016-01-01 00:06:22 - trip #22 departs
2

In [20]:
# Progressive validation where each target is revealed only after its trip
# duration has elapsed, instead of immediately after the prediction.
evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model.clone(),
    metric=metric.clone(),
    # Same parameters as stream.simulate_qa
    moment='pickup_datetime',
    delay=lambda _, y: dt.timedelta(seconds=y)
)


MAE: 457.783569

## Feature engineering

### Window aggregates

In [23]:
from river import feature_extraction
from river import stats

def munge(x):
    """Return a copy of the sample extended with the pickup weekday.

    Args:
        x: Feature mapping whose 'pickup_datetime' value exposes a
            ``weekday()`` method.

    Returns:
        A new dict with every entry of ``x`` plus a 'weekday' entry holding
        ``x['pickup_datetime'].weekday()``. The input is left untouched.
    """
    enriched = dict(x)
    enriched['weekday'] = x['pickup_datetime'].weekday()
    return enriched

# Pipeline: add the weekday, then build two groups of features side by side
# (the distances and a running mean of the target per weekday), scale them
# and fit a linear regression.
# NOTE(review): the list step is presumably river's shorthand for a union of
# transformers whose outputs are merged — confirm against the compose docs.
model = compose.Pipeline(
    munge,
    [
        distances,
        feature_extraction.TargetAgg(by='weekday', how=stats.Mean())
    ],
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

# Evaluate with delayed targets: each duration is revealed only once the
# corresponding trip would have finished.
evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model,
    metric=metric.clone(),
    moment='pickup_datetime',
    delay=lambda _, y: dt.timedelta(seconds=y)
)


MAE: 448.031811

### Geographical aggregates

In [25]:
# Display the dataset's summary card (task, sample count, features, source).
dataset


Taxi ride durations in New York City.

The goal is to predict the duration of taxi rides in New York City.

      Name  Taxis                                                    
      Task  Regression                                               
   Samples  1,458,644                                                
  Features  8                                                        
    Sparse  False                                                    
      Path  /Users/max/river_data/Taxis/train.csv                    
       URL  https://maxhalford.github.io/files/datasets/nyc_taxis.zip
      Size  186.23 MB                                                
Downloaded  True                                                     

In [27]:
import statistics

# Read the first 10,000 pickups once and keep both coordinates, rather than
# streaming the dataset again for each of the four bounds.
pickup_coords = [
    (x['pickup_latitude'], x['pickup_longitude'])
    for x, _ in dataset.take(10_000)
]
pickup_lats, pickup_lons = zip(*pickup_coords)

# With n=100 the cut points are percentiles: [0] is the 1st and [-1] the 99th.
# Using them instead of the true min/max keeps stray coordinates from
# stretching the bounding box.
lat_percentiles = statistics.quantiles(pickup_lats, n=100)
lon_percentiles = statistics.quantiles(pickup_lons, n=100)

min_lat, max_lat = lat_percentiles[0], lat_percentiles[-1]
min_lon, max_lon = lon_percentiles[0], lon_percentiles[-1]


In [31]:
# Number of bands along each axis of the grid.
grid_size = 20
# Evenly spaced cut points starting at the lower bound. Since i stops at
# grid_size - 1, the last cut sits one step below the upper bound.
lat_cuts = [min_lat + i * (max_lat - min_lat) / grid_size for i in range(grid_size)]
lon_cuts = [min_lon + i * (max_lon - min_lon) / grid_size for i in range(grid_size)]


In [32]:
import folium
import json

# Create a Folium map centered on the middle of the bounding box
m = folium.Map(location=[(min_lat + max_lat) / 2, (min_lon + max_lon) / 2], zoom_start=11)

# Cell edges along each axis: the cut points followed by the upper bound, so
# that cell k spans edges[k - 1] to edges[k] for k = 1..grid_size. Starting
# the loops at 1 avoids index -1, which would silently wrap around to the
# last cut and draw polygons stretching across the whole grid.
lon_edges = [*lon_cuts, max_lon]
lat_edges = [*lat_cuts, max_lat]

# Create one GeoJSON feature per grid cell
features = []
for i in range(1, grid_size + 1):
    west, east = lon_edges[i - 1], lon_edges[i]
    for j in range(1, grid_size + 1):
        south, north = lat_edges[j - 1], lat_edges[j]
        cell = {
            "type": "Feature",
            "properties": {
                "name": f"Cell {i}-{j}"
            },
            "geometry": {
                "type": "Polygon",
                "coordinates": [
                    [
                        [west, south],
                        [east, south],
                        [east, north],
                        [west, north],
                        # GeoJSON rings must end on their starting position
                        [west, south]
                    ]
                ]
            }
        }
        features.append(cell)

# Create a GeoJSON layer with the grid cells
grid_layer = folium.GeoJson(
    data={
        "type": "FeatureCollection",
        "features": features
    },
    name="Grid"
)

grid_layer.add_to(m)
m


In [42]:
import bisect

def munge_bis(x):
    """Return a copy of the sample extended with weekday and grid cell.

    Args:
        x: Feature mapping with 'pickup_datetime', 'pickup_latitude' and
            'pickup_longitude' entries.

    Returns:
        A new dict with every entry of ``x`` plus:
        - 'weekday': ``x['pickup_datetime'].weekday()``
        - 'grid_cell': a ``(latitude index, longitude index)`` tuple, each
          index being the left bisection position of the coordinate within
          the module-level ``lat_cuts`` / ``lon_cuts`` lists.
    """
    enriched = dict(x)
    enriched['weekday'] = x['pickup_datetime'].weekday()

    lat_idx = bisect.bisect_left(lat_cuts, x['pickup_latitude'])
    lon_idx = bisect.bisect_left(lon_cuts, x['pickup_longitude'])
    enriched['grid_cell'] = (lat_idx, lon_idx)

    return enriched

# Same pipeline as before, with one more feature group: a running count of
# the samples seen in each pickup grid cell.
model = compose.Pipeline(
    munge_bis,
    [
        distances,
        feature_extraction.TargetAgg(by='weekday', how=stats.Mean()),
        feature_extraction.TargetAgg(by='grid_cell', how=stats.Count())
    ],
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

# Evaluate with delayed targets, as in the previous experiments.
evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model,
    metric=metric.clone(),
    moment='pickup_datetime',
    delay=lambda _, y: dt.timedelta(seconds=y)
)


MAE: 469.288426

In [55]:
from branca.colormap import LinearColormap

# Create a Folium map centered on the middle of the bounding box
m = folium.Map(location=[(min_lat + max_lat) / 2, (min_lon + max_lon) / 2], zoom_start=11)

# Per-cell counts read from the last transformer of the pipeline's second
# step (the grid_cell aggregator). Keys follow munge_bis, i.e. they are
# (latitude index, longitude index) tuples.
# NOTE(review): `_groups` is a private attribute and may change between
# river versions.
cell_counts = {
    (lat_idx, lon_idx): count.get()
    for ((lat_idx, lon_idx),), count in model[1][-1]._groups.items()
}

# Create a colormap to map counts to colors
colormap = LinearColormap(['yellow', 'red'], vmin=0, vmax=max(cell_counts.values()))

# Cell edges along each axis: the cut points followed by the upper bound, so
# that bisection index k maps to the span edges[k - 1]..edges[k] for
# k = 1..grid_size. Starting the loops at 1 avoids index -1, which would
# silently wrap around to the last cut. Index 0 (coordinates at or below the
# lower bound) has no finite cell and is not drawn; the cell at index
# grid_size also collects coordinates above the upper bound.
lon_edges = [*lon_cuts, max_lon]
lat_edges = [*lat_cuts, max_lat]

# Create grid cells and attach the matching count to each of them
features = []
for i in range(1, grid_size + 1):
    west, east = lon_edges[i - 1], lon_edges[i]
    for j in range(1, grid_size + 1):
        south, north = lat_edges[j - 1], lat_edges[j]
        # i walks longitudes and j latitudes, whereas the count keys are
        # (latitude index, longitude index): look up (j, i), not (i, j).
        cell_count = cell_counts.get((j, i), 0)
        cell = {
            "type": "Feature",
            "properties": {
                "name": f"Cell {i}-{j}",
                "count": cell_count
            },
            "geometry": {
                "type": "Polygon",
                "coordinates": [
                    [
                        [west, south],
                        [east, south],
                        [east, north],
                        [west, north],
                        # GeoJSON rings must end on their starting position
                        [west, south]
                    ]
                ]
            }
        }
        features.append(cell)

# Create a GeoJSON layer with the grid cells, colored by count, and add to map
grid_layer = folium.GeoJson(
    data={
        "type": "FeatureCollection",
        "features": features
    },
    style_function=lambda x: {
        'fillColor': colormap(x['properties']['count']),
        'color': 'black',
        'weight': 1,
        'fillOpacity': 0.4
    },
    name="Grid"
)

grid_layer.add_to(m)

# Add the colormap to the map
colormap.add_to(m)
m


## Progressive validation for batch models

In [58]:
from sklearn import preprocessing as sk_preprocessing
from sklearn import pipeline as sk_pipeline
from sklearn import linear_model as sk_linear_model
from sklearn import exceptions as sk_exceptions

# Batch counterpart of the online pipeline: scaling followed by least squares.
sk_model = sk_pipeline.make_pipeline(
    sk_preprocessing.StandardScaler(),
    sk_linear_model.LinearRegression()
)

metric = metrics.MAE()

# Every sample seen so far; the batch model is refitted on all of it.
X = []
Y = []

for i, (x, y) in enumerate(dataset.take(10_000)):
    x_arr = list(distances(x).values())
    X.append(x_arr)
    Y.append(y)

    # Refit from scratch every 1,000 samples. The nested lists are passed
    # as-is, so the cell needs no pandas import to run on a fresh kernel.
    # NOTE(review): the refit happens before the current sample is scored,
    # so that sample is part of the training data at refit steps.
    if i % 1_000 == 0 and i > 0:
        sk_model.fit(X, Y)

    # Until the first fit there is no model to query: fall back to 0.
    try:
        y_pred = sk_model.predict([x_arr])[0]
    except sk_exceptions.NotFittedError:
        y_pred = 0
    metric.update(y, y_pred)

metric


MAE: 557.213129

## Mondrian forests are a strong baseline

In [60]:
from river import forest

# Forest regressor fed with the raw distance features (no scaling step);
# the seed is fixed so that the run is repeatable.
model = compose.Pipeline(
    distances,
    forest.AMFRegressor(seed=42)
)

# Evaluate with delayed targets, as in the previous experiments.
evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model,
    metric=metric.clone(),
    moment='pickup_datetime',
    delay=lambda _, y: dt.timedelta(seconds=y)
)


MAE: 430.469101

## Unsupervised updates during inference

Without learning during prediction:

In [69]:
# Reference run: the plain linear pipeline, evaluated without delays.
model = compose.Pipeline(
    distances,
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=model,
    metric=metric.clone()
)


MAE: 541.162842

With learning during prediction (`compose.learn_during_predict`):

In [70]:
# Same evaluation on a fresh clone of the model, run inside the
# learn_during_predict context manager.
# NOTE(review): presumably this lets unsupervised steps such as the scaler
# update while predicting — confirm against the river compose docs.
with compose.learn_during_predict():
    metric = evaluate.progressive_val_score(
        dataset=dataset.take(10_000),
        model=model.clone(),
        metric=metric.clone()
    )
metric


MAE: 540.87705

## Grid expansion

In [76]:
from river import optim
from river import utils

# Base pipeline whose hyperparameters are going to be varied.
model = compose.Pipeline(
    distances,
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

# Search space, keyed by pipeline step name: two optimizers, each tried with
# three learning rates.
grid = {
    'LinearRegression': {
        'optimizer': [
            (optim.SGD, {'lr': [.05, .01, .001]}),
            (optim.Adam, {'lr': [.05, .01, .001]})
        ]
    }
}

# One candidate pipeline per combination in the grid.
candidates = utils.expand_param_grid(model, grid)

# Score each candidate separately on the same 10,000 samples.
for candidate in candidates:
    metric = evaluate.progressive_val_score(
        dataset=dataset.take(10_000),
        model=candidate,
        metric=metric.clone()
    )
    print(candidate['LinearRegression'].optimizer, metric)


SGD MAE: 7,851.383381
SGD MAE: 541.162842
SGD MAE: 528.255267
Adam MAE: 614.078932
Adam MAE: 649.885257
Adam MAE: 659.615598


## Model selection using bandits

In [86]:
from river import bandit
from river import model_selection

# Wrap every candidate from the grid in a single meta-model; a bandit policy
# decides which candidates get used, based on their MAE.
# NOTE(review): the exact meaning of `epsilon` and `decay` is defined by
# bandit.EpsilonGreedy — check the river docs before tuning them.
meta_model = model_selection.BanditRegressor(
    models=utils.expand_param_grid(model, grid),
    metric=metrics.MAE(),
    policy=bandit.EpsilonGreedy(epsilon=0.4, decay=0.05, seed=42),
)

# The meta-model is evaluated like any other regressor.
evaluate.progressive_val_score(
    dataset=dataset.take(10_000),
    model=meta_model,
    metric=metric.clone()
)


MAE: 546.430121

In [88]:
# Display the policy's per-arm summary (reward, pulls and share of pulls).
meta_model.policy


Arm ID   Reward            Pulls   Share   
     5           MAE: 0.       0    0.00%  
     3   MAE: 422.985028       3    0.03%  
     0   MAE: 464.725473      86    0.86%  
     2   MAE: 466.373717      90    0.90%  
     4   MAE: 467.464257     160    1.60%  
     1   MAE: 549.309871   9,661   96.61%  

## VectorDict

In [89]:
import random

x = {i: random.random() for i in range(300)}
y = {i: random.random() for i in range(300)}
%timeit {x[i] * y[i] for i in range(300)}


22.2 µs ± 184 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [90]:
# Wrap the same dicts in VectorDict and time the `@` operator on them.
x_vec = utils.VectorDict(x)
y_vec = utils.VectorDict(y)
%timeit x_vec @ y_vec


8.39 µs ± 7.93 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [91]:
import math

# Sanity check: `@` on VectorDicts matches the pure-Python dot product
# (compared with isclose to tolerate floating-point rounding).
assert math.isclose(x_vec @ y_vec, sum(x[i] * y[i] for i in range(300)))
