In [1]:
from pprint import pprint
from river import datasets

X_y = datasets.Bikes()

for x, y in X_y:
    pprint(x)
    print(f'Number of available bikes: {y}')
    break




In [2]:
from river import compose
from river import linear_model
from river import metrics
from river import evaluate
from river import preprocessing
from river import optim

X_y = datasets.Bikes()


model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression(optimizer=optim.SGD(0.001))

metric = metrics.MAE()

evaluate.progressive_val_score(X_y, model, metric, print_every=20_000)




[20,000] MAE: 4.912727
[40,000] MAE: 5.333554
[60,000] MAE: 5.330948
[80,000] MAE: 5.392313
[100,000] MAE: 5.423059
[120,000] MAE: 5.541223
[140,000] MAE: 5.613023
[160,000] MAE: 5.622428
[180,000] MAE: 5.567824


MAE: 5.563893

The result is poor, partly because we haven't given the model enough features! So let's extract some. For example, we could compute the average number of available bikes per hour. To do so, we need to extract the hour from the `moment` column and then use the `TargetAgg` transformer to aggregate the target values by hour.

So what is going on? As before, we select the columns used by the regression. In parallel (the `+` operator builds a union of transformers), the `get_hour` function extracts the hour from the `moment` column, which holds the timestamp. `TargetAgg` then creates a new feature by aggregating the target by station and hour and taking the mean!
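
The idea behind `TargetAgg(by=['station', 'hour'], how=stats.Mean())` can be sketched without river as a running mean of the target per group. The feature for an observation is read before its target is learned, so it only uses past targets. `GroupRunningMean` is a hypothetical helper, not river's implementation. Returning 0.0 for unseen groups is an assumption.

```python
from datetime import datetime


def get_hour(x):
    # Same helper as in the notebook: add the hour of the timestamp as a feature
    x['hour'] = x['moment'].hour
    return x


class GroupRunningMean:
    """Hypothetical stand-in for TargetAgg(by=..., how=stats.Mean()).

    Keeps a running mean of the target for each group (e.g. station and hour).
    """

    def __init__(self, by):
        self.by = by
        self.counts = {}
        self.sums = {}

    def _key(self, x):
        return tuple(x[k] for k in self.by)

    def transform(self, x):
        # Read the current mean of the group; unseen groups get 0.0 (an assumption)
        key = self._key(x)
        n = self.counts.get(key, 0)
        return self.sums[key] / n if n else 0.0

    def learn(self, x, y):
        # Update the group's statistics once the true target is known
        key = self._key(x)
        self.counts[key] = self.counts.get(key, 0) + 1
        self.sums[key] = self.sums.get(key, 0.0) + y


agg = GroupRunningMean(by=['station', 'hour'])
stream = [
    ({'station': 'A', 'moment': datetime(2016, 4, 10, 19, 3)}, 8),
    ({'station': 'A', 'moment': datetime(2016, 4, 10, 19, 40)}, 6),
    ({'station': 'A', 'moment': datetime(2016, 4, 11, 19, 5)}, 10),
    ({'station': 'B', 'moment': datetime(2016, 4, 11, 19, 7)}, 3),
]

features = []
for x, y in stream:
    x = get_hour(x)
    features.append(agg.transform(x))  # feature computed before seeing y
    agg.learn(x, y)

print(features)  # [0.0, 8.0, 7.0, 0.0]
```

Station `B` gets 0.0 because no target has been seen yet for the group `('B', 19)`, even though station `A` already has history at that hour.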

In [3]:
from river import feature_extraction
from river import stats

X_y = iter(datasets.Bikes())

def get_hour(x):
    x['hour'] = x['moment'].hour
    return x

model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')
model += (
    get_hour |
    feature_extraction.TargetAgg(by=['station', 'hour'], how=stats.Mean())
)
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression(optimizer=optim.SGD(0.001))

metric = metrics.MAE()

evaluate.progressive_val_score(X_y, model, metric, print_every=20_000)

[20,000] MAE: 3.721246
[40,000] MAE: 3.829972
[60,000] MAE: 3.845068
[80,000] MAE: 3.910259
[100,000] MAE: 3.888652
[120,000] MAE: 3.923727
[140,000] MAE: 3.980953
[160,000] MAE: 3.950034
[180,000] MAE: 3.934545


MAE: 3.933498

In [4]:
model

In [5]:
import itertools

X_y = iter(datasets.Bikes())

model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')
model += (
    get_hour |
    feature_extraction.TargetAgg(by=['station', 'hour'], how=stats.Mean())
)
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

for x, y in itertools.islice(X_y, 10000):
    y_pred = model.predict_one(x)
    model.learn_one(x, y)

x, y = next(X_y)
print(model.debug_one(x))

0. Input
--------
clouds: 0 (int)
description: clear sky (str)
humidity: 52 (int)
moment: 2016-04-10 19:03:27 (datetime)
pressure: 1,001.00000 (float)
station: place-esquirol (str)
temperature: 19.00000 (float)
wind: 7.70000 (float)

1. Transformer union
--------------------
    1.0 Select
    ----------
    clouds: 0 (int)
    humidity: 52 (int)
    pressure: 1,001.00000 (float)
    temperature: 19.00000 (float)
    wind: 7.70000 (float)

    1.1 get_hour | y_mean_by_station_and_hour
    -----------------------------------------
    y_mean_by_station_and_hour: 7.97175 (float)

clouds: 0 (int)
humidity: 52 (int)
pressure: 1,001.00000 (float)
temperature: 19.00000 (float)
wind: 7.70000 (float)
y_mean_by_station_and_hour: 7.97175 (float)

2. StandardScaler
-----------------
clouds: -1.36138 (float)
humidity: -1.73083 (float)
pressure: -1.26076 (float)
temperature: 1.76232 (float)
wind: 1.45841 (float)
y_mean_by_station_and_hour: 0.05496 (float)

3. LinearRegression
-------------------
Na

The `debug_one` method shows, step by step, what happens to an input set of features.
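
Step 2 of the trace (`StandardScaler`) can be made less opaque with a minimal sketch of streaming standardization. It keeps a running mean and variance per feature using Welford's algorithm and returns `(x - mean) / std`. `RunningStandardizer` is a hypothetical helper, not river's `StandardScaler`. The library's update order and variance convention may differ; the population variance used here is an assumption.

```python
import math


class RunningStandardizer:
    """Hypothetical online z-scoring of one feature (Welford's algorithm)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def learn(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def transform(self, x):
        # Population variance (an assumption); return 0.0 while it is still 0
        var = self.m2 / self.n if self.n else 0.0
        return (x - self.mean) / math.sqrt(var) if var > 0 else 0.0


scaler = RunningStandardizer()
for temperature in [10.0, 12.0, 14.0, 16.0]:
    scaler.learn(temperature)

# mean = 13, variance = 5, so 19 maps to (19 - 13) / sqrt(5)
print(scaler.mean, scaler.transform(19.0))
```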

And now comes the catch. Up until now we've been using the `progressive_val_score` function from the `evaluate` module. It sequentially predicts the output of each observation and updates the model immediately afterwards. This approach is often used for evaluating online learning models, but in some cases it is the wrong one.

The following paragraph is extremely important. When evaluating a machine learning model, the goal is to simulate production conditions in order to get a trustworthy assessment of the model's performance. In our case, we typically want to forecast the number of bikes available at a station, say, 30 minutes ahead. Once the 30 minutes have passed, the true number of available bikes becomes known, and we can update the model using the features that were available 30 minutes earlier. If you think about it, this is exactly how a real-time machine learning system should work. The problem is that this isn't what `progressive_val_score` emulates by default. It simply asks the model to predict the next observation, which is only a few minutes ahead, and then updates the model immediately. We can show that this is flawed by adding a feature that measures a running average of the most recent values.

So what we did with `progressive_val_score` was predict the next value: an observation arrives, we predict its target with our model, and then we update the model.
But suppose we want to forecast 30 minutes ahead, i.e. how many bikes there will be in 30 minutes. We make the prediction now, and we only get the true value 30 minutes later! Only then can we update the model.
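
Here is a minimal, library-free sketch of the test-then-train loop described above. The names `progressive_mae` and `make_persistence_model` are hypothetical. The persistence baseline simply returns the last target seen for a station. Each label is revealed right after its prediction, so even this trivial model looks good when consecutive observations are close in value.

```python
def make_persistence_model():
    """Hypothetical baseline: predict the last target seen for the station."""
    last_seen = {}

    def predict(x):
        return last_seen.get(x['station'], 0.0)

    def learn(x, y):
        last_seen[x['station']] = y

    return predict, learn


def progressive_mae(stream, predict, learn):
    """Test-then-train: predict each target, then learn from it immediately."""
    total, n = 0.0, 0
    for x, y in stream:
        y_pred = predict(x)       # 1. predict with what the model knows now
        total += abs(y - y_pred)  # 2. score the prediction
        n += 1
        learn(x, y)               # 3. the label is revealed right away
    return total / n


# A station whose count changes slowly between consecutive observations
stream = [({'station': 'A'}, y) for y in [5, 5, 6, 6, 7, 7, 8, 8]]
predict, learn = make_persistence_model()
mae = progressive_mae(stream, predict, learn)
print(mae)  # 1.0
```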


In [7]:
X_y = datasets.Bikes()

model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')
model += (
    get_hour |
    feature_extraction.TargetAgg(by=['station', 'hour'], how=stats.Mean()) + 
    feature_extraction.TargetAgg(by='station', how=stats.EWMean(0.5))
)
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

metric = metrics.MAE()

evaluate.progressive_val_score(X_y, model, metric, print_every=20_000)

[20,000] MAE: 20.159286
[40,000] MAE: 10.458898
[60,000] MAE: 7.2759
[80,000] MAE: 5.715397
[100,000] MAE: 4.775094
[120,000] MAE: 4.138421
[140,000] MAE: 3.682591
[160,000] MAE: 3.35015
[180,000] MAE: 3.091398


MAE: 3.06414

So we are making the job too easy: this task is far too simple for our little model... What we really want is to forecast 30 minutes ahead and only then update the model!
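
Why is it so easy? With a fading factor of 0.5, an exponentially weighted mean is dominated by the latest targets of the station. The most recent one weighs 1/2, the one before 1/4, and so on. With immediate updates, the `EWMean` feature therefore mostly echoes the station's last few targets. The sketch below assumes the update rule `mean = alpha * x + (1 - alpha) * mean`, with the first value as initialization. Check river's `stats.EWMean` documentation for its exact convention; with alpha = 0.5 the two possible orderings coincide anyway. `ewmean_history` is a hypothetical helper.

```python
def ewmean_history(values, alpha=0.5):
    """Exponentially weighted running mean; alpha weights the newest value."""
    mean = None
    history = []
    for v in values:
        # The first value initializes the mean (an assumed convention)
        mean = v if mean is None else alpha * v + (1 - alpha) * mean
        history.append(mean)
    return history


history = ewmean_history([2.0, 4.0, 8.0, 16.0])
print(history)  # [2.0, 3.0, 5.5, 10.75]
```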

We can simulate this behaviour with the `moment` and `delay` parameters of `progressive_val_score`. The idea is that each observation in the stream is shown to the model twice: once to make a prediction, and a second time to update the model when the *true value* arrives in the stream!
`moment` determines which variable should be used as the timestamp, while `delay` controls how long to wait before revealing the true value to the model.
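
The `moment`/`delay` mechanism can be sketched without river as a priority queue of pending labels. Each observation is predicted when it arrives, and its label is handed to the model only once `delay` has elapsed on the stream's clock. `delayed_progressive_mae` and the persistence baseline are hypothetical helpers. The exact bookkeeping inside `progressive_val_score` may differ, for example in how a label due at exactly `moment + delay` is handled.

```python
import heapq
from datetime import datetime, timedelta


def make_persistence_model():
    """Hypothetical baseline: predict the last target revealed for the station."""
    last_seen = {}

    def predict(x):
        return last_seen.get(x['station'], 0.0)

    def learn(x, y):
        last_seen[x['station']] = y

    return predict, learn


def delayed_progressive_mae(stream, predict, learn, delay):
    """Predict at `moment`; let the model learn (x, y) only at `moment + delay`.

    `stream` yields (moment, x, y) tuples in chronological order.
    """
    pending = []  # min-heap of (reveal_time, tie_breaker, x, y)
    total, n = 0.0, 0
    for i, (moment, x, y) in enumerate(stream):
        # Reveal every label whose waiting time has elapsed
        while pending and pending[0][0] <= moment:
            _, _, x_old, y_old = heapq.heappop(pending)
            learn(x_old, y_old)  # features are the ones seen at prediction time
        total += abs(y - predict(x))
        n += 1
        heapq.heappush(pending, (moment + delay, i, x, y))
    return total / n


start = datetime(2016, 4, 10, 19, 0)
values = [5, 5, 6, 6, 7, 7, 8, 8]  # one observation every 10 minutes
stream = [
    (start + timedelta(minutes=10 * i), {'station': 'A'}, y)
    for i, y in enumerate(values)
]

mae_now = delayed_progressive_mae(stream, *make_persistence_model(), delay=timedelta(0))
mae_delayed = delayed_progressive_mae(stream, *make_persistence_model(), delay=timedelta(minutes=30))
print(mae_now, mae_delayed)  # 1.0 2.875
```

On this toy stream, the same baseline goes from an MAE of 1.0 with immediate labels to 2.875 with a 30-minute delay. With the delay, it has to forecast three observations ahead instead of one.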

In [8]:
import datetime as dt

model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')

# Per-station/hour mean plus an exponentially weighted mean of each station's recent targets
model += (
    get_hour |
    feature_extraction.TargetAgg(by=['station', 'hour'], how=stats.Mean()) +
    feature_extraction.TargetAgg(by='station', how=stats.EWMean(0.5))
)
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

evaluate.progressive_val_score(
    dataset=datasets.Bikes(),
    model=model,
    metric=metrics.MAE(),
    moment='moment',
    delay=dt.timedelta(minutes=30),
    print_every=20_000
)

[20,000] MAE: 2.24812
[40,000] MAE: 2.240287
[60,000] MAE: 2.270287
[80,000] MAE: 2.28649
[100,000] MAE: 2.294264
[120,000] MAE: 2.275891
[140,000] MAE: 2.261411
[160,000] MAE: 2.285978
[180,000] MAE: 2.289353


MAE: 2.29304

In [9]:
from river import ensemble
from river import optim

model = compose.Select('clouds', 'humidity', 'pressure', 'temperature', 'wind')
model += (
    get_hour |
    feature_extraction.TargetAgg(by=['station', 'hour'], how=stats.Mean())
)
model += feature_extraction.TargetAgg(by='station', how=stats.EWMean(0.5))
model |= preprocessing.StandardScaler()
model |= ensemble.EWARegressor([
    linear_model.LinearRegression(optim.SGD()),
    linear_model.LinearRegression(optim.RMSProp()),
    linear_model.LinearRegression(optim.Adam())
])

evaluate.progressive_val_score(
    dataset=datasets.Bikes(),
    model=model,
    metric=metrics.MAE(),
    moment='moment',
    delay=dt.timedelta(minutes=30),
    print_every=20_000
)

[20,000] MAE: 2.253263
[40,000] MAE: 2.242859
[60,000] MAE: 2.272001
[80,000] MAE: 2.287776
[100,000] MAE: 2.295292
[120,000] MAE: 2.276748
[140,000] MAE: 2.262146
[160,000] MAE: 2.286621
[180,000] MAE: 2.289925


MAE: 2.293604