![MIoT_GDPI](img/MIOT_GDPI_header.png)

# Unidad 02 - Operaciones comunes en Stream Learning

El objetivo principal de esta práctica es que os familiaricéis con las operaciones básicas para trabajar en *online learning* y como integrarlas en un *pipeline* de [River](https://riverml.xyz/). Podréis comprobar que su uso es similar al de `scikit-learn` que ya conocéis.

La mayor parte del contenido de este Notebook se dedica a explicar el uso de los comandos y de su integración en un *pipeline* apoyándose en ejemplos concretos que ilustran su aplicación a un problema real. Es crucial que dediquéis tiempo a leer y comprender el material, en lugar de simplemente ejecutar el código. Os invitamos a experimentar modificando y variando el código proporcionado para que podáis explorar las distintas opciones y profundizar en su funcionamiento.



**importante**: El Notebook contiene varios ejercicios sencillos. Debéis desarrollarlos durante la clase y enviarlos por el aula virtual del curso, en la tarea correspondiente.



## Referencias útiles para la práctica
1. API Pandas: [https://pandas.pydata.org/docs/reference/index.html](https://pandas.pydata.org/docs/reference/index.html)
2. Api River: [https://riverml.xyz](https://riverml.xyz)
3. Api Scikit-Learn: [https://scikit-learn.org/stable/api/index.html](https://scikit-learn.org/stable/api/index.html)
4. Bahri, M., Bifet, A., Gama, J., Gomes, H. M., & Maniu, S. (2021). [Data stream analysis: Foundations, major tasks and tools](https://doi.org/10.1002/widm.1405). Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 11(3), e1405.
5. Gomes, H. M., Read, J., Bifet, A., Barddal, J. P., & Gama, J. (2019). [Machine learning for streaming data: state of the art, challenges, and opportunities](https://www.google.com/url?sa=t&source=web&rct=j&opi=89978449&url=https://kdd.org/exploration_files/3._CR_7._Machine_learning_for_streaming_data_state_of_the_art-Final.pdf). ACM SIGKDD Explorations Newsletter, 21(2), 6-22.


## Del Batch ML tradicional al Online ML

Este tutorial está inspirado en los [ejemplos](https://riverml.xyz/latest/recipes/reading-data/) proporcionados por River en su [página web](https://riverml.xyz/latest/), por  tanto, os animamos a consultar la documentación original para una exploración más profunda de cada uno de los temas particulares que se muestren a continuación.

En general, casi cualquier enfoque asociado al aprendizaje automático contiene los siguientes pasos:

1. Entender y definir el problema y su contexto.
2. Obtener los datos.
3. Explorar, analizar y entender los datos.
4. Preprocesar los datos.
5. Seleccionar, optimizar y entrenar los modelos ML.
6. Evaluar y el modelo seleccionado.
7. Desplegar, monitorizar y mantener la solución.

Antes de comenzar, vamos a examinar un ejemplo clásico que utiliza todos estos pasos utilizando la librería `scikit-learn:



In [None]:
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.model_selection import cross_val_score, KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from rich import print


# Carga de datos
dataset = load_breast_cancer()
X, y = dataset.data, dataset.target

# Preparación del pipeline con los pasos de preprocesado y el modelo seleccionado
pipe = Pipeline([
    ('scale', StandardScaler()),                       # Estandarización de los datos avg 0 y std 1
    ('extractor', PCA(0.95)),                          # Reducción de la dimensionalidad con PCA (0.95 of the varianza)
    ('classifier', LogisticRegression(solver='lbfgs')) # Selección y configuración del modelo
])

# Definición de una validación cruzada estableciendo una semilla
cv = KFold(n_splits=5, shuffle=True, random_state=42)

# Evaluación del modelo para cada fold de la validación cruzada
scorer = make_scorer(roc_auc_score)
scores = cross_val_score(pipe, X, y, scoring=scorer, cv=cv)

# Imprime la media de la métrica y su desciación estándard
print(f'ROC AUC: {scores.mean():.4f} (± {scores.std():.4f})')

¿Cuál es el problema con esta aproximación? En realidad, no tiene ningún problema, pero existen algunas desventajas potenciales, particularmente con respecto al tamaño del conjunto de los datos, la aparición de datos nuevos y la heterogeneidad de las características a lo largo del tiempo.

* **Tamaño del conjunto de datos**: imagina que los datos para el entrenamiento fueran demasiado grandes para caber en la memoria de tu ordenador. En ese caso, el programa, probablemente,  se habría bloqueado. Aunque se pueden aplicar diferentes técnicas para minimizar este problema,  existe un límite para estas optimizaciones. De hecho, si el dataset constara de millones de observaciones (ej.  cientos de gigabytes), podría haberse requerido hardware especial solo para cargar el dataset.

* **Datos nuevos**: Otro problema potencial es la incorporación de  datos  nuevos al modelo. Los enfoques tradicionales requieren comenzar a entrenar desde cero con un nuevo conjunto de datos resultante de la combinación de los datos antiguos con las nuevas observaciones disponibles. Esto es particularmente problemático en aplicaciones en tiempo real donde tienes  datos nuevos disponibles cada poco. En muchas aplicaciones reales, la solución es desarrollar un pipeline de integración continua que pueda generar y desplegar un nuevo modelo cada poco. es importante destacar que por datos nuevos nos referimos a observaciones que son radicalmente diferentes de las empleadas para entrenar, dónde las relaciones entre las entradas (*inputs*) y salidas (*outputs*) ha cambiado de forma que el modelo no puede gestionarlas.

* **Heterogeneidad de características**: Finalmente, otra desventaja con el enfoque tradicional es que las características con las que se ha entrenado deberán estar disponibles siempre durante el tiempo de producción. Además, no pueden incorporarse nuevas características a lo largo del tiempo. En escenarios de plantas industriales es habitual que determinadas características no estén disponibles en un momento determinado (sensores en estropeados o con mal funcionamiento) para luego estar nuevamente disponibles. En estos mismos escenarios, nuevos sensores pueden desplegarse y empezar a medir y generar nuevas características. Desde un punto de vista tradicional, la única forma de gestionar un número cambiante de características es reentrenando desde cero.


## 2. *Incremental Learning*

Como se mencionó en la unidad anterior, el aprendizaje incremental (*incremental learning*) también se conoce como aprendizaje en línea (*online learning*) o aprendizaje de flujo (*stream learning*). Es habitual que se sustituya el término *online* por incremental o *stream* debido a lo confuso del concepto de "aprendizaje en línea", que, generalmente,  se refiere a una opción educativa (esto es bastante obvio si buscas en Google "aprendizaje en línea"). 

El concepto de *incremental learning* se refiere a ajustar un modelo ML a partir de las observaciones obtenidas a través de un flujo (*stream*) de datos en tiempo real. Expresado de otra manera, los datos no están disponibles en su totalidad como en el ML tradicional (dataset de entrenamiento), sino que las observaciones se proporcionan una por una a medida que se leen de un flujo de datos. Por ejemplo, imagina el caso anterior que, en lugar de disponer del conjunto de datos completo para entrenar, tenemos un punto de referencia temporal que proporciona una observación a la vez. Esto se puede simular con un simple bucle: 


In [None]:
for xi, yi in zip(X, y):#simula obtener una observación por cada vuelta del bucle
    pass

print(xi)#Imprimimos la última observación para ver su formato
print(yi)#Imprimimos la última etiqueta para ver su formato

Dado que los datos ya están en memoria, este no es el escenario ideal pero, sin embargo, tened en cuenta que en este caso particular solo tenemos acceso a una sola muestra cada vez (dentro del bucle y representada por los valores xi, yi). Este bucle simula el flujo de datos (*data stream*), donde la función zip devuelve un iterador de Python. Esto no es demasiado diferente de la forma en que podríamos iterar a través de un archivo CSV, recibir un stream de datos de Kafka u obtener los resultados de una consulta SQL, fila por fila.

Una cosa a tener en cuenta es el hecho de que en este ejemplo `xi` es una instancia de `numpy.array` (los datos se cargaron utilizando la librería de scikit-learn) pero, por su diseño, la librería  `River`, utiliza la clase `dict` como base para su trabajo trabajo. Cada observación viene representada por un diccionario y se asume que cada una de ellas equivale a una muestra del *stream* de datos.

Hay algunas consideraciones que vale la pena recordar y tener en cuenta:

1. El  uso de `numpy` es interesante en entornos de computación de alto rendimiento. Recuerda que `dict` está implementado en Python, mientras que `numpy.array` está implementado a bajo nivel en C y Fortran. Una de las ventajas de usar `dict` es su facilidad de uso y que facilita la legibilidad del programa.

2. El procesamiento en línea es diferente al procesamiento por lotes (*batch learning*), ya que **la vectorización no aporta ninguna aceleración en el proceso** (recordad que solo estamos procesando una observación de cada vez). Las librerías de procesamiento numérico esán optimizadas para operaciones vectorizadas e introducen una sobrecarga considerable si solo se procesa una única muestra. En este tipo de escenarios, el uso de  `dict` no penaliza y puede simplificar el desarrollo.

In [None]:
print(f"In numpy.array format:{xi}\n")
print(f"In dict format: {dict(zip(dataset.feature_names, xi))}")

Para facilitar la portabilidad de algoritmos y *pipelines* trabajo de ML preexistentes, `River` proporciona una función *wrapper* que transforma los datasets de `scikit-learn` al formato requerido. 

In [None]:
from river import stream
for xi, yi in stream.iter_sklearn_dataset(load_breast_cancer()):
    pass

print(xi)#Imprimimos la última observación para ver su formato
print(yi)#Imprimimos la última etiqueta para ver su formato

Si bien la mayoría de las operaciones se traducen con bastante facilidad entre el ML tradicional y el aprendizaje incremental, no todas se pueden hacer de forma directa. Vamos a ejemplificar esto con el problema del escalado de los datos, en concreto con la estandarización (escalar los datos para que tengan media 0 y varianza 1). Esta es una operación trivial sobre lotes de datos pero que en aprendizaje incremental requerirá calcular estadísticas dinámicamente. 

 se debe tener cuidado al convertir entre los bien conocidos batches (lotes) y streams (flujos). Ilustraremos esto considerando el problema del escalado de datos. Recordemos que la estandarización de los datos, es decir, escalar los datos para que tengan media 0 y varianza 1, es una operación trivial sobre lotes, mientras que para los flujos de datos se requerirán estadísticas en ejecución (running) o móviles (moving). Esta sencilla operación se vuelve un poco más complicada en el aprendizaje incremental porque no conocemos los valores de la media y la desviación estándar antes de procesar todos los datos.

Una primera aproximación podría ser realizar una primera pasada sobre todos los datos para calcular los valores necesarios y luego escalar los valores durante una segunda pasada. Sin embargo, esta solución es inconsistente con nuestro objetivo: procesar los datos una sola vez.

La solución  es usar estadísticas dinámicas donde no se usen  la media y la desviación estándar exactas, sino una estimación que se actualiza con cada nuevo valor. Más formalmente, dado la media $\mu_t$, y el conteo de observacionbes  $n_t$, ambos en el instante temporal $t$, la media móvil puede ser facilmente actualizada para cada muestra aplicando la siguiente función: 


$$\large
  n_{t+1} = n_t +1 \\   $$
$$  
  \large
  \mu_{t+1} = \mu_t +\frac{x - \mu_t}{n_{t+1}}    
$$

De la misma manera, el cálculo de la varianza ($\sigma_t$) en el instante $t$ podría actualizarse empleando:
$$
\large
  s_{t+1} = s_t + (x-\mu_t)\times(x-\mu_{t+1})\\
  \large
  \sigma_{t+1} = \frac{s_{t+1}}{n_{t+1}}
$$

 Estas fórmulas pueden ser fácilmente implementadas en Python. Veamos un ejemplo:

In [None]:

n, mean, s, variance = 0, 0, 0, 0

for xi, yi in stream.iter_sklearn_dataset(load_breast_cancer()):
    n += 1
    mean_t = mean
    mean += (xi['mean radius'] - mean_t) / n
    s += (xi['mean radius'] - mean_t) * (xi['mean radius'] - mean)
    variance = s / n

    print(f'Media dinámica: {mean:.3f} - Varianza dinámica: {variance:.3f}')
    
print(f'Media final: {mean:.3f} - Varianza final: {variance:.3f}')

Comparemos ahora el resultado con la implementación nativa de `numpy`

In [None]:
import numpy as np
i = list(dataset.feature_names).index('mean radius')
print(f'True mean: {np.mean(X[:, i]):.3f}')
print(f'True variance: {np.var(X[:, i]):.3f}')

Como era de esperar, los resultados finales  son idénticos, con una diferencia clave: la implementación de `numpy` requiere que todos los datos estén disponibles para el cálculo, mientras que en el caso *online*, se calculan progresivamente. Por lo tanto, debemos ser conscientes de estos comportamientos y darnos cuenta de que los resultados con solo unas pocas observaciones no son precisos.

Aunque la mayoría de las medidas estadísticas  se podrían desarrollar fácilmente en Python de forma dinámica, la realidad es que no es necesario, ya que River proporciona la mayoría de ellas en el módulo `stats`. Por ejemplo, para crear tanto la media móvil como la varianza móvil emplearíamos:

In [None]:
from river import stats

r_mean=stats.Mean()
r_variance=stats.Var()

for xi, yi in stream.iter_sklearn_dataset(load_breast_cancer()): 
    r_mean.update(xi['mean radius'])
    r_variance.update(xi['mean radius'])
    print(f'Running mean: {r_mean.get():.3f} - Running variance: {r_variance.get():.3f}')

Una vez que sabemos cómo calcular las diferentes estadísticas móviles sobre los datos, el siguiente paso es usarlas para estandarizar las observaciones de manera similar al enfoque tradicional. En `River`, se, dentro del módulo `preprocessin`están disponibles varias funciones  para aplicar este tipo de operaciones. Por ejemplo, para estandarizar todas las características del ejemplo anterior, podríamos realizar:



In [None]:
from river.preprocessing import StandardScaler

scaler = StandardScaler()

for xi, yi in stream.iter_sklearn_dataset(load_breast_cancer()):
    scaler.learn_one(xi)
    xi_scaled=scaler.transform_one(xi)
    print(f"xi:{xi}")
    print(f"scaled xi {xi_scaled}")

### Aplicando Machine Learning

Ahora que ya tenemos los datos están escalados, podemos aplicar un algoritmo de aprendizaje automático. En el siguiente ejemplo usamos una regresión logística con un descenso de gradiente estocástico (SGD). 


In [None]:
from river.linear_model import LogisticRegression
from river.optim import SGD

scaler = StandardScaler()
optimizer = SGD(lr=0.01)
log_reg = LogisticRegression(optimizer)

y_true = []
y_pred = []

for xi, yi in stream.iter_sklearn_dataset(load_breast_cancer(), shuffle=True, seed=42):

    # Escalado de las característcias
    scaler.learn_one(xi)
    xi_scaled =scaler.transform_one(xi)
    # Testeo del modelo actual empleando una observación nunca vista
    yi_pred = log_reg.predict_proba_one(xi_scaled)
    # Entrenamiento del modelo con la observación
    log_reg.learn_one(xi_scaled, yi)

    # Almacenamos el valor real y la predicción
    y_true.append(yi)
    y_pred.append(yi_pred[True])

print(f'ROC AUC: {roc_auc_score(y_true, y_pred):.4f}')

Los resultados parecen ser ligeramente mejores que los obtenidos con `scikit-learn` pero debemos hacer una comparación más adecuada. Para hacer una comparación más justa, ambos modelos deberían usar la misma configuración para la validación cruzada (CV). Aunque podríamos definir el mismo proceso con bastante facilidad, los dos procesos podrían hacerse completamente comparables utilizando un módulo integrado de `River` llamado `compat`. Este módulo es un *wrapper* que mejora la compatibilidad con otras librerías de Python. Esto se puede realizar llamando a la función `convert_river_to_sklearn`,  que genera un objeto perfectamente compatible con las funciones de `scikit-learn.

In [None]:
from river.compose import Pipeline
from river.compat import convert_river_to_sklearn


# Definimos el pipeline de River
model = Pipeline(
    ('scale', StandardScaler()),
    ('ml_model', LogisticRegression())
)

# Esta función devuelve un objeto del tipo  SKLRegressorWrapper 
# que es compatible con la intergaz de  sklearn
model = convert_river_to_sklearn(model)

# Ahora ya podemos emplear cross_val_score de sklearn con el modelo de River 

scores = cross_val_score(model, X, y, scoring=scorer, cv=cv)

# Comparemos los resultados
print(f'ROC AUC: {scores.mean():.4f} (± {scores.std():.4f})')

Aunque las métricas son más bajas que en la prueba anterior, los resultados en este caso si son comparables a la aproximación tradicional.

## Pipelines

El "flujo" de información en el desarrollo de modelos de  aprendizaje automático  a menudo se puede encapsular como una secuencia de pasos o *pipeline*. Para realizarlo, muchas librerías como, por ejemplo, `scikit-learn` o `pandas` proporcionan objetos que permiten implementar este patrón "declarativo". En el caso de `River`,  existen un conjunto especial de módulos que permiten implementar estos *pipelines*.

En la práctica, muchos  desarrolladores no suelen utilizar *pipelines* para representar los flujos de trabajo debido a que  provienen del mundo del ML tradicional (orientado a "lotes"), donde los normal es desarrollar modelos a través de la programación procedimental pero, sin embargo, en el aprendizaje *online*, los *pipelines* representan una forma más natural de trabajar.

En el siguiente ejemplo, comparamos tanto el método procedimental como el declarativo (a través de *pipelines*) empleando un dataset típico en competiciones de Kaggle."

In [None]:
from rich import print
from river.datasets import Restaurants

data = Restaurants()

print(data)

In [None]:
Veamos la estructura de los datos:

In [None]:
print(next(iter(data)))

### Aproximación procedimental
Veamos primero la aproximación clásica:

In [None]:
from river import feature_extraction, linear_model, metrics, preprocessing, stats, utils

# Vamos a crear el promedio de los últimos 7, 14 y 21 días
features = (
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 7),target_name="last_7_mean"),
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 14),target_name="last_14_mean"),
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 21),target_name="last_21_mean")
)

scaler = preprocessing.StandardScaler()
model = linear_model.LinearRegression()
metric = metrics.MAE()

for x, y in data:

    # Ingeniería de variables
    x['weekday'] = x['date'].weekday() #weekday is a function of datetime.date. It returns a number linked to the week day (monday=0,..., sunday=6)
    x['is_weekend'] = x['date'].weekday() in (5, 6)
    
    # Procesa la media móvil del target  
    for mean_f in features:
    
        x = {**x, **mean_f.transform_one(x)}#** unzip un diccionario. De esta forma permite concatenar 2 diccionarios   
        mean_f.learn_one(x, y)
   
    # Eliminar la variables que no queremos emplear como características
    for key in ['store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude']:
        x.pop(key)
      
    # Escalado de los datos
    # Este ejemplo tiene solo un propósito docente
    # En un ejemplo real no necesitamos escalar características como is_holiday o is_weekend
    scaler.learn_one(x)
    x=scaler.transform_one(x)

    # Predicción y Entrenamiento
    y_pred = model.predict_one(x)
    model.learn_one(x, y)

    # Actualización de las métricas
    metric.update(y, y_pred)

print(metric)

#Comprobemos la última observación
print(x)

### Aproximación declarativa empleando *pipelines* de River

Reescribamos ahora el código anterior empleando una aproximación declarativa a través de los *pipelines* de `River

In [None]:
from river import compose

# Función para generar nuevas variables
def get_date_features(x):
    weekday =  x['date'].weekday()
    return {'weekday': weekday, 'is_weekend': weekday in (5, 6)}

# Consutrcción del pipeline con los mismos pasos
model = compose.Pipeline(
    ('features', compose.TransformerUnion(
        ('date_features', compose.FuncTransformer(get_date_features)),
        ('last_7_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(),7),target_name="last_7_mean")),
        ('last_14_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(),14), target_name="last_14_mean")),
        ('last_21_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(),21), target_name="last_21_mean"))
    )),
    ('drop_non_features', compose.Discard('store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude')),
    ('scale', preprocessing.StandardScaler()),
    ('lin_reg', linear_model.LinearRegression())
)

metric = metrics.MAE()

for x, y in data:

    # Genera la predicción
    y_pred = model.predict_one(x)

    # Actualiza el modelo una vez recuperada la salida real
    model.learn_one(x, y)

    # Actualiza las métricas con la predicción y la salida real
    metric.update(y, y_pred)

print(x)
print(metric)


Como podrás observas, todos los cálculos y transformaciones se han organizado en el objeto `TransformerUnion`, que nos permite agrupar diferentes operaciones de procesamiento en un solo "transformador" facilitando el desarrollo de un *pipeline* de aprendizaje automático. El objeto `TransformerUnion` incorpora  una lista de "transformadores" y los aplica de forma secuencial a los datos de entrada.

El bucle `for`que gestiona la secuencia predecir-aprender-actualizar es tna común que `River`proporciona una función que integra todas las operaciones de forma automática, tal y como se observa en el siguiente código:


In [None]:
from river import evaluate

model = compose.Pipeline(
    ('features', compose.TransformerUnion(
        ('date_features', compose.FuncTransformer(get_date_features)),
        ('last_7_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 7),target_name="last_7_mean" )),
        ('last_14_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 14),target_name="last_14_mean")),
        ('last_21_mean', feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 21),target_name="last_21_mean"))
    )),
    ('drop_non_features', compose.Discard('store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude')),
    ('scale', preprocessing.StandardScaler()),
    ('lin_reg', linear_model.LinearRegression())
)

evaluate.progressive_val_score(dataset=data, model=model, metric=metrics.MAE()) #Reemplaza el bucle "for"

A pesar de que el código es correcto, algunas simplificaciones adicionales pueden ser integradas. Por ejemplo, al igual que sucede en `scikit-learn`, los nombres de los *steps* no son obligatorios (se asigna uno por defecto, si no se añade, basándose en el orden de la operación proporcionada). Veamos cómo quedaría esta simplificación:



In [None]:
model = compose.Pipeline(
    compose.TransformerUnion(
        compose.FuncTransformer(get_date_features),
        feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 7),target_name="last_7_mean"),
        feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 14),target_name="last_14_mean"),
        feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 21),target_name="last_21_mean")
    ),
    compose.Discard('store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude'),
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

evaluate.progressive_val_score(dataset=data, model=model, metric=metrics.MAE())


La siguiente simplificación proviene de la posibilidad de declarar el *pipeline* empleando operaciones matemáticas. Primero, usa "+" para crear el objeto `TransformerUnion` y asignarle operaciones

In [None]:
model = compose.Pipeline(
    compose.FuncTransformer(get_date_features) + \
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 7),target_name="last_7_mean") + \
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 14),target_name="last_14_mean") + \
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 21),target_name="last_21_mean"),

    compose.Discard('store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude'),
    preprocessing.StandardScaler(),
    linear_model.LinearRegression()
)

evaluate.progressive_val_score(dataset=data, model=model, metric=metrics.MAE())

A continuación, emplea el operador "|" (tubería) para unir *steps* dentro del *pipeline*. Al igual que en `bash`, este operador permite que la salida de un *step* sea la entrada del siguiente, lo que permite generar un flujo de operaciones.

In [None]:
model = (
    compose.FuncTransformer(get_date_features) +
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 7),target_name="last_7_mean") +
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 14),target_name="last_14_mean") +
    feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), 21),target_name="last_21_mean")
)

to_discard = ['store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude']

model = model | compose.Discard(*to_discard)#* unzip el objeto y emplea los elementos individuales como argumentos de la función
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

evaluate.progressive_val_score(dataset=data, model=model, metric=metrics.MAE())

Una simplificación final proviene del hecho de que `River` encapsula automáticamente las funciones en un objeto `FuncTransform` (no es necesario declararlo expresamente!), por lo que el ejemplo final podría ser algo como:

In [None]:
model = get_date_features

for n in [7, 14, 21]:
    model += feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), n),target_name="last_"+str(n)+"_mean")

model |= compose.Discard(*to_discard)#* unzip el objeto y emplea los elementos individuales como argumentos de la función
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

evaluate.progressive_val_score(dataset=data, model=model, metric=metrics.MAE(), print_every=20_000)

Se ha incluido incluido un argumento adicional en `progressive_val_score` para imprimir cómo cambia la evaluación a lo largo del tiempo (`print_every=20_000`) . El uso de un enfoque procedimental o declarativo no afecta el rendimiento general, sino más bien la forma en que pensamos sobre el modelo y, obviamente, al código final resultante. De hecho, ambos modelos, ya sean procedimentales o declarativos, producirán los mismos resultados y rendimiento. Como punto final sobre los *pipelines*, cabe mencionar que podemos explorar gráficamente el pipeline consultanto el objeto `modelo`.

In [None]:
model

Por último, para depurar el comportamiento de los diferentes *steps*, se puede usar la función `debug_one`. Imagina que entrenamos el modelo con los primeros 120,000 ejemplos y queremos saber qué sucede con el siguiente. El siguiente fragmento de código muestra cómo usar la función `debug_one` en este caso:

In [None]:
import itertools

model = get_date_features

for n in [7, 14, 21]:
    model += feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), n),target_name="last_"+str(n)+"_mean")


model |= compose.Discard(*to_discard)#* unzip el objeto y emplea los elementos individuales como argumentos de la función
model |= preprocessing.StandardScaler()
model |= linear_model.LinearRegression()

for x, y in itertools.islice(data, 120_000):
    y_pred = model.predict_one(x)
    model.learn_one(x, y)

x, y = next(iter(data))
print(model.debug_one(x))

## Aprendizaje incremental con árboles de decisión

In [None]:
%%time

from river import evaluate, metrics, stats, utils, feature_extraction, preprocessing, compose
from river.tree import HoeffdingTreeRegressor
import itertools

data = Restaurants()
to_discard = ['store_id', 'date', 'genre_name', 'area_name', 'latitude', 'longitude']
def get_date_features(x):
    weekday =  x['date'].weekday()
    return {'weekday': weekday, 'is_weekend': weekday in (5, 6)}

model = get_date_features

for n in [7, 14, 21]:
    model += feature_extraction.TargetAgg(by='store_id', how=utils.Rolling(stats.Mean(), n),target_name="last_"+str(n)+"_mean")

model |= compose.Discard(*to_discard)
model |= preprocessing.StandardScaler()
model |= HoeffdingTreeRegressor(grace_period=250) #Number of instances a leaf should observe between split attempts

for x, y in data:
    model.learn_one(x, y)

In [None]:

Una vez desarrollado el modelo, podemos analizarlo desde diferentes puntos de vista:

In [None]:
#Muestra la estructura
model['HoeffdingTreeRegressor']

Esto nos da una idea de los parámetros internos del objeto pero también podemos solicitar un resumen:

In [None]:
#Resumen del árbol
model['HoeffdingTreeRegressor'].summary

O lo podemos transformar en un `DataFrame` de `Pandas`:

In [None]:
#Pandas data frame 
(model['HoeffdingTreeRegressor']).to_dataframe()

Este último comando podría ser interesante en un enfoque de clasificación debido a que nos proporciona la capacidad de analizar la decisión interna del modelo y verificar los umbrales y pesos asignados pero, sin embargo, en un modelo de regresión no es tan útil. Veamos un ejemplo en el contexto de la clasificación:

In [None]:
from river import evaluate, metrics, stats, utils, feature_extraction, preprocessing, compose
from river.tree import HoeffdingTreeClassifier
from river.datasets import Phishing #
import itertools

data = Phishing()

model = HoeffdingTreeClassifier(grace_period=50)

for x, y in data:
    model.learn_one(x, y)
    
    
#resumen del modelo
print(model.summary)

#convierte el modelo en un DataFrame
model.to_dataframe()

In [None]:
Otra opción interesante cara a analizar e interpretar el modelo es dibujarlo:


In [None]:
#Es necesario tener la librería graphviz instalada.
model.draw()

`River` tiene muchas implementaciones basadas en árboles y, lal y como se describe en su [web](https://riverml.xyz/latest/recipes/on-hoeffding-trees/#1-trees-trees-everywhere-gardening-101-with-river), estas podían organizarse según la siguiente tabla: 

| Name | Acronym | Task | Non-stationary data? | Comments | Source |
| :- | :-: | :- | :-: | :- | :-: |
| Hoeffding Tree Classifier | HTC | Classification | No | Basic HT for classification tasks | [[1]](https://dl.acm.org/doi/pdf/10.1145/347090.347107)|
| Hoeffding Adaptive Tree Classifier | HATC | Classification | Yes | Modifies HTC by adding an instance of ADWIN to each node to detect and react to drift detection | [[2]](https://link.springer.com/chapter/10.1007/978-3-642-03915-7_22)|
| Extremely Fast Decision Tree Classifier | EFDT | Classification | No | Deploys split decisions as soon as possible and periodically revisit decisions and redo them if necessary. Not as fast in practice as the name implies, but it tends to converge faster than HTC to the model generated by a batch DT | [[3]](https://dl.acm.org/doi/abs/10.1145/3219819.3220005)|
| Hoeffding Tree Regressor | HTR | Regression | No | Basic HT for regression tasks. It is an adaptation of the [FIRT/FIMT](https://link.springer.com/article/10.1007/s10618-010-0201-y) algorithm that bears some semblance to HTC | [[4]](https://link.springer.com/article/10.1007/s10618-010-0201-y)|
| Hoeffding Adaptive Tree Regressor | HATR | Regression | Yes | Modifies HTR by adding an instance of ADWIN to each node to detect and react to drift detection |-|
| incremental Structured-Output Prediction Tree Regressor| iSOUPT | Multi-target regression | No | Multi-target version of HTR | [[5]](https://link.springer.com/article/10.1007/s10844-017-0462-7)|
| Label Combination Hoeffding Tree Classifier | LCHTC | Multi-label classification | No | Creates a numerical code for each combination of the binary labels and uses HTC to learn from this encoded representation. At prediction time, decodes the modified representation to obtain the original label set |-| 


Como podéis ver, cada variante de árbol tiene un objetivo específico, aunque a veces puedan solaparse.