# Experimentation cycle

In [7]:
import pandas as pd
from sklearn.datasets import load_diabetes
from sklearn.linear_model import Ridge
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error,mean_absolute_error


### Load

In [8]:
diabetes = load_diabetes(as_frame=True)
X = diabetes.data
y = diabetes.target

X_train = X.iloc[:300]
X_test = X.iloc[300:]

y_train = y.iloc[:300]
y_test = y.iloc[300:]


### Preprocess

In [9]:
def preprocess_data(df_input,scaler):
    df = df_input.copy()
    df.loc[(df_input['sex'] == -0.044642),['sex']] = 1
    df.loc[df_input['sex'] != -0.044642,['sex']] = 0
    df = scaler.transform(df)    
    return df

scaler = MinMaxScaler()
scaler.fit(X_train)

X_train_features = preprocess_data(X_train,scaler=scaler)
X_test_features = preprocess_data(X_test,scaler=scaler)

### Train

In [10]:
model = Ridge()
model.fit(X_train_features,y_train)

Ridge()

### Predict

In [11]:
y_pred = model.predict(X_test_features)

### Evaluate

In [12]:
mean_squared_error(y_pred=y_pred,y_true=y_test,squared=False)

53.410131367814344

# Putting code into production format

## Defining Contract Classes

In [13]:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, OrderedDict,Union
from cd4ml.task import Task
from cd4ml.workflow import Workflow
import pandas as pd

class DataProcessor(ABC):
    '''
    Abstract class representing data processing steps that go into production.
    '''
    @abstractmethod
    def load_data(self) -> Any:
        '''Method to load the data into memory from a given path. Data should be stored in this class using the data property.'''
        pass

    @abstractmethod
    def preprocess(self,raw_data:Any)-> Any:
        '''Method to preprocess the data. Data should be stored in this class using the data property.'''
        pass

    @property
    def raw_data(self):
        return self._raw_data

    @raw_data.setter
    def raw_data(self,raw_data):
        self._raw_data = raw_data

    @property
    def processed_data(self):
        return self._processed_data

    @processed_data.setter
    def processed_data(self,processed_data):
        self._processed_data = processed_data

class FeatureGenerator(ABC):
    '''
    Abstract class representing feature generation process that goes into production.
    '''
    @abstractmethod
    def get_features(self, data:pd.DataFrame)-> pd.DataFrame:
        '''Method to generate feature data. Data should be stored in this class using the data property.'''
        pass

    @abstractmethod
    def get_target(self, data:pd.DataFrame):
        pass

    @property
    def features(self):
        return self._features
    
    @features.setter
    def features(self,features):
        self._features = features
        
    @property
    def target(self):
        return self._target
    
    @target.setter
    def target(self,target):
        self._target = target

@dataclass
class Artifact:
    name:str
    object:Any
    params:dict
    path:str = ''

class Model(ABC):
    
    def __init__(self, artifacts:Union[Artifact,list[Artifact]]) -> None:
        '''
        Base class representing the model contract that should be used to put a machine learning model into production
        Any complex object that is used during training or prediction should be stored in the artifacts property.
        .
        '''
        self.artifacts = artifacts
        self._check_artifacts()

    @abstractmethod
    def fit(self,X,y):
        '''
        Method to train the machine learning model. 
        Any complex object that is used during training should be stored as an Artifact class.
        ''' 
        pass
    
    @abstractmethod
    def predict(self,X):
        '''
        Method to make predictions with the machine learning model.
        Any complex object that is used during prediction should be retrieved from the artifacts property.
        '''
        pass    
    
    def _check_artifacts(self):
        if self.artifacts is None or self.artifacts == []:
            raise ValueError("A proper artifacts should be set in order to use the model.")

    @property
    def artifacts(self)-> list[Artifact]:
        '''
        This property contains all object artifacts (Transformers,Estimators, etc) used during fit and that will be used during predict.
        This is to ensure the proper logging into the experiment tracking tool.
        '''
        return self._artifacts

    @artifacts.setter
    def artifacts(self,artifacts:Union[Artifact,list[Artifact]]):
        if isinstance(artifacts,list) and isinstance(artifacts[0],Artifact):
            self._artifacts = artifacts
        elif isinstance(artifacts,Artifact):
            self._artifacts = [artifacts]
        else:
            raise TypeError("artifacts object should be a list of Artifact classes or a single Artifact.")

    @artifacts.getter
    def artifacts_objects(self):
        return [x.object for x in self._artifacts]

    @artifacts.getter
    def artifacts_params(self):
        return [x.params for x in self._artifacts]

