In [None]:
# general imports
import os
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt 
from matplotlib.colors import ListedColormap
import seaborn as sns

# from our documents
import preprocessing_functions as pf

# from Scikit Learn library
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import random

# Seed for random processes: a single value is applied to both NumPy's global
# RNG and Python's built-in `random` module.
seed = 42
np.random.seed(seed)
random.seed(seed)


The following pipeline aims to automate the preprocessing of streaming time-series data.

We assume that part of the dataset is already available; it is used as historical data to fill missing values and to study seasonality and stationarity.

In particular, this notebook shows what to do whenever new data arrives in the system.

In [None]:
# Load the dataset from a pickle file (path is relative to the working directory).
# NOTE(review): unpickling can execute arbitrary code — only load pickle files
# that come from a trusted source.
file_path = 'smart_app_data.pkl'
df = pd.read_pickle(file_path)

In [None]:
# Split the data into historical and future data


Training set size: (213, 8)
Test set size: (92, 8)


In [None]:
# Divide data by machine and kpis (time series)

In [None]:
# Dictionary holding the KPI-specific preprocessing settings,
# i.e. the kwargs that will be passed to the pipelines.
# The idea is to define it in a separate module and import it here.

# Preprocessing pipeline

The pipeline receives as input the new incoming data for a specific machine and KPI. To perform the preprocessing, it also needs a batch containing a fixed amount of past data, together with the information on how to handle that KPI for that machine (given by `kwargs`).

In [None]:
def preprocessing_pipeline(batch, new_input, kwargs):
    """Preprocess newly arrived data for one machine/KPI time series.

    The new observations are concatenated with the batch of past data and the
    combined frame is sorted by its ``timestamp`` column. The optional steps
    selected in ``kwargs`` are then applied in order: resampling, smoothing,
    stationarity/trend/seasonality handling, and finally scaling/encoding.

    Parameters
    ----------
    batch : pandas.DataFrame
        Past data for the series. Must contain a ``timestamp`` column.
    new_input : pandas.DataFrame
        Newly arrived data, concatenated after ``batch``.
    kwargs : dict
        Preprocessing options. Keys read directly by this function:

        - ``period_of_observation`` (required): passed to
          ``pf.seasonal_additive_decomposition``.
        - ``resample`` (default ``False``): call ``pf.resample_data``.
        - ``smooth`` (default ``False``): call ``pf.smooth_data``.
        - ``need_stationarity`` (default ``False``): call
          ``pf.make_stationary`` when the ADF test does not report the
          series as stationary.
        - ``detrend`` (default ``False``): call ``pf.rest_trend``.
        - ``deseasonalize`` (default ``False``): call ``pf.rest_seasonality``.
        - ``scaler`` (default ``True``): apply ``RobustScaler`` to the
          non-object columns.
        - ``encoder`` (default ``False``): apply ``OneHotEncoder`` to the
          object-dtype columns.

        The whole dictionary is also forwarded to the ``pf`` helpers.

    Returns
    -------
    array-like
        The result of ``ColumnTransformer.fit_transform`` on the processed
        frame. When both ``scaler`` and ``encoder`` are disabled, the processed
        frame is returned untransformed as a NumPy array.

    Raises
    ------
    ValueError
        If ``period_of_observation`` is missing from ``kwargs`` (or is None).
    """
    # Fail fast: the seasonal decomposition below cannot run without a period,
    # so check it before doing any work.
    period_of_observation = kwargs.get('period_of_observation', None)
    if period_of_observation is None:
        raise ValueError("Period of observation must be provided in kwargs.")

    # Build the working frame first: every later step (including the optional
    # resampling/smoothing) operates on `dataframe`.
    dataframe = pd.concat([batch, new_input]).sort_values(by='timestamp')

    ####### Preprocessing of the data

    # Resampling (if needed)
    if kwargs.get('resample', False):
        dataframe = pf.resample_data(dataframe, kwargs)  # not implemented yet

    # Smoothing (if needed, based on kwargs)
    if kwargs.get('smooth', False):
        dataframe = pf.smooth_data(dataframe, kwargs)  # not implemented yet

    ### DATA CLEANING

    ## Data type standardization

    ## Check for inconsistencies

    ## Fill missing values

    ### FEATURE ENGINEERING

    ## Check stationarity
    # False if not stationary, True if it is, None if the test couldn't be applied
    is_stationary = pf.adf_test(dataframe.dropna())

    ## Check seasonality
    # The components are computed but not used further by this function yet.
    trend, seasonality, residual = pf.seasonal_additive_decomposition(dataframe, period_of_observation)

    # Make data stationary / Detrend / Deseasonalize (if needed)
    make_stationary = kwargs.get('need_stationarity', False)
    detrend = kwargs.get('detrend', False)
    deseasonalize = kwargs.get('deseasonalize', False)

    # A None result (test not applicable) is treated like "not stationary".
    if make_stationary and not is_stationary:
        dataframe = pf.make_stationary(dataframe, kwargs)

    # Trend is removed before seasonality when both are requested.
    if detrend:
        dataframe = pf.rest_trend(dataframe, kwargs)
    if deseasonalize:
        dataframe = pf.rest_seasonality(dataframe, kwargs)

    # Normalize data and apply encoding (if needed)
    categorical_columns = dataframe.select_dtypes(include=['object']).columns
    non_categorical_mask = ~dataframe.columns.isin(categorical_columns)

    encoder = kwargs.get('encoder', False)  # off unless requested
    scaler = kwargs.get('scaler', True)  # on unless disabled

    # Each flag independently contributes its transformer, so an explicit
    # scaler=False is honoured even when no encoder is requested.
    transformers = []
    if scaler:
        transformers.append(('num', RobustScaler(), non_categorical_mask))
    if encoder:
        transformers.append(('cat', OneHotEncoder(handle_unknown='ignore'), categorical_columns))

    if not transformers:
        # Nothing to fit: hand back the processed data as an array, matching
        # the array-like return of the transformed path.
        return dataframe.to_numpy()

    preprocessor = ColumnTransformer(transformers=transformers)

    # Fit and transform the data
    return preprocessor.fit_transform(dataframe)

    

In [None]:
def ML_pipeline(time_series, kwargs):
    """Placeholder for the machine-learning stage applied to a time series.

    None of the planned steps listed below are implemented yet, so calling
    this function raises ``NotImplementedError`` instead of failing on an
    undefined result variable.

    Parameters
    ----------
    time_series
        The series to run the ML stage on (currently unused).
    kwargs : dict
        Options for the ML stage (currently unused).

    Returns
    -------
    Intended to return the forecasting prediction or the detected anomaly.

    Raises
    ------
    NotImplementedError
        Always, until the steps below are implemented.
    """
    ## Definition of metrics for goodness of model

    ## Drift detection algorithms

    ### ML ALGORITHMS (this should be divided into a training phase and a prediction phase)

    ## Check Outliers

    ## Feature selection

    ## Parameters setting

    ## Data forecasting (if selected)

    ## Anomalies detection (if selected)

    ## Models comparison (if needed)

    raise NotImplementedError("ML_pipeline is not implemented yet.")
    