# **Temperature Streaming Prediction from Air Quality Data**🌡️📈

TODO: poner los nombres 


## Stream database 

We selected our stream database from the open repository [UCI Machine Learning Repository - Air Quality](https://archive.ics.uci.edu/dataset/360/air+quality). It consists of ~9.3k instances of hourly averaged responses from 5 metal oxide chemical sensors in a polluted area located in Italia from March 2004 to February 2005. Our database has null recordings (labeled with value -200). The **objective** of the project is modeling the temperature based on air quality sensors.

In [1]:
import pandas as pd 
df = pd.read_csv('air.csv')
df

Unnamed: 0,Date,Time,CO(GT),PT08.S1(CO),NMHC(GT),C6H6(GT),PT08.S2(NMHC),NOx(GT),PT08.S3(NOx),NO2(GT),PT08.S4(NO2),PT08.S5(O3),T,RH,AH
0,10/03/2004,18:00:00,2.6,1360.0,150.0,11.9,1046.0,166.0,1056.0,113.0,1692.0,1268.0,13.6,48.9,0.7578
1,10/03/2004,19:00:00,2.0,1292.0,112.0,9.4,955.0,103.0,1174.0,92.0,1559.0,972.0,13.3,47.7,0.7255
2,10/03/2004,20:00:00,2.2,1402.0,88.0,9.0,939.0,131.0,1140.0,114.0,1555.0,1074.0,11.9,54.0,0.7502
3,10/03/2004,21:00:00,2.2,1376.0,80.0,9.2,948.0,172.0,1092.0,122.0,1584.0,1203.0,11.0,60.0,0.7867
4,10/03/2004,22:00:00,1.6,1272.0,51.0,6.5,836.0,131.0,1205.0,116.0,1490.0,1110.0,11.2,59.6,0.7888
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9352,04/04/2005,10:00:00,3.1,1314.0,-200.0,13.5,1101.0,472.0,539.0,190.0,1374.0,1729.0,21.9,29.3,0.7568
9353,04/04/2005,11:00:00,2.4,1163.0,-200.0,11.4,1027.0,353.0,604.0,179.0,1264.0,1269.0,24.3,23.7,0.7119
9354,04/04/2005,12:00:00,2.4,1142.0,-200.0,12.4,1063.0,293.0,603.0,175.0,1241.0,1092.0,26.9,18.3,0.6406
9355,04/04/2005,13:00:00,2.1,1003.0,-200.0,9.5,961.0,235.0,702.0,156.0,1041.0,770.0,28.3,13.5,0.5139



The raw database needs a previous preprocessing step to remove non-relevant columns or adapting their format to make them suitable as input feature to a ML model.

- **Date**: The raw date should not be included as input feature, but to order our static dataset and create a data stream. We split the data into the **Season**, encoded with an integer from $0$ (winter) to $3$ (fall), and **Month** of the sample.
- **Time**: It uses the 24 hours format. We decided to normalize it using the sine and cosine functions, thus replacing the **Time** variable with two new variable: (i) **sin(time)** and (ii) **cos(time)**, defined as (i) $\sin(\pi\cdot\text{time}/12)$ and (ii)$\cos(\pi\cdot\text{time}/12)$.

In [2]:
import numpy as np 
df['Month'] = df.Date.map(lambda x: int(x.split('/')[1]))
df['Season'] = (df.Month % 12)//3
df['Time'] = df.Time.map(lambda x: x.split(':')[0]).astype(int)/12*np.pi
df['cos(t)'] = df.Time.map(np.cos)
df['sin(t)'] = df.Time.map(np.sin)
df = df.drop(['Date', 'Time'], axis=1)
df

Unnamed: 0,CO(GT),PT08.S1(CO),NMHC(GT),C6H6(GT),PT08.S2(NMHC),NOx(GT),PT08.S3(NOx),NO2(GT),PT08.S4(NO2),PT08.S5(O3),T,RH,AH,Month,Season,cos(t),sin(t)
0,2.6,1360.0,150.0,11.9,1046.0,166.0,1056.0,113.0,1692.0,1268.0,13.6,48.9,0.7578,3,1,-1.836970e-16,-1.000000e+00
1,2.0,1292.0,112.0,9.4,955.0,103.0,1174.0,92.0,1559.0,972.0,13.3,47.7,0.7255,3,1,2.588190e-01,-9.659258e-01
2,2.2,1402.0,88.0,9.0,939.0,131.0,1140.0,114.0,1555.0,1074.0,11.9,54.0,0.7502,3,1,5.000000e-01,-8.660254e-01
3,2.2,1376.0,80.0,9.2,948.0,172.0,1092.0,122.0,1584.0,1203.0,11.0,60.0,0.7867,3,1,7.071068e-01,-7.071068e-01
4,1.6,1272.0,51.0,6.5,836.0,131.0,1205.0,116.0,1490.0,1110.0,11.2,59.6,0.7888,3,1,8.660254e-01,-5.000000e-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9352,3.1,1314.0,-200.0,13.5,1101.0,472.0,539.0,190.0,1374.0,1729.0,21.9,29.3,0.7568,4,1,-8.660254e-01,5.000000e-01
9353,2.4,1163.0,-200.0,11.4,1027.0,353.0,604.0,179.0,1264.0,1269.0,24.3,23.7,0.7119,4,1,-9.659258e-01,2.588190e-01
9354,2.4,1142.0,-200.0,12.4,1063.0,293.0,603.0,175.0,1241.0,1092.0,26.9,18.3,0.6406,4,1,-1.000000e+00,1.224647e-16
9355,2.1,1003.0,-200.0,9.5,961.0,235.0,702.0,156.0,1041.0,770.0,28.3,13.5,0.5139,4,1,-9.659258e-01,-2.588190e-01


The next cell shows the histogram of each input variable. Comments to be considered are:

1. The data is evenly distributed across time. Although we have more samples for March (the third month), no month is underrepresented in our database. 
2. For all continuous variables we might see a pattern that accumulates a considerable number of observations around an specific value (near to the mean of the distribution).


In [3]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

n_cols = 4
n_rows = int(np.ceil(len(df.columns)/n_cols))
fig = make_subplots(rows=n_rows, cols=n_cols, subplot_titles=df.columns)
for i, col in enumerate(df.columns):
    x = df[col].values 
    fig.add_trace(go.Histogram(x=x[x != -200], showlegend=False), row=i//n_cols+1, col=i%n_cols+1)
fig.update_layout(height=n_rows*200, width=n_cols*300, template='seaborn', 
                  title_text='Feature histograms without null values')
fig.show()

We know display the time series of all variables to detect some anomaly in the input features distributed across time. 

In [4]:
features = df.columns.drop(['Month', 'Season', 'cos(t)', 'sin(t)'])
n_rows = int(np.ceil(len(features)/n_cols))
fig = make_subplots(rows=n_rows, cols=n_cols, subplot_titles=features)
for i, col in enumerate(features):
    x = df[col].values 
    fig.add_trace(go.Scatter(y=x[x != -200], showlegend=False), row=i//n_cols+1, col=i%n_cols+1)
fig.update_layout(height=n_rows*200, width=n_cols*300, template='seaborn', 
                  title_text='Feature time series')

**Data Preprocessing**: To train our ML models, the continuous variables (**CO(GT)**, **PT08.S1(CO)**, **NMHC(GT)**, **C6H6(GT)**, **PT08.S2(NMHC)**, **NOx(GT)**, **PT08.S3(NOx)**, **NO2(GT)**, **PT08.S4(NO2)**, **PT08.S5(O3)**, **T**, **RH**, **AH**) will be standardized with zero-mean and, as our first approach, we will impute those null values (-200) with the mean of each feature, although more complex imputing methods will be later considered.

**Target variable**: The target variable is the temperature **T**. Since temperature is inherently continuous, our task is a classification problem where we want to estimate the temperature value for a given sample and the previous temperature obtained. As well as the other feature, we applied the zero-mean standardization to the temperature and a post-processing casting to the original values.

In [5]:
def sequential_imputation(x: np.ndarray, detect = (lambda x: x == -200)) -> np.ndarray:
    # get the first value that is not NAN
    x = np.array(x)
    last = x[~detect(x)][0]
    for i, item in enumerate(x):
        if detect(item):
            x[i] = last 
        last = x[i]
    return x

# df = df.replace(-200, np.nan).apply(sequential_imputation, axis=0)

In [6]:
features = df.columns.drop(['Month', 'Season', 'cos(t)', 'sin(t)'])
n_rows = int(np.ceil(len(features)/n_cols))
fig = make_subplots(rows=n_rows, cols=n_cols, subplot_titles=features)
for i, col in enumerate(features):
    x = df[col].values 
    fig.add_trace(go.Scatter(y=x[x != -200], showlegend=False), row=i//n_cols+1, col=i%n_cols+1)
fig.update_layout(height=n_rows*200, width=n_cols*300, template='seaborn', 
                  title_text='Feature time series')

## Concept drift 

Our database will be probably subject of concept drift. The target variable (temperature) considerably changes over the observation period (accordingly to the temperature seasons of the year), whereas some of the other variables (such as `C6H6(GT)` or `PT08.S5`) do not seem to experience these changes, although others do (such as `AH`). 

The next cells run two different concept drift detectors: [Kolmogorov-Smirnov Windowing](https://riverml.xyz/0.14.0/api/drift/KSWIN/) and [Page-Hinkley](https://riverml.xyz/0.14.0/api/drift/PageHinkley/) for the target variable `T`, and finally displays in the scatter plot the samples detected with high variation.

In [7]:
from river.drift import KSWIN, PageHinkley
from typing import Tuple

def standardize(x: np.ndarray) -> Tuple[np.ndarray, Tuple[float, float]]:
    mu, sigma = x.mean().item(), x.std().item()
    return (x-mu)/sigma, (mu, sigma)
    
target, tparams = standardize(df['T'].values)

# prepare figure
fig = go.Figure() 
fig.add_trace(go.Scatter(y=target, showlegend=False))

kswin = KSWIN(alpha=1e-5, seed=0)
ph = PageHinkley()

for i, sample in enumerate(target):
    kswin.update(sample)
    ph.update(sample)
    if kswin.drift_detected:
        fig.add_vline(x=i, line_color='green', opacity=0.5)
    if ph.drift_detected:
        fig.add_vline(x=i, line_color='red', opacity=0.5)
        
fig.update_layout(width=1600, height=600, title_text='Temperature')
fig.show()

## Batch Learning

In [8]:
import numpy as np 
from datetime import datetime  
from typing import Dict, Union
from river.compose import Pipeline, FuncTransformer, Discard, TargetTransformRegressor

def get_date_features(x: Dict[str, str]) -> Dict[str, float]:
    month = int(x['Date'].split("/")[1])
    season = ((month-1) % 12)//4

    time_format = "%H:%M:%S"
    datetime_object = datetime.strptime(x['Time'], time_format)

    hour_of_day = datetime_object.hour

    time_cos = np.cos(hour_of_day)
    time_sin = np.sin(hour_of_day)
    
    # cast to float 
    x.update({'month':month, 'season':season, 'time_cos': time_cos, 'time_sin':time_sin}) 
    return x
             

class SequentialImputer:
    def __init__(self, detect = np.isnan, target: bool = False):
        self.last = dict() if not target else 0
        self.detect = detect
        
    
    def __call__(self, x: Union[Dict[str, float], float]):
        if isinstance(x, dict):
            x = {feat: self.last[feat] if self.detect(value) else value for feat, value in x.items()}
        else:
            x = self.last if self.detect(x) else x
        self.last = x 
        return x
    
    @property
    def __name__(self):
        return 'SequentialImputer'
    

def cast_float(x: str):
    try: 
        return float(x)
    except ValueError:
        return x


In [9]:
batch_targets = df.pop("T")

len_train_set = int(len(batch_targets) * 0.75)

X_train, X_test = df[:len_train_set], df[len_train_set:]
y_train, y_test = batch_targets[:len_train_set], batch_targets[len_train_set:]

In [10]:
print(X_train.isna().sum(), X_test.isna().sum(), )

CO(GT)           0
PT08.S1(CO)      0
NMHC(GT)         0
C6H6(GT)         0
PT08.S2(NMHC)    0
NOx(GT)          0
PT08.S3(NOx)     0
NO2(GT)          0
PT08.S4(NO2)     0
PT08.S5(O3)      0
RH               0
AH               0
Month            0
Season           0
cos(t)           0
sin(t)           0
dtype: int64 CO(GT)           0
PT08.S1(CO)      0
NMHC(GT)         0
C6H6(GT)         0
PT08.S2(NMHC)    0
NOx(GT)          0
PT08.S3(NOx)     0
NO2(GT)          0
PT08.S4(NO2)     0
PT08.S5(O3)      0
RH               0
AH               0
Month            0
Season           0
cos(t)           0
sin(t)           0
dtype: int64


In [11]:
from sklearn.pipeline import Pipeline as sk_Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler as sk_StandardScaler
from sklearn.tree import DecisionTreeRegressor
from sklearn.base import TransformerMixin
from sklearn.compose import make_column_selector, make_column_transformer
from sklearn.metrics import mean_absolute_error

def impute_negative_values(dataset):
    data = dataset.copy(deep=True)
    for column in data.columns:
        mask = (data[column] == -200)
        data.loc[mask, column] = np.nan
        # data[column] = data[column].ffill(downcast)
        data[column].fillna(inplace=True, method = 'ffill')
        # data[column] = data[column].dropna()
        data.fillna(inplace=True, 0)
    return data

def create_pipeline(model):
    pipeline = sk_Pipeline([
        ('imputer', FunctionTransformer(impute_negative_values)),
        ('scaler', sk_StandardScaler()),
        ('model', model)
    ])
    return pipeline

In [12]:
def count_na(x):
    print(x.isna().sum())

count_na(impute_negative_values(X_test))

CO(GT)             94
PT08.S1(CO)         0
NMHC(GT)         2340
C6H6(GT)            0
PT08.S2(NMHC)       0
NOx(GT)            94
PT08.S3(NOx)        0
NO2(GT)            94
PT08.S4(NO2)        0
PT08.S5(O3)         0
RH                  0
AH                  0
Month               0
Season              0
cos(t)              0
sin(t)              0
dtype: int64



Series.fillna with 'method' is deprecated and will raise in a future version. Use obj.ffill() or obj.bfill() instead.



In [13]:
pd.__version__

'2.1.4'

In [14]:
X_test.describe()

Unnamed: 0,CO(GT),PT08.S1(CO),NMHC(GT),C6H6(GT),PT08.S2(NMHC),NOx(GT),PT08.S3(NOx),NO2(GT),PT08.S4(NO2),PT08.S5(O3),RH,AH,Month,Season,cos(t),sin(t)
count,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0,2340.0
mean,-14.523462,1026.667949,-200.0,-4.103333,800.875641,281.712821,716.593162,117.611111,1061.306838,970.514103,37.466795,-11.18154,2.471795,0.355128,-0.001993121,0.002597485
std,55.47993,366.893257,0.0,49.458598,351.474255,243.459001,325.239477,101.18449,408.778761,516.212874,61.568211,47.279618,2.130334,0.478654,0.7072551,0.7072531
min,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,-200.0,1.0,0.0,-1.0,-1.0
25%,0.9,921.0,-200.0,2.8,642.0,142.75,559.0,97.0,908.0,636.0,37.3,0.419275,1.0,0.0,-0.7071068,-0.7071068
50%,1.6,1047.0,-200.0,5.7,798.0,243.0,727.0,135.0,1075.0,960.0,50.6,0.59565,2.0,0.0,-1.83697e-16,1.224647e-16
75%,2.7,1216.0,-200.0,11.0,1014.0,403.25,903.0,170.0,1293.25,1326.0,64.6,0.8051,3.0,1.0,0.7071068,0.7071068
max,8.7,1846.0,-200.0,43.0,1831.0,1230.0,1881.0,340.0,2147.0,2494.0,86.6,1.393,12.0,1.0,1.0,1.0


In [16]:
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor

models = [
    # ('knn', KNeighborsRegressor()),
    ('svr', SVR()),
    ('decision_tree', DecisionTreeRegressor()),
    ('random_forest', RandomForestRegressor()),
]

# Parámetros para búsqueda de hiperparámetros de cada modelo
param_grid = [
    # {'model__n_neighbors': [3, 5, 13]},
    {'model__C': [0.1, 1, 10], 'model__kernel': ['linear', 'rbf']},
    {'model__max_depth': [None, 10, 20]},
    {'model__n_estimators': [50, 100, 200]},
]

best_models = []
for (name, model), params in zip(models, param_grid):
    print(name, model, params)
    batch_pipeline = create_pipeline(model)
    grid_search = GridSearchCV(batch_pipeline, params, cv=5, scoring='neg_mean_absolute_error', verbose = 2, n_jobs=-1)
    grid_search.fit(X_train, y_train)
    best_models.append((name, grid_search.best_estimator_))


svr SVR() {'model__C': [0.1, 1, 10], 'model__kernel': ['linear', 'rbf']}
Fitting 5 folds for each of 6 candidates, totalling 30 fits


decision_tree DecisionTreeRegressor() {'model__max_depth': [None, 10, 20]}
Fitting 5 folds for each of 3 candidates, totalling 15 fits
random_forest RandomForestRegressor() {'model__n_estimators': [50, 100, 200]}
Fitting 5 folds for each of 3 candidates, totalling 15 fits


In [17]:
for (name, best_model) in best_models:
    print(f"Modelo {name}:")
    best_model.fit(X_train, y_train)
    y_pred = best_model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Error absoluto medio (MAE): {mae}")

Modelo svr:
Error absoluto medio (MAE): 6.7432572863974904
Modelo decision_tree:
Error absoluto medio (MAE): 2.4549999999999996
Modelo random_forest:
Error absoluto medio (MAE): 1.5849111111111114


In [None]:
param_grid = {
    "randomforestregressor__n_estimators": [50, 100],
    "randomforestregressor__max_depth": [None, 10],
    "randomforestregressor__min_samples_split": [2, 5],
    "randomforestregressor__min_samples_leaf": [1, 2]
}

grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='neg_mean_absolute_error')
grid_search.fit(X_train, y_train)

best_pipeline = grid_search.best_estimator_
best_params = grid_search.best_params_

y_pred = best_pipeline.predict(X_test)

print(f"MAE is {mean_absolute_error(y_test, y_pred)}")
print("Best model:", best_pipeline)
print("Best hyperparameters:", best_params)

## Stream Learning 

In [44]:
import pandas as pd
from river.preprocessing import StandardScaler, TargetStandardScaler
from river.metrics import RMSE, MAE
from river.forest import OXTRegressor
from river.evaluate import progressive_val_score
from river.stream import iter_csv 
from copy import deepcopy

detect = lambda x: x == -200
cols = pd.read_csv('air.csv').columns
data = lambda: iter_csv('air.csv', target='T', converters={col: cast_float for col in cols})
preprocess = Pipeline(
    FuncTransformer(get_date_features),
    Discard('Time', 'Date'),
    FuncTransformer(SequentialImputer(detect)),
    StandardScaler()
)

pre2= deepcopy(preprocess)
imp = SequentialImputer(detect, target=True)
imp2 = SequentialImputer(detect, target=True)

model = preprocess | TargetTransformRegressor(
    OXTRegressor(n_models=50, max_features=None), func=imp, inverse_func=imp
)

model2 = pre2 | TargetTransformRegressor(
    OXTRegressor(n_models=100, max_features=None), func=imp2, inverse_func=imp2
)
# progressive_val_score(dataset=data, model=model, metric=MAE())

In [45]:
from typing import List, Optional, Callable
from sklearn.metrics import mean_squared_error

def plot_models(
        data, 
        models: Dict[str, Pipeline], 
        ft: Callable = lambda x: x,
        error: bool = False
    ):
    preds = {name: [] for name in models.keys()}
    target = []
    for x, y in data:
        for name, model in models.items():
            preds[name].append(model.predict_one(x))
            model.learn_one(x, y)
        target.append(y)
        
    fig = go.Figure() 
    target = ft(target)
    if not error:
        fig.add_trace(go.Scatter(y=target, name='Temperature'))
    for name in models.keys():
        fig.add_trace(go.Scatter(y=preds[name] if not error else abs(target-error), name=name))
    fig.update_layout(width=1000, height=600, title_text='Temperature', template='seaborn')
    return fig 
    
plot_models(data(), models={'OXTRegressor50': model, 'second': model2}, ft=sequential_imputation)

## Model Selection

In [25]:
# from river.model_selection import SuccessiveHalvingRegressor
# from river import linear_model
# from river import utils, optim 
# from river import preprocessing
# model = (
#     preprocessing.StandardScaler() |
#     linear_model.LinearRegression(intercept_lr=.1)
# )

# models = utils.expand_param_grid(model, {
#     'LinearRegression': {
#         'optimizer': [
#             (optim.SGD, {'lr': [.1, .01, .005]}),
#             (optim.Adam, {'beta_1': [.01, .001], 'lr': [.1, .01, .001]}),
#             (optim.Adam, {'beta_1': [.1], 'lr': [.001]}),
#         ]
#     }
# })
# models