class ArtifactsHandler(ABC):
    
    @abstractmethod
    def save(artifacts:list[Artifact],path:str):
        pass
    
    @abstractmethod
    def load(parameters:dict,path:str)->list[Artifact]:
        pass

class ModelEvaluator:

    def __init__(self,model:Model):
        self.model = model

    def evaluate(self,X,y,metrics:Union[Callable,list[Callable]]):

        y_pred = self.model.predict(X) #Feature Improvement: some metrics might not work with prediction, but with prediction_proba.

        metric_values = {}

        for metric in metrics:
            metric_name = metric.__name__
            metric_values[metric_name] = metric(y_pred,y) 
        self.metrics = metric_values
        
        return self.metrics
        
    @property
    def metrics(self):
        return self._metrics

    @metrics.setter
    def metrics(self,metrics:dict):
        self._metrics = metrics

# Transforming experimentation into production code

## Implementing concrete classes

In [33]:
from joblib import dump, load

class DiabetesDataProcessor(DataProcessor):
    
    def __init__(self,loader) -> None:
        self._loader = loader
    
    def load_data(self) -> pd.DataFrame:
        diabetes = self._loader(as_frame=True)
        df = diabetes.data
        df['target'] = diabetes.target
        self.raw_data = df
        return df

    def preprocess(self,raw_data:pd.DataFrame) -> pd.DataFrame:
        df = self._encode_sex_feature(raw_data)
        self.processed_data = df
        return df

    def _encode_sex_feature(self,df:pd.DataFrame):
        df_encoded = df.copy()
        df_encoded.loc[(df['sex'] == -0.044642),['sex']] = 1
        df_encoded.loc[df['sex'] != -0.044642,['sex']] = 0
        return df_encoded

