![River-logo](../assets/river_logo.png)

# General introduction to River and its functionality

`River` is a Python library for online machine learning. It was first released in November 2020 as the result of a merger between [`creme`](https://github.com/MaxHalford/creme) and [`scikit-multiflow`](https://github.com/scikit-multiflow/scikit-multiflow).

Currently, with more than 3,000 stars on GitHub and applications in data science projects at major companies (Samsung/Harman Kardon, Bee2Beep, BNP Paribas, Lyft, etc.), `River` has become a go-to library for machine learning on streaming data.

Key features of `River`:

* **Incremental learning**: All models in `River` learn from one sample at a time.
* **Adaptive learning**: Methods are designed to cope with concept drift and anomalies in dynamic environments.
* **General**: `River` is designed for all-purpose use, just like `scikit-learn`. Models within the package deal with all kinds of problems, including regression, classification, and clustering, as well as supporting tasks such as pre-processing.
* **Efficient**: Streaming methods in `River` are designed to use time and memory efficiently, mostly in a sub-linear manner.
* **Easy to use**: `River` is suitable for use at any experience level, from beginners to experts/researchers.

# From batch learning to stream learning

## Batch learning

Batch learning assumes that **all** data is available at the same time, for both training and testing, and is processed in batches or as a whole.

A minimal pipeline for traditional machine learning includes:

1. Data preprocessing and engineering
1. Training
1. Testing

We will use [`Iris`](https://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html) as an example, a dataset that almost everyone in machine learning has come across at least once. It is available as part of `scikit-learn`.

We will use $75\%$ of the samples for training and the remaining $25\%$ for testing. The training and testing sets must be completely separate, i.e., they cannot overlap.

In [None]:
from sklearn import datasets as datasets_skl
from sklearn.naive_bayes import GaussianNB as GaussianNB_skl
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix

In [None]:
iris = datasets_skl.load_iris()

# load variables and output into different variables
x = iris.data
y = iris.target

# split into training and testing set
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size = 0.25, stratify=y, random_state=0
)

In [None]:
# shape of datasets
print(f"Shape of iris: {iris.data.shape}, "
      f"training set: {x_train.shape}, testing set: {x_test.shape}")

# feature names
print("The iris feature names include:",
      ', '.join(str(p) for p in iris.feature_names))

# target names (the original cell printed the feature names here by mistake)
print("The iris target names (groups) include:",
      ', '.join(str(p) for p in iris.target_names))

This is a classification problem with $4$ features and $3$ groups (target names). For this problem, we will use Gaussian Naive Bayes (`GaussianNB`), so that we can compare it side by side with its incremental version in `River`.

In [None]:
gaussiannb_sklearn = GaussianNB_skl()
gaussiannb_sklearn.fit(X=x_train, y=y_train)
y_pred = gaussiannb_sklearn.predict(X=x_test)

In [None]:
# derive accuracy
print(f"SKLearn's Gaussian Naive Bayes accuracy: {accuracy_score(y_test, y_pred):.5f}")

A model accuracy of $97\%$ is perfectly acceptable, given that the dataset is simple and small. However, in real-life scenarios, we generally cannot have all the data available at the same time. Data usually arrives continuously, which means that we have to collect it over time and build new models after a certain time window.

However, this approach raises a number of questions, including:

* Should we train new models on new data only, or also on old data? How far back?
* How much new data is enough to start creating a new model?
* How much better than the old model must the new one be to justify its cost?
* Is storing the data feasible at all? For big data problems, it is sometimes impractical or too costly.

Together, these problems motivated the development of stream learning practices.

## Stream learning

Stream learning assumes that:

* Data comes in a stream, which means that we do not have all the data available at the same time;
* The number of data points is potentially infinite; and
* The statistics of future data points are not predictable.

Stream learning methods are specifically designed for this situation and must meet a set of requirements:

* **Data is processed one sample at a time**: A sample is seen (learned from) only once, and the sample itself is discarded after processing.
* **Time and memory efficient**: Although the data stream is infinite, the algorithm must be efficient enough to handle a large number of data points arriving continuously. Most online learning algorithms are designed to use resources in a sub-linear fashion.
* **Able to predict at any time**: The model can provide predictions for any data point arriving at any time.
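
As a minimal illustration of these requirements, the sketch below maintains the mean of a stream without storing any past values. The class name `RunningMean` and its methods are made up for this example and are not part of `River`.

```python
class RunningMean:
    """Mean of a stream, updated one value at a time in constant memory."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0

    def update(self, value):
        # Fold the new value into the summary; the value itself is not stored.
        self.n += 1
        self.mean += (value - self.mean) / self.n

    def get(self):
        # A result is available at any point in the stream.
        return self.mean


running = RunningMean()
for value in [2.0, 4.0, 6.0, 8.0]:
    running.update(value)
    print(f"after {running.n} samples: mean = {running.get():.2f}")
```

Only two numbers are kept regardless of how long the stream is, which is the kind of constant-memory behaviour the requirements above ask for.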

Within the scope of this demo, the stream can be simulated by iterating through the whole dataset, *one observation at a time* with `zip`.

In [None]:
for x_i, y_i in zip(x_train, y_train):
    # TBA
    pass
x_i, y_i # final observation

When dealing with large amounts of two-dimensional data, `numpy.ndarray` objects are generally preferred. Most machine learning packages in Python, including `scikit-learn`, work natively with this type of data.

However, when data arrives in a streaming fashion, one point at a time as a one-dimensional record, a dictionary is a better fit. As such, when feeding the data into `River`'s algorithms, we need a small transformation. This can easily be done as follows:

In [None]:
for x_i, y_i in zip(x_train, y_train):
    x_i_dict = dict(zip(iris.feature_names, x_i))

x_i_dict, y_i

Although `scikit-learn` supports certain incremental algorithms (i.e., estimators that provide a `partial_fit` method), these are designed for *mini-batches*: small groups of samples that arrive together.

The batch workflow above also does all the "training" first and the "testing" later, which is not a suitable practice for fully online machine learning.

In the following example with `River`, we also use the Gaussian Naive Bayes method, which is itself an **incremental method**. Training and prediction are performed with `learn_one` and `predict_one`.
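
To give an intuition for what an incremental Gaussian Naive Bayes has to maintain, here is a small standard-library sketch. It is an illustration only, not `River`'s implementation: the class `TinyGaussianNB`, the feature names, and the variance floor of `1e-9` are assumptions made for this example. Per class and per feature it keeps a count, a running mean, and a running sum of squared deviations (Welford's update).

```python
import math
from collections import defaultdict


class TinyGaussianNB:
    """Illustrative incremental Gaussian Naive Bayes (not River's code)."""

    def __init__(self):
        self.class_counts = defaultdict(int)
        # stats[label][feature] = [n, mean, m2], updated with Welford's method
        self.stats = defaultdict(lambda: defaultdict(lambda: [0, 0.0, 0.0]))

    def learn_one(self, x, y):
        self.class_counts[y] += 1
        for name, value in x.items():
            s = self.stats[y][name]
            s[0] += 1
            delta = value - s[1]
            s[1] += delta / s[0]
            s[2] += delta * (value - s[1])

    def predict_one(self, x):
        if not self.class_counts:
            return None  # nothing has been learned yet
        total = sum(self.class_counts.values())
        best_label, best_score = None, -math.inf
        for label, count in self.class_counts.items():
            score = math.log(count / total)  # log prior
            for name, value in x.items():
                n, mean, m2 = self.stats[label][name]
                if n == 0:
                    continue  # feature never seen for this class
                var = m2 / n + 1e-9  # small floor keeps the variance positive
                score += -0.5 * math.log(2 * math.pi * var)
                score -= (value - mean) ** 2 / (2 * var)
            if score > best_score:
                best_label, best_score = label, score
        return best_label


samples = [
    ({"length": 1.0, "width": 0.5}, "small"),
    ({"length": 5.0, "width": 2.0}, "large"),
    ({"length": 1.2, "width": 0.4}, "small"),
    ({"length": 5.5, "width": 2.2}, "large"),
    ({"length": 0.8, "width": 0.6}, "small"),
    ({"length": 4.5, "width": 1.8}, "large"),
]

nb_sketch = TinyGaussianNB()
for features_i, label_i in samples:
    nb_sketch.learn_one(features_i, label_i)

print(nb_sketch.predict_one({"length": 1.1, "width": 0.5}))
print(nb_sketch.predict_one({"length": 5.2, "width": 2.1}))
```

No sample is stored after `learn_one` returns; the per-class summaries are enough to score a new point at any time.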

### The `progressive_val_score` method

In [None]:
# Imports
from river import evaluate
from river import stream
from river import naive_bayes
from river import metrics

from sklearn.datasets import load_iris

The `progressive_val_score` method implements the **prequential evaluation** and requires three components: 

- a data stream
- a model
- a metric

In [None]:
# Setup stream and estimators
data = stream.iter_sklearn_dataset(datasets_skl.load_iris(), 
                                   shuffle=True, 
                                   seed=42)
nb = naive_bayes.GaussianNB()
metric = metrics.Accuracy()

# Setup evaluator
evaluate.progressive_val_score(dataset=data, 
                               model=nb, 
                               metric=metric, 
                               print_every=15)

### Under the hood

In the streaming setting, training and predicting happen inside a loop. Remember, a data sample is available only once in the stream.

The `iter_sklearn_dataset` method converts a `scikit-learn` dataset object into an iterable that yields each sample as a `dict`. This way, we do not need the manual `zip` step shown previously.

The following example shows the main steps performed by `progressive_val_score`:

In [None]:
model = naive_bayes.GaussianNB()
metric = metrics.Accuracy()
data = stream.iter_sklearn_dataset(load_iris(), shuffle=True, seed=42)
n_passed_samples = 0

y_pred = []
y_true = []

for x, y in data:
    # Predict class
    y_p = model.predict_one(x)                    
    if y_p is not None:
        # Update metric
        metric.update(y_true=y, y_pred=y_p)       
    # Train the model
    model.learn_one(x, y)  
    # Process next sample if the model is empty
    if y_p is None:
        continue
    # Add one to the number of passed samples
    n_passed_samples += 1
    y_pred.append(y_p)
    y_true.append(y)
    if n_passed_samples % 15 == 0:
        print(f'[{metric.cm.n_samples + 1}] {metric}')
        
print(f'Model accuracy calculated by accuracy_score from sklearn: {accuracy_score(y_true, y_pred):.4f}')

### The order matters!

Notice the *order* of the training and prediction operations.

When a new data sample ($x_i$, $y_i$) arrives:

1. Predict the label $y_i'$ for the new data sample $x_i$, *not yet "seen"* by the model.
2. Train the model using the new sample ($x_i$, $y_i$).

This practice is commonly known as **prequential evaluation** or **interleaved test-then-train** evaluation: the model predicts the outcome of each new data point before learning from it.

A benefit from this approach is that we can use all labeled samples in the stream to measure a model's performance, since the model can provide predictions at any point in the stream, depending on its current internal state. Batch methods can only predict after they have finished training.

In [None]:
model = naive_bayes.GaussianNB()
metric = metrics.Accuracy()
data = stream.iter_sklearn_dataset(load_iris(), shuffle=True, seed=0)
n_passed_samples = 0

y_pred = []
y_true = []

for x, y in data:
    # Predict class
    y_p = model.predict_one(x)                    
    if y_p is not None:
        # Update metric
        metric.update(y_true=y, y_pred=y_p)       
    # Train the model
    model.learn_one(x, y)  
    # Process next sample if the model is empty
    if y_p is None:
        continue
    # Add one to the number of passed samples
    n_passed_samples += 1
    y_pred.append(y_p)
    y_true.append(y)
    if n_passed_samples % 15 == 0:
        print(f'[{metric.cm.n_samples + 1}] {metric}')
        
print(f'Model accuracy calculated by accuracy_score from sklearn: {accuracy_score(y_true, y_pred):.4f}')

### Key facts

- We use *all* labeled samples in the stream to measure a model's performance. 
- The model provides predictions at any point in the stream, according to its *current* internal state. Batch methods can only predict after they have finished training.
- All methods (models, metrics, etc.) in `River` are updated **one sample at a time**.
  - Mini-batch processing is supported by some methods.

### Dictionaries

The default data structure used internally in `River`.

- Efficient storage of 1-dimensional data (1 sample)
- Supports sparse/missing data
- Easy access to features by name (key)

In [None]:
# Last sample from the experiment above
print(x, y)
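
The sketch below illustrates why dictionaries handle sparse or missing features gracefully. The function `dot`, the weights, and the feature names are invented for this example, and treating an absent feature as contributing nothing is just one common convention, not a statement about how every `River` model behaves.

```python
def dot(weights, x):
    """Dot product between a weight dict and a sample dict."""
    # Only keys present in the sample are visited; a feature the model has
    # never seen gets a weight of 0.0.
    return sum(weights.get(name, 0.0) * value for name, value in x.items())


weights = {"temp": 0.5, "humidity": -0.2, "wind": 0.1}

dense_sample = {"temp": 20.0, "humidity": 50.0, "wind": 10.0}
sparse_sample = {"temp": 20.0}                    # other features are missing
new_feature_sample = {"temp": 20.0, "rain": 3.0}  # "rain" is unknown to the model

for sample in (dense_sample, sparse_sample, new_feature_sample):
    print(sample, "->", dot(weights, sample))
```

With a fixed-width array, both the missing values and the previously unseen `rain` feature would require padding or a schema change; with dictionaries the same code handles all three samples.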

Besides models, performance metrics in `River` can also be updated one sample at a time. This means that we do not need to store all true and predicted labels in two separate arrays, which is sometimes *impractical*.

Moreover, these metrics are built on the same **confusion matrix**, which makes it efficient to compute several metrics at once on a single data stream.

In [None]:
from river.metrics import Accuracy

metric = Accuracy()
for yt, yp in zip(y_true, y_pred):
    metric.update(y_true=yt, y_pred=yp)
metric

In [None]:
print('Confusion matrix from all data (scikit-learn):')
print(confusion_matrix(y_true, y_pred))
print('Confusion matrix from updatable metric (River):')
print(metric.cm)
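
To show why a shared confusion matrix is convenient, the following standard-library sketch keeps a single table of counts, updated one pair at a time, and derives several metrics from it. The class `StreamingConfusionMatrix` and its methods are hypothetical names for this illustration and do not mirror `River`'s internal API.

```python
from collections import defaultdict


class StreamingConfusionMatrix:
    """Counts (true label, predicted label) pairs one at a time."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.n_samples = 0

    def update(self, y_true, y_pred):
        self.counts[(y_true, y_pred)] += 1
        self.n_samples += 1

    def accuracy(self):
        correct = sum(c for (t, p), c in self.counts.items() if t == p)
        return correct / self.n_samples if self.n_samples else 0.0

    def precision(self, label):
        predicted = sum(c for (t, p), c in self.counts.items() if p == label)
        return self.counts.get((label, label), 0) / predicted if predicted else 0.0

    def recall(self, label):
        actual = sum(c for (t, p), c in self.counts.items() if t == label)
        return self.counts.get((label, label), 0) / actual if actual else 0.0


cm_sketch = StreamingConfusionMatrix()
pairs = [(0, 0), (0, 1), (1, 1), (1, 1), (2, 2), (2, 1)]  # (true, predicted)
for true_label, predicted_label in pairs:
    cm_sketch.update(true_label, predicted_label)

print(f"accuracy:     {cm_sketch.accuracy():.3f}")
print(f"precision(1): {cm_sketch.precision(1):.3f}")
print(f"recall(1):    {cm_sketch.recall(1):.3f}")
```

The memory used depends on the number of distinct label pairs, not on the number of samples seen.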

## Decision trees for data streams
---

### `NEweather` data set

**Description:** The National Oceanic and Atmospheric Administration (NOAA)
has compiled a database of weather measurements from over 7,000 weather 
stations worldwide. Records date back to the mid-1900s. Daily measurements
include a variety of features (temperature, pressure, wind speed, etc.) as
well as a series of indicators for precipitation and other weather-related
events. The `NEweather` data set contains data from this database, specifically
from the Offutt Air Force Base in Bellevue, Nebraska, covering 50 years
(1949-1999).

**Features:** 8 Daily weather measurements
 
|       Attribute      | Description |
|:--------------------:|:-----------------------------|
| `temp`                   | Temperature
| `dew_pnt`                | Dew Point
| `sea_lvl_press`          | Sea Level Pressure
| `visibility`             | Visibility
| `avg_wind_spd`           | Average Wind Speed
| `max_sustained_wind_spd` | Maximum Sustained Wind Speed
| `max_temp`               | Maximum Temperature
| `min_temp`               | Minimum Temperature


**Class:** `rain` | 0: no rain, 1: rain
 
**Number of samples:** 18,159

---

In this example, we load the data from a csv file with `pandas.read_csv`. 

Next, we use the `iter_pandas` utility method to iterate over the `DataFrame`. The model in this example is a `HoeffdingTreeClassifier`, a type of decision tree designed for data streams.

In [None]:
import pandas as pd

from river import tree

df = pd.read_csv("../datasets/NEweather.csv")
features = df.columns[:-2]
data = stream.iter_pandas(X=df[features], y=df['rain'])

model = tree.HoeffdingTreeClassifier()
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset=data,
                               model=model,
                               metric=metric,
                               print_every=1000)

- Tree-based models are popular due to their interpretability.
- They use a tree data structure to model the data.
- When a sample arrives, it traverses the tree until it reaches a leaf node.
- Internal nodes define the path for a data sample based on the values of its features.
- Leaf nodes are models that provide predictions for unlabeled-samples and can update their internal state using the labels from labeled samples.
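
The traversal described above can be sketched with two small classes. This is a hand-built toy, assuming a single numeric split on `temp` at a made-up threshold of 50.0; a Hoeffding tree would learn its splits from the data, which this example does not attempt.

```python
from collections import Counter


class Leaf:
    """Leaf node: keeps class counts and predicts the most frequent class."""

    def __init__(self):
        self.counts = Counter()

    def learn_one(self, y):
        self.counts[y] += 1

    def predict_one(self):
        return self.counts.most_common(1)[0][0] if self.counts else None


class Split:
    """Internal node: routes a sample left or right on one numeric feature."""

    def __init__(self, feature, threshold, left, right):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right

    def next_node(self, x):
        return self.left if x[self.feature] <= self.threshold else self.right


def find_leaf(node, x):
    # Follow the internal nodes until a leaf is reached.
    while isinstance(node, Split):
        node = node.next_node(x)
    return node


# Hand-built tree with a hypothetical split on `temp`.
root = Split("temp", 50.0, left=Leaf(), right=Leaf())

labeled_samples = [
    ({"temp": 35.0}, 1), ({"temp": 40.0}, 1), ({"temp": 45.0}, 0),
    ({"temp": 70.0}, 0), ({"temp": 80.0}, 0),
]
for sample, label in labeled_samples:
    find_leaf(root, sample).learn_one(label)  # labeled sample updates its leaf

print(find_leaf(root, {"temp": 38.0}).predict_one())
print(find_leaf(root, {"temp": 75.0}).predict_one())
```

Here each leaf is a simple majority-class model; the same structure works with any leaf model that can be updated one sample at a time.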

In [None]:
model.draw()

### Incremental tree growth

In [None]:
import itertools

data = stream.iter_pandas(X=df[features], y=df['rain'])
metric = metrics.Accuracy()

model = tree.HoeffdingTreeClassifier()

By running the body of the `for` loop in the following cell several times, we can "see" the tree grow.

We simplify this by using `IPython.display`, which lets a single cell show multiple outputs.

In [None]:
from IPython.display import display

for i in range(len(df) // 1000 + 1):
    chunk = itertools.islice(data, 1000)     # Get chunks of data from the stream
    for x, y in chunk:
        y_pred = model.predict_one(x)
        metric.update(y_true=y, y_pred=y_pred)
        model.learn_one(x, y)

    print(f'{metric} - n_samples: {metric.cm.n_samples}')
    display(model.draw())

## Pipelines
---

Pipelines are powerful tools that "chain" a sequence of operations. 

For example, the logistic regression model is sensitive to the scale of the features. Scaling the data so that each feature has mean $\mu=0$ and variance $\sigma^2=1$ is generally considered good practice. This is provided by the `StandardScaler` method within the `preprocessing` module of `River`.

We can apply the scaling and train the logistic regression sequentially in an elegant manner using a Pipeline.
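
The chaining idea can be sketched without any library. The three classes below (`OnlineStandardScaler`, `OnlineLogisticRegression`, `TinyPipeline`) are illustrative stand-ins written for this example, not `River`'s implementations, and the toy data and learning rate are assumptions. The scaler assumes every sample contains the same features.

```python
import math


class OnlineStandardScaler:
    """Scales each feature with its running mean and variance."""

    def __init__(self):
        self.n = 0
        self.mean = {}
        self.m2 = {}

    def learn_one(self, x):
        self.n += 1
        for k, v in x.items():
            mean = self.mean.get(k, 0.0)
            delta = v - mean
            mean += delta / self.n
            self.mean[k] = mean
            self.m2[k] = self.m2.get(k, 0.0) + delta * (v - mean)

    def transform_one(self, x):
        out = {}
        for k, v in x.items():
            var = self.m2.get(k, 0.0) / self.n if self.n else 0.0
            out[k] = (v - self.mean.get(k, 0.0)) / math.sqrt(var) if var > 0 else 0.0
        return out


class OnlineLogisticRegression:
    """Logistic regression trained with one SGD step per sample."""

    def __init__(self, lr=0.1):
        self.lr = lr
        self.weights = {}
        self.bias = 0.0

    def predict_proba_one(self, x):
        z = self.bias + sum(self.weights.get(k, 0.0) * v for k, v in x.items())
        return 1.0 / (1.0 + math.exp(-z))

    def predict_one(self, x):
        return self.predict_proba_one(x) >= 0.5

    def learn_one(self, x, y):
        error = self.predict_proba_one(x) - float(y)
        for k, v in x.items():
            self.weights[k] = self.weights.get(k, 0.0) - self.lr * error * v
        self.bias -= self.lr * error


class TinyPipeline:
    """Chains a scaler and a model so they are used as a single estimator."""

    def __init__(self, scaler, model):
        self.scaler = scaler
        self.model = model

    def predict_one(self, x):
        return self.model.predict_one(self.scaler.transform_one(x))

    def learn_one(self, x, y):
        self.scaler.learn_one(x)  # update the scaling statistics first
        self.model.learn_one(self.scaler.transform_one(x), y)


# Toy stream: two features on very different scales, label is True for "rain".
toy_stream = [
    ({"pressure": 1000.0, "visibility": 5.0}, True),
    ({"pressure": 1020.0, "visibility": 10.0}, False),
    ({"pressure": 1002.0, "visibility": 6.0}, True),
    ({"pressure": 1018.0, "visibility": 9.0}, False),
] * 25

pipe = TinyPipeline(OnlineStandardScaler(), OnlineLogisticRegression())
for sample, label in toy_stream:
    pipe.learn_one(sample, label)

print(pipe.predict_one({"pressure": 1001.0, "visibility": 5.5}))
print(pipe.predict_one({"pressure": 1019.0, "visibility": 9.5}))
```

The caller only ever talks to `pipe`; the scaling step cannot be forgotten at prediction time because it is part of the same object.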


In [None]:
from river import linear_model
from river import preprocessing
from river import compose

df = pd.read_csv("../datasets/NEweather.csv")
features = df.columns[:-2]
# Use a name other than `stream` so the `river.stream` module is not shadowed
data = stream.iter_pandas(X=df[features], y=df['rain'])

# Scale the features, then feed them to the logistic regression
pipeline = compose.Pipeline(
    ('scale', preprocessing.StandardScaler()),
    ('log_reg', linear_model.LogisticRegression())
)

metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset=data,
                               model=pipeline,
                               metric=metric,
                               print_every=1000)

We can visualize the pipeline by evaluating it as the last expression of a notebook cell.

In [None]:
pipeline

## Regression
---

### Bike Sharing data set

Contains the **hourly and daily count of rental bikes** between the years 2011 and 2012 in the Capital bike-share system with the corresponding weather and seasonal information. Attributes include weather, temperature, date, time, etc.

|      Attribute      | Description |
|:-------------------:|:-----------------------------------------------------------------|
| `instant`             | record index
| `dteday`              | date
| `season`              | (1:winter, 2:spring, 3:summer, 4:fall)
| `yr`                  | year (0: 2011, 1:2012)
| `mnth`                | month ( 1 to 12)
| `hr`                  | hour (0 to 23)
| `holiday`             | whether the day is a holiday or not
| `weekday`             | day of the week
| `workingday`          | 1 if the day is neither a weekend nor a holiday, otherwise 0
| `weathersit`          | 1: Clear, Few clouds, Partly cloudy<br>2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist<br>3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds<br>4: Heavy Rain + Ice Pellets + Thunderstorm + Mist, Snow + Fog
| `temp`                | Normalized temperature in Celsius.
| `atemp`               | Normalized feeling temperature in Celsius.
| `hum`                 | Normalized humidity.
| `windspeed`           | Normalized wind speed.
| `casual`              | count of casual users
| `registered`          | count of registered users

**Target:** `cnt` | number of bikes rented including both casual and registered
 
**Number of samples:** 17,389

For this example we will use the k-nearest neighbor algorithm.

The implementation of the KNN regressor in `River` involves 3 parameters: 
- `n_neighbors`: Number of neighbors to search for.
- `engine`: Search engine used to store the instances and perform search queries. Depending on the chosen engine, the search is exact or approximate. The default engine for the regressor is SWINN (Sliding Window-based Nearest Neighbor search using Graphs).
- `aggregation_method`: Method used to aggregate the target values of the neighbors. Three methods are available: `mean`, `median`, and `weighted_mean`.

In short, the KNN regressor performs search queries in the data buffer using the chosen engine, and obtains its prediction by aggregating the target values of the stored samples that are closest to the query sample.

SWINN [(Mastelini et al., 2022)](https://papers.ssrn.com/sol3/papers.cfm?abstract_id=4292758) is a fairly new algorithm. It extends the NN-Descent algorithm to handle vertex addition and removal under a FIFO data ingestion policy. The basic idea of SWINN (and NN-Descent) is that "the neighbors of my neighbors are likely to be my neighbors as well", and the algorithm can be described generally as follows:

- A random neighborhood graph is initiated.
- For each node in the search graph: refine the current neighborhood by checking if there are better neighborhood options among neighbors of the current neighbors.
- If the total number of neighborhood changes is smaller than a given stopping criterion, stop.

The algorithm is more effective when the maximum size of the data buffer is greater than $500$. For smaller data sizes, lazy search might be preferred.
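
For intuition about the buffer, the exact ("lazy") search, and the three aggregation options, here is a standard-library sketch. It is not `River`'s `KNNRegressor`: the class `WindowKNNRegressor`, its `window_size` argument, the Euclidean distance, and the inverse-distance weighting are assumptions made for this illustration, and no graph-based search such as SWINN is attempted.

```python
import math
import statistics
from collections import deque


class WindowKNNRegressor:
    """Exact nearest-neighbor regression over a sliding window of samples."""

    def __init__(self, n_neighbors=3, window_size=100, aggregation_method="mean"):
        self.n_neighbors = n_neighbors
        self.aggregation_method = aggregation_method
        # A bounded deque drops the oldest sample first (FIFO).
        self.window = deque(maxlen=window_size)

    def learn_one(self, x, y):
        self.window.append((x, y))

    @staticmethod
    def _distance(a, b):
        keys = set(a) | set(b)
        return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))

    def predict_one(self, x):
        if not self.window:
            return None
        # Lazy search: scan the whole buffer and keep the k closest samples.
        nearest = sorted(
            ((self._distance(x, xs), ys) for xs, ys in self.window),
            key=lambda pair: pair[0],
        )[: self.n_neighbors]
        targets = [ys for _, ys in nearest]
        if self.aggregation_method == "median":
            return statistics.median(targets)
        if self.aggregation_method == "weighted_mean":
            # Inverse-distance weights; the constant avoids division by zero.
            weights = [1.0 / (d + 1e-9) for d, _ in nearest]
            return sum(w * t for w, t in zip(weights, targets)) / sum(weights)
        return sum(targets) / len(targets)


knn_sketch = WindowKNNRegressor(n_neighbors=2, window_size=3)
for temp, count in [(10.0, 100.0), (12.0, 120.0), (30.0, 300.0), (32.0, 320.0)]:
    knn_sketch.learn_one({"temp": temp}, count)

# The window holds only the 3 most recent samples; temp=10.0 was evicted.
print(len(knn_sketch.window))
print(knn_sketch.predict_one({"temp": 31.0}))
```

The full scan costs time proportional to the window size for every query, which is why approximate, graph-based engines become attractive for larger buffers.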

In [None]:
from river import neighbors
from river.stream import iter_pandas

df = pd.read_csv("../datasets/bike.csv")
features = df.columns[:-2]

data = iter_pandas(X=df[features], y=df['cnt'])
model = neighbors.KNNRegressor(n_neighbors=3, aggregation_method="weighted_mean")
metric = metrics.R2()

evaluate.progressive_val_score(dataset=data,
                               model=model,
                               metric=metric,
                               print_every=1000)

The k-nearest neighbor algorithm assumes that all the features are numeric.

In the `bike` data, there are some categorical features. We can easily filter them in a pipeline:

In [None]:
data = iter_pandas(X=df[features], y=df['cnt'])
pipeline = (
    compose.Discard('season', 'mnth', 'weekday', 'weathersit') |
    neighbors.KNNRegressor(n_neighbors=3, aggregation_method="weighted_mean")
)
metric = metrics.R2()

evaluate.progressive_val_score(dataset=data,
                               model=pipeline,
                               metric=metric,
                               print_every=1000)

In [None]:
pipeline

## Concept Drift
---

Dynamic environments are challenging for machine learning methods because data changes over time.

For this example, we will generate a synthetic data stream by concatenating data from 3 different distributions:
- $dist_a$: $\mu=0.8$, $\sigma=0.05$
- $dist_b$: $\mu=0.4$, $\sigma=0.02$
- $dist_c$: $\mu=0.6$, $\sigma=0.1$.

In [None]:
import numpy as np
from utils import plot_data

# Generate data for 3 distributions
random_state = np.random.RandomState(seed=42)
dist_a = random_state.normal(0.8, 0.05, 1000)
dist_b = random_state.normal(0.4, 0.02, 1000)
dist_c = random_state.normal(0.6, 0.1, 1000)

# Concatenate data to simulate a data stream with 2 drifts
data = np.concatenate((dist_a, dist_b, dist_c))


plot_data(dist_a, dist_b, dist_c)

As observed above, the synthetic data stream has 2 **abrupt drifts**. An analogy could be trending topics on Twitter, which can change quickly.

The goal is to detect that drift has occurred, after samples **1000** and **2000** in the synthetic data stream.

In this example, we will use the ADaptive WINdowing (`ADWIN`) drift detection method.
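
Before using `ADWIN`, it can help to see the general idea of window-based drift detection in its simplest form. The sketch below is deliberately **not** ADWIN: instead of an adaptive window it compares the means of two adjacent fixed-size windows. The class `TwoWindowDetector`, the window size of 100, and the threshold of 0.1 are arbitrary choices for this illustration, and the stream is regenerated with Python's `random` module rather than NumPy.

```python
import random
from collections import deque


class TwoWindowDetector:
    """Flags drift when the means of two adjacent windows differ too much."""

    def __init__(self, window_size=100, threshold=0.1):
        self.window_size = window_size
        self.threshold = threshold
        self.buffer = deque(maxlen=2 * window_size)
        self.drift_detected = False

    def update(self, value):
        self.drift_detected = False
        self.buffer.append(value)
        if len(self.buffer) < 2 * self.window_size:
            return  # not enough data to compare two windows yet
        values = list(self.buffer)
        older = values[: self.window_size]
        recent = values[self.window_size:]
        gap = abs(sum(recent) / len(recent) - sum(older) / len(older))
        if gap > self.threshold:
            self.drift_detected = True
            self.buffer.clear()  # start over with post-drift data


# Same three distributions as above, drawn with the standard library.
rng = random.Random(42)
toy_data = (
    [rng.gauss(0.8, 0.05) for _ in range(1000)]
    + [rng.gauss(0.4, 0.02) for _ in range(1000)]
    + [rng.gauss(0.6, 0.1) for _ in range(1000)]
)

detector = TwoWindowDetector()
detected_at = []
for i, value in enumerate(toy_data):
    detector.update(value)
    if detector.drift_detected:
        detected_at.append(i)

print(detected_at)  # two indices, shortly after samples 1000 and 2000
```

A fixed window forces a trade-off between reaction time and false alarms; adaptive approaches such as ADWIN aim to avoid having to pick the window size by hand.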



In [None]:
from river import drift

drift_detector = drift.ADWIN()
drifts = []

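for i, val in enumerate(data):
    drift_detector.update(val)           # Data is processed one sample at a time
    # Recent River releases expose the flag as `drift_detected` and reset the
    # detector internally after a detection (older releases used
    # `change_detected` together with an explicit `reset()` call)
    if drift_detector.drift_detected:
        print(f'Change detected at index {i}')
        drifts.append(i)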

plot_data(dist_a, dist_b, dist_c, drifts)

### Impact on predictive performance

Concept drift can negatively impact learning methods if not properly handled. Many real-world applications suffer from **model degradation** because the models cannot adapt to changes in the data.

In this example, we will use two popular streaming models:

1. The `Hoeffding Tree`
2. The `Hoeffding Adaptive Tree`

The `Hoeffding Adaptive Tree` uses `ADWIN` to detect changes. If change is detected in a given branch, an alternate branch is created and eventually replaces the original branch if it shows better performance on new data.

---
### `AGRAWAL` dataset

We will load the data from a csv file. The data was generated using the `AGRAWAL` data generator with 3 **gradual drifts** at the 5k, 10k, and 15k marks.

The `AGRAWAL` synthetic data generator produces 9 features, 6 numeric and 3 categorical.

There are 10 functions for generating binary class labels from the features. These functions determine whether a **loan** should be approved.

| Feature    | Description            | Values                                                                |
|------------|------------------------|-----------------------------------------------------------------------|
| `salary`     | salary                 | uniformly distributed from 20k to 150k                                |
| `commission` | commission             | if (salary <   75k) then 0 else uniformly distributed from 10k to 75k |
| `age`        | age                    | uniformly distributed from 20 to 80                                   |
| `elevel`     | education level        | uniformly chosen from 0 to 4                                          |
| `car`        | car maker              | uniformly chosen from 1 to 20                                         |
| `zipcode`    | zip code of the town   | uniformly chosen from 0 to 8                                          |
| `hvalue`     | value of the house     | uniformly distributed from 50k x zipcode to 100k x zipcode            |
| `hyears`     | years house owned      | uniformly distributed from 1 to 30                                    |
| `loan`       | total loan amount      | uniformly distributed from 0 to 500k                                  |

**Class:** `y` | 0: no loan, 1: loan
 
**Samples:** 20,000

`elevel`, `car`, and `zipcode` are categorical features.
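
As a rough illustration of the table above, the sketch below draws the nine features with Python's `random` module. It follows the table as written (reading "uniformly distributed" as a continuous draw and "uniformly chosen" as an integer draw, which is an interpretation) and leaves out the 10 labeling functions entirely. The function name `agrawal_features` is hypothetical, and this is not the generator used to produce the CSV file.

```python
import random


def agrawal_features(rng):
    """Draw one sample of the nine features described in the table."""
    salary = rng.uniform(20_000, 150_000)
    # Commission rule exactly as stated in the table above.
    commission = 0.0 if salary < 75_000 else rng.uniform(10_000, 75_000)
    zipcode = rng.randint(0, 8)
    return {
        "salary": salary,
        "commission": commission,
        "age": rng.uniform(20, 80),
        "elevel": rng.randint(0, 4),      # categorical
        "car": rng.randint(1, 20),        # categorical
        "zipcode": zipcode,               # categorical
        "hvalue": rng.uniform(50_000 * zipcode, 100_000 * zipcode),
        "hyears": rng.uniform(1, 30),
        "loan": rng.uniform(0, 500_000),
    }


example = agrawal_features(random.Random(42))
for name, value in example.items():
    print(f"{name:>10}: {value}")
```

Each call yields one `dict`, which matches the sample format `River` models expect.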

In [None]:
df = pd.read_csv("../datasets/agr_a_20k.csv")
features = df.columns[:-2]

data = iter_pandas(X=df[features], y=df['class'])
model = tree.HoeffdingTreeClassifier(nominal_attributes=['elevel', 'car', 'zipcode'])
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset=data,
                               model=model,
                               metric=metric,
                               show_memory=True,
                               print_every=1000)

In [None]:
model.draw()

In [None]:
df = pd.read_csv("../datasets/agr_a_20k.csv")
features = df.columns[:-2]

data = iter_pandas(X=df[features], y=df['class'])
model = tree.HoeffdingAdaptiveTreeClassifier(nominal_attributes=['elevel', 'car', 'zipcode'], seed=42)
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset=data,
                               model=model,
                               metric=metric,
                               show_memory=True,
                               print_every=1000)

In [None]:
model.draw()

In [None]:
from river import evaluate
from river.tree import HoeffdingAdaptiveTreeClassifier
from river.stream import iter_pandas
from river.metrics import Accuracy

import pandas as pd

df = pd.read_csv("../datasets/agr_a_20k.csv")
features = df.columns[:-2]
# Avoid naming the iterator `stream`, which would shadow the `river.stream` module
data = iter_pandas(X=df[features], y=df['class'])
model = HoeffdingAdaptiveTreeClassifier(nominal_attributes=['elevel', 'car', 'zipcode'], 
                                        seed=42)
metric = Accuracy()

evaluate.progressive_val_score(dataset=data, 
                               model=model, 
                               metric=metric, 
                               print_every=1000)

In [None]:
model.draw()

# Text clustering

In [None]:
from river import cluster, compose, feature_extraction, metrics

corpus = [
    {"text":'This is the first document.',"idd":1, "cluster": 1},
    {"text":'This document is the second document.',"idd":2,"cluster": 1},
    {"text":'And this is super unrelated.',"idd":3,"cluster": 2},
    {"text":'Is this the first document?',"idd":4,"cluster": 1},
    {"text":'This is super unrelated as well',"idd":5,"cluster": 2},
    {"text":'Test text',"idd":6,"cluster": 5}
]

stopwords = ['stop', 'the', 'to', 'and', 'a', 'in', 'it', 'is', 'I']

metric = metrics.AdjustedRand()

model = compose.Pipeline(
    feature_extraction.BagOfWords(lowercase=True, ngram_range=(1, 2), stop_words=stopwords),
    cluster.TextClust(real_time_fading=False, fading_factor=0.001, tgap=100, auto_r=True,
    radius=0.9)
)

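for x in corpus:
    y_pred = model.predict_one(x["text"])
    y = x["cluster"]
    # Update in place; reassigning the return value breaks on River releases
    # where `update` and `learn_one` return None
    metric.update(y, y_pred)
    model.learn_one(x["text"])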

print(metric)

# Deep-river
## Multi target regression

In [1]:
from river import evaluate, compose
from river import metrics
from river import preprocessing
from river import stream
from sklearn import datasets
from torch import nn
from deep_river.regression.multioutput import MultiTargetRegressor

class MyModule(nn.Module):
    def __init__(self, n_features):
        super(MyModule, self).__init__()
        self.dense0 = nn.Linear(n_features, 3)

    def forward(self, X, **kwargs):
        X = self.dense0(X)
        return X

dataset = stream.iter_sklearn_dataset(
    dataset=datasets.load_linnerud(),
    shuffle=True,
    seed=42
)
model = compose.Pipeline(
    preprocessing.StandardScaler(),
    MultiTargetRegressor(
        module=MyModule,
        loss_fn='mse',
        lr=0.3,
        optimizer_fn='sgd',
    )
)

metric = metrics.multioutput.MicroAverage(metrics.MAE())
ev = evaluate.progressive_val_score(dataset, model, metric)
print(f"MicroAverage(MAE): {metric.get():.2f}")

MicroAverage(MAE): 28.36


## Classification

In [8]:
from river import metrics, datasets, preprocessing, compose
from deep_river import classification
from torch import nn
from torch import optim
from torch import manual_seed
import time

_ = manual_seed(42)

class MyModule(nn.Module):
    def __init__(self, n_features):
        super(MyModule, self).__init__()
        self.dense0 = nn.Linear(n_features, 5)
        self.nonlin = nn.ReLU()
        self.dense1 = nn.Linear(5, 2)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.nonlin(self.dense1(X))
        X = self.softmax(X)
        return X

model_pipeline = compose.Pipeline(
    preprocessing.StandardScaler(),
    classification.Classifier(module=MyModule, loss_fn='binary_cross_entropy', optimizer_fn='adam')
)

dataset = datasets.Phishing()
metric = metrics.Accuracy()

time_start = time.time()
for x, y in dataset:
    y_pred = model_pipeline.predict_one(x)  # make a prediction
    metric.update(y, y_pred)  # update the metric in place
    model_pipeline.learn_one(x, y)  # make the model learn in place
time_end = time.time()

print(f"Accuracy: {metric.get():.4f}")
print(f"Time elapsed: {time_end-time_start} seconds.")

Accuracy: 0.6728
Time elapsed: 0.33247828483581543 seconds.


## Anomaly detection

In [9]:
from deep_river.anomaly import Autoencoder
from river import metrics
from river.datasets import CreditCard
from torch import nn
import math
from river.compose import Pipeline
from river.preprocessing import MinMaxScaler

dataset = CreditCard().take(5000)
metric = metrics.ROCAUC(n_thresholds=50)

class MyAutoEncoder(nn.Module):
    def __init__(self, n_features, latent_dim=3):
        super(MyAutoEncoder, self).__init__()
        self.linear1 = nn.Linear(n_features, latent_dim)
        self.nonlin = nn.LeakyReLU()
        self.linear2 = nn.Linear(latent_dim, n_features)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, X, **kwargs):
        X = self.linear1(X)
        X = self.nonlin(X)
        X = self.linear2(X)
        return self.sigmoid(X)

ae = Autoencoder(module=MyAutoEncoder, lr=0.005)
scaler = MinMaxScaler()
model = Pipeline(scaler, ae)

for x, y in dataset:
    score = model.score_one(x)
    model.learn_one(x=x)  # update the model in place
    metric.update(y, score)  # update the metric in place

print(f"ROCAUC: {metric.get():.4f}")

