In [1]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
import kfp.compiler as cpl
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from typing import Dict # Biblioteca nativa do python
import os 

In [2]:
# Parâmetros Opcionais
BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'

# O Dataset já vem pré-processado e normalizado
DATASET = 'https://raw.githubusercontent.com/ambientelivre/samples_kubeflow/main/datasets/bolsa.csv'

PIPELINE_NAME = 'Pipeline Previsao Dolar'
PIPELINE_DESCRIPTION = 'Dados Bolsa X Dolar'

PIPELINES_PATH = 'pipelines'
OUTPUT_PIPELINE_PATH = os.path.join(PIPELINES_PATH,'pipe_bolsa.yaml')

if not os.path.exists(PIPELINES_PATH):
    os.makedirs(PIPELINES_PATH)

EXPERIMENT_NAME = 'TESTES-AMBIENTE-BOLSA-X-DOLAR'
RUN_NAME = 'PIPELINE-RUN-BOLSA-X-DOLAR'

# MODEL_NAME = 'bolsa_pipeline_model_' + str(int(time.time())) 
# OUTPUT_MODEL_PATH = 'models/'

# MODEL_VERSION = 'bolsa_pipeline_model_v1' + str(int(time.time())) 

In [3]:
def treina(feat:str, label:str, dataset:str, output_model_path:OutputPath(str)) :
    """
    A função retorna o caminho até o binário do arquivo que contém o modelo treinado.
    Isso fica implícito quando declaramos OutputPath(str).
    """

    import pickle
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    # Recebe o dataset e treina o algoritmo
    df = pd.read_csv(dataset)
    reglin = LinearRegression()
    reglin.fit(df[[feat]], df[label])
    
    # Registra o binário do modelo treinado no caminho até o arquivo "output_model_path" no minIO
    with open(output_model_path, 'wb') as f:
        pickle.dump(reglin, f)
    

In [4]:
comp_treina = comp.create_component_from_func(treina, output_component_file='treina_component.yaml',
                                              base_image=BASE_IMAGE)

In [5]:
def valida(feat:str, label:str, dataset:str, model_path:InputPath()) -> Dict[str,float]:
    
    import pickle
    import pandas as pd
    from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
    
    # Importando o dataset
    df = pd.read_csv(dataset)
    y_true = df[label]
    
    # Importando o modelo serializado como um objeto python
    with open(model_path, 'rb') as f:
        trained_model = pickle.load(f)
    
    y_pred = trained_model.predict(df[[feat]])
    
    # Calcula as métricas de desempenho do modelo para o problema de regressão
    r2 = r2_score(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    
    valida_metrics = {
        'r2' : r2,
        'mse' : mse,
        'mae' : mae
    }
    
    # Retorna um dicionário com o cálculo das métricas

    return valida_metrics

In [6]:
comp_valida = comp.create_component_from_func(valida, output_component_file='valida_component.yaml'
                                              ,base_image=BASE_IMAGE)

In [7]:
def previsao(feat:str, label:str, model_path:InputPath(), valor:float) -> float:

    import pandas as pd
    import pickle
    
    future = pd.DataFrame({feat:[valor]})
    with open(model_path, 'rb') as f:
        trained_model = pickle.load(f)

    # Retorna a previsão do valor futuro do dólar quando a bolsa atinge ${valor} pontos
        
    return trained_model.predict(future)[0]
    

In [8]:
comp_previsao = comp.create_component_from_func(previsao, output_component_file='previsao_component.yaml'
                                                ,base_image=BASE_IMAGE)

In [9]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description=PIPELINE_DESCRIPTION
)
def pipe(feat, label, valor, dataset):

    # Treinamento do algoritmo
    task_treina = comp_treina(feat, label, dataset)

    # Validação do modelo
    task_valida = comp_valida(feat, label, dataset, task_treina.output)

    # Previsão final
    task_previsao = comp_previsao(feat, label, task_treina.output, valor)

In [10]:
# Instanciando um objeto-cliente para integração com o Kubeflow
client = kfp.Client(host='http://localhost:8080/pipeline')

In [11]:
args = {
'feat': 'bolsa',
'label': 'usd',
'valor': '110',
'dataset': DATASET
}

print(client.list_experiments())

{'experiments': [{'created_at': datetime.datetime(2023, 3, 19, 22, 38, 16, tzinfo=tzutc()),
                  'description': 'All runs created without specifying an '
                                 'experiment will be grouped here.',
                  'id': '46e29c0a-18ee-4f8b-bee2-c413e627b904',
                  'name': 'Default',
                  'resource_references': None,
                  'storage_state': 'STORAGESTATE_AVAILABLE'},
                 {'created_at': datetime.datetime(2023, 3, 30, 20, 10, 8, tzinfo=tzutc()),
                  'description': None,
                  'id': '5ebd0a7a-c96b-4213-b67f-015ebe36395d',
                  'name': 'TESTES-AMBIENTE-BOLSA-X-DOLAR',
                  'resource_references': None,
                  'storage_state': 'STORAGESTATE_AVAILABLE'},
                 {'created_at': datetime.datetime(2023, 3, 31, 11, 32, 51, tzinfo=tzutc()),
                  'description': 'aaa',
                  'id': 'cefe222e-36de-4096-acab-ff6e7dc1ad1

In [12]:
# A execução do pipeline pode ser feita através do método create_run_from_pipeline_func()
# que carrega as componentes "comp_*" já instanciadas no código e também o pipeline
client.create_run_from_pipeline_func(pipe, arguments=args, experiment_name=EXPERIMENT_NAME, 
                                     run_name=RUN_NAME) 

RunPipelineResult(run_id=e988402e-d1a3-4f9e-ba4e-b31d359d3ebb)

In [13]:
# Outra forma de executarmos o pipeline é compilando ele gerando um .yaml 
# Desde que 
# que pode ser usado para criar e executar um pipeline
cpl.Compiler().compile(
    pipeline_func=pipe,
    package_path=OUTPUT_PIPELINE_PATH
)    

In [14]:
client.create_run_from_pipeline_package(
    pipeline_file=OUTPUT_PIPELINE_PATH,
    arguments=args
)

RunPipelineResult(run_id=b4b5c2c7-944a-405a-864e-70d2403f6811)