class DiabetesFeatureGenerator(FeatureGenerator):
    
    def get_features(self, data: pd.DataFrame) -> pd.DataFrame:
        if 'target' in data.columns:
            return data.drop(columns=['target'])
        else:
            return data
        
    def get_target(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            return data['target']
        except Exception as e:
            print(e)

class DiabetesModel(Model):
    
    def __init__(self, model, model_params, scaler, scaler_params) -> None:
        self.model = model(**model_params)
        self.scaler = scaler(**scaler_params)
        super().__init__(artifacts = [
                                Artifact(name='model',object = self.model,params = model_params),
                                Artifact(name='scaler',object = self.scaler,params = scaler_params)])

    def fit(self, X:pd.DataFrame, y:pd.DataFrame):
        estimator,scaler = self.artifacts_objects
        scaler.fit(X)
        X_norm = scaler.transform(X)
        estimator.fit(X_norm,y)
    
    def predict(self, X):
        estimator,scaler = self.artifacts_objects
        X_norm = scaler.transform(X)
        return estimator.predict(X_norm)

class DiabetesArtifactHandler(ArtifactsHandler):
    
    def __init__(self,path) -> None:
        self.path = path

    def save(self,artifacts:list[Artifact]):
        estimator, scaler = artifacts
        dump(estimator.object, self.path+f'/{estimator.name}.joblib') 
        dump(scaler.object, self.path+f'/{scaler.name}.joblib')

    def load(self,parameters:dict) -> list[Artifact]:
        artifacts = []
        
        model_params = parameters['model_params']
        estimator = load(self.path+'/model.joblib') 
        artifacts.append(Artifact(name='model',object=estimator,params=model_params))
        
        scaler_params = parameters['scaler_params']
        scaler = load(self.path+'/scaler.joblib') 
        artifacts.append(Artifact(name='scaler',object=scaler,params=scaler_params))

        return artifacts

## Setting parameters and loading dependencies

All the necessary parameters and lib dependencies should be structured in a parameters.py file that will be passed to the ModelBuilding and ModelServing pipelines

In [49]:
from sklearn.datasets import load_diabetes
from sklearn.linear_model import Ridge
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error,mean_absolute_error
from sklearn.model_selection import train_test_split

def rmse(y_true,y_pred):
    return mean_squared_error(y_true,y_pred,squared=False)

data_processor_params = {
    'loader':load_diabetes
}

feature_generator_params = {
}

model_params = {
    'model':Ridge,
    'model_params':{'fit_intercept':True,'solver':'lsqr','alpha':0.5},
    'scaler':MinMaxScaler,
    'scaler_params':{'feature_range':[0,1]}
}

evaluator_params = {
    'metrics':[rmse,mean_squared_error,mean_absolute_error]
}

artifacts_handler_parameters = {
    'model_params':model_params['model_params'],
    'scaler_params':model_params['scaler_params']
}

## Training Pipeline

In [50]:
data_processor = DiabetesDataProcessor(**data_processor_params)
raw_data = data_processor.load_data()
processed_data = data_processor.preprocess(raw_data)

feature_generator = DiabetesFeatureGenerator(**feature_generator_params)
X = feature_generator.get_features(processed_data)
y = feature_generator.get_target(processed_data)

# In a more realistic scenario, training and test sets will be chosen externally to this pipeline.
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.8,random_state=42)

model = DiabetesModel(**model_params)
model.fit(X_train,y_train)

In [51]:
evaluator = ModelEvaluator(model=model)
evaluator_params['X'] = X_train
evaluator_params['y'] = y_train
results = evaluator.evaluate(**evaluator_params)
print("Train results:",results)

evaluator_params['X'] = X_test
evaluator_params['y'] = y_test
results = evaluator.evaluate(**evaluator_params)
print("\nTest results:",results)

Train results: {'rmse': 54.96516990330306, 'mean_squared_error': 3021.1699024989725, 'mean_absolute_error': 43.681485821721225}

Test results: {'rmse': 58.9462209485551, 'mean_squared_error': 3474.6569641158762, 'mean_absolute_error': 48.04694278124167}


## Saving Artifacts (This step is orchestrated by ModelBuilding steps)

In [43]:
handler = DiabetesArtifactHandler(path = ".")
handler.save(artifacts=model.artifacts)

## Loading Artifacts of a Trained Model

In [44]:
del model,handler

handler = DiabetesArtifactHandler(path = ".")
artifacts = handler.load(parameters=artifacts_handler_parameters)
model = DiabetesModel(**model_params)
model.artifacts = artifacts

## Testing Saved Model

In [45]:
del data_processor,feature_generator

data_processor = DiabetesDataProcessor(**data_processor_params)
raw_data = data_processor.load_data()
processed_data = data_processor.preprocess(raw_data)
del raw_data

feature_generator = DiabetesFeatureGenerator(**feature_generator_params)
X = feature_generator.get_features(processed_data)
y = feature_generator.get_target(processed_data)
del processed_data


In [46]:
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.8,random_state=42)
evaluator = ModelEvaluator(model=model)
evaluator_params['X'] = X_train
evaluator_params['y'] = y_train
results = evaluator.evaluate(**evaluator_params)
print("Train results:",results)

evaluator_params['X'] = X_test
evaluator_params['y'] = y_test
results = evaluator.evaluate(**evaluator_params)
print("\nTest results:",results)

Train results: {'rmse': 54.96516990330306, 'mean_squared_error': 3021.1699024989725, 'mean_absolute_error': 43.681485821721225}

Test results: {'rmse': 58.9462209485551, 'mean_squared_error': 3474.6569641158762, 'mean_absolute_error': 48.04694278124167}


## Inference Pipeline with loaded Modeland Artifacts

In [47]:
data_processor = DiabetesDataProcessor(**data_processor_params)
raw_data = data_processor.load_data()
processed_data = data_processor.preprocess(raw_data)

feature_generator = DiabetesFeatureGenerator(**feature_generator_params)
X = feature_generator.get_features(processed_data)

y_pred = model.predict(X)
y_pred[:10]


array([191.2987309 ,  80.83555847, 160.00279695, 130.20762935,
       123.27026075, 105.44001573,  94.55850702, 157.08911378,
       149.93666538, 170.82674973])

## Train - Using Workflow and Tasks paradigms

In [23]:
# Inputs do Matheus:
# Requisito: MLFlow expects the folowing: model (sklearn models for now), metrics (dict), parameters(dict), tags (dict) optional.
# Resolucão do problema do sklearn: I can inherit the Model class from the sklearn BaseEstimator.
# Sugestão: Params is an yaml.
# Sugestão: herdar a classe workflow e criar um MachineLearningWorkflow para treinar ou para orquestrar o pipeline do experimento (feature generation + model training)

# Comentarios Lucas:
# Penso que esse pipeline pode ser interessante montar com o workflow do Edu.

In [24]:
# from cd4ml.task import Task
# from cd4ml.workflow import Workflow

In [25]:
# def dummy_function():#TODO: How to make this callable task run a method of a class?
#     pass

# load_data = Task(name='load_data', task=dummy_function)
# preprocess_data = Task(name='preprocess_data', task=dummy_function)
# generate_features = Task(name='generate_features', task=dummy_function)
# generate_target = Task(name='generate_target', task=dummy_function)
# train_test_split = Task(name='train_test_split', task=dummy_function)
# train_model = Task(name='train_model', task=dummy_function)
# evaluate_model_on_train = Task(name='evaluate_model_on_train', task=dummy_function)
# evaluate_model_on_test = Task(name='evaluate_model_on_test', task=dummy_function)
# # last step: save model

# w = Workflow()
# w.add_task(load_data)
# w.add_task(preprocess_data,dependency=['load_data'])
# w.add_task(generate_features,dependency=['preprocess_data'])
# w.add_task(generate_target,dependency=['preprocess_data'])
# w.add_task(train_test_split,dependency=['generate_features','generate_target']) #Question: There is a dependency but no input parameters is passed. How does it work?
# w.add_task(train_model,dependency=['train_test_split']) #Similar case here, but only a portion of inputs parameters is passed. X_train and y_train.
# w.add_task(evaluate_model_on_train,dependency=['train_model'])
# w.add_task(evaluate_model_on_test,dependency=['train_model'])


In [26]:
# run_config = {
#     # "download_folha": {
#     #     'params': {'url': "https://feeds.folha.uol.com.br/emcimadahora/rss091.xml"},
#     #     'output': 'download_folha'
#     # },
#     # "download_g1": {
#     #     'params': {'url': "https://g1.globo.com/rss/g1/"},
#     #     'output': 'download_g1'
#     # },
#     # "download_g1_brasil": {
#     #     'params': {'url': "https://g1.globo.com/rss/g1/brasil"},
#     #     'output': 'download_g1_brasil'
#     # },
#     # "feeds_aggregate": {
#     #     'params': None,
#     #     'output': 'feeds_aggregate'
#     # },
#     # "preprocess": {
#     #     'params': None,
#     #     'output': 'preprocess'
#     # }
# }

# output = w.run(run_config=run_config, executor='local')