In [1]:
PROJECT_ID = "ml-demos-garrido"
REGION = "us-central1"
PIPELINE_ROOT = "gs://garrido-dsml/pipelines"

In [2]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component)

from kfp.v2 import compiler
from datetime import datetime
from google.cloud import aiplatform

import pickle
import pandas as pd
import numpy as np

In [None]:
from sklearn.datasets import load_iris
import sklearn
from sklearn.model_selection import train_test_split

# Carregando o Iris Datasets 
iris = load_iris()

# Construindo Dataframe
df = pd.DataFrame(data= np.c_[iris['data'], iris['target']],
                 columns= iris['feature_names'] + ['target'])

df['species'] = pd.Categorical.from_codes(iris.target, iris.target_names)
columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species', 'targets']

# Adicionando timestamp
df.columns = columns

In [None]:
df.head()

## 👷🏽‍♀️ Realizando a ingestão do dataset para o BigQuery

A partir do Iris dataset em memória (i.e., o nosso `Pandas Dataframe`), podemos gerar um arquivo `Parquet` de forma bem simples para persistir os dados de forma binária no disco. A partir disso, será possível realizar um processo de ingestão em batch para o `BigQuery`, um processo *sem custo* que disponibilizará os nossos dados em uma estrutura serverless, de armazenamento colunar, com baixo custo e orientada à recuperação e processamento massivo de dados. 

In [None]:
%%bash
# Vamos começar criando uma camada lógica de dados no BigQuery, o nosso dataset
{
    bq --location=us-central1 mk -d iris
} || { # catch
    echo "Dataset já existente"
}

In [None]:
%%bigquery 
# Em seguida, vamos garantir que a nossa tabela será recriada do zero
DROP TABLE IF EXISTS iris.iris

In [None]:
%%bigquery
# Vamos definir a nossa tabela, utilizando o statement DDL do BigQuery
CREATE OR REPLACE TABLE iris.iris(
    petal_length FLOAT64,
    sepal_length FLOAT64,
    petal_width FLOAT64,
    sepal_width FLOAT64,
    species FLOAT64,
    targets STRING
)

In [None]:
# Extraindo os dados em Parquet para realizar ingestão
df.to_parquet('features.parquet.gzip',
              compression='gzip',
             index=False)

In [None]:
%%bash
bq load \
    --source_format=PARQUET \
    iris.iris \
    features.parquet.gzip

In [None]:
%%bigquery
SELECT * FROM iris.iris LIMIT 5

## 👷🏾 Desenvolvendo o pipeline de treinamento utilizando Kubeflow Pipelines

Para orquestrar fluxos de trabalho de ML no Vertex Pipelines, primeiro precisamos descrever o fluxo de trabalho como um pipeline. Os pipelines de ML são fluxos de trabalho de portáteis e escaláveis, baseados em contêineres. Os pipelines de ML são compostos por um conjunto de parâmetros de entrada e uma lista de etapas. Cada tarefa é uma instância de um componente do pipeline.

Podemos usar os pipelines de ML para:

- Aplicar estratégias MLOps para automatizar e monitorar processos repetíveis
- Testar diferentes conjuntos de hiperparâmetros, número de etapas de treinamento ou iterações, etc
- Reutilizar etapas de pipeline para treinar uma nova versão ou um novo modelo

É possível usar o Vertex Pipelines para executar pipelines que foram criados usando o SDK do **Kubeflow Pipelines** ou o **TensorFlow Extended**.

A seguir, utilizaremos o framework Kubeflow para criar um pipeline simples de treinamento de um classificador para o Iris Dataset.

In [9]:
from google.cloud import aiplatform

In [3]:
@dsl.component(
    packages_to_install=['sklearn', 'google-cloud-bigquery[bqstorage,pandas]']
)
def get_data_from_bq(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
):
    import os
    from google.cloud import bigquery

    project_number = os.environ['CLOUD_ML_PROJECT_ID']

    bqclient = bigquery.Client(project=project_number)

    # Download query results
    query_string = """
    SELECT * FROM `ml-demos-garrido.iris.iris`
    """

    df = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(
            create_bqstorage_client=True,
        )
    )
              
    from sklearn.model_selection import train_test_split

    train, test = train_test_split(df, test_size=0.2, random_state=42)
    
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)


In [4]:
@dsl.component(
    packages_to_install=['sklearn', 'pandas']
)
def train_sklearn_model(
    dataset_train: Input[Dataset],
    model: Output[Model]
    
):
    import pandas as pd
    
    data = pd.read_csv(dataset_train.path)
    
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.tree import DecisionTreeClassifier
    
    clf = DecisionTreeClassifier()
    clf.fit(data.drop(columns=['species', 'targets']), 
                      data.species)
            
    import pickle
    
    # Persistindo o modelo 
    filename = 'model.pkl'
    pickle.dump(clf, open(model.path, 'wb'))


In [5]:
@dsl.component(
    packages_to_install=['sklearn', 'pandas', 'google-cloud-aiplatform']
)
def eval_model(
    dataset_test: Input[Dataset],
    sklearn_model: Input[Model],
    endpoint_id: str,
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
):
    import pandas as pd
    import pickle
    
    data = pd.read_csv(dataset_test.path)
    model = pickle.load(open(sklearn_model.path, 'rb'))
    
    score = model.score(
        data.drop(columns=['species', 'targets']),
        data.species
        )
    
    from sklearn.metrics import confusion_matrix
    
    y_pred = model.predict(data.drop(columns=['species', 'targets']))
    
    metrics.log_confusion_matrix(
       ['Setosa', 'Versicolour', 'Virginica'],
       confusion_matrix(
           data.species, y_pred
       ).tolist(),  
    )
    
    sklearn_model.metadata['test_score'] = float(score)
    smetrics.log_metric('score', float(score))
    
    from google.cloud import aiplatform
    import os
    
    # Definindo status de promoção
    promote = False
    
    # Inicializando Client da Vertex AI
    project_number = os.environ['CLOUD_ML_PROJECT_ID']
    aiplatform.init(project=project_number)
    
    # Instanciando Endpoint de produção
    endpoint = aiplatform.Endpoint(endpoint_name=f'projects/ml-demos-garrido/locations/us-central1/endpoints/{endpoint_id}')
    
    # Recuperando id do modelo em produção
    deployed_model_id = endpoint.list_models()[0].model
    
    # Instanciando Model de produção e recuperando uri do artefato
    deployed_model = aiplatform.Model(model_name=deployed_model_id)
    deployed_model_path = deployed_model.uri
    
    # Lendo o artefato do FUSE
    fuse_path = deployed_model_path.replace('gs:/', 'gcs') + 'model.pkl'
    deployed_model_loaded = pickle.load(open(fuse_path, 'rb'))
    
    baseline = deployed_model_loaded.score(
        data.drop(columns=['species', 'targets']),
        data.species
        )
  
    print(f'O score baseline é: {baseline}')
    new_val = float(smetrics.metadata['score'])
    print(f'O score novo é: {new_val}')
    
    if new_val >= baseline:
        promote = True
        print('Modelo atinge requisitos potenciais de deployment') 
    else:
        print('Modelo não atinge requisitos de deployment')
        
    #return promote

In [6]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    name='iris-classifier-kfp-pipeline'
)
def pipeline(
    endpoint_id: str = '7206946876350791680'
):
    ingest_data = get_data_from_bq()
    train_op = train_sklearn_model(ingest_data.outputs['dataset_train'])
    eval_op = eval_model(
        dataset_test=ingest_data.outputs['dataset_test'],
        sklearn_model=train_op.outputs['model'],
        endpoint_id=endpoint_id
    )
        
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='iris_classifier_pipeline.json')



## 👩🏾‍💻 Executando o pipeline de maneira serverless utilizando o Vertex Pipelines

Após desenvolvido o pipeline, podemos compilar a DAG utilizando o módulo `compile` da classe de compilação do KFP.

Com isso, iremos gerar um arquivo de configuração, que descreverá todas as dependencias da nossa esteira.

Dessa forma, é possível gerar novas execuções do mesmo pipeline através da Vertex AI, simplesmente apontando um caminho para o arquivo de configuração.

Nesse caso, iremos utilizar o Cloud Storage (que é um repositório de leitura compatível com a API da Vertex AI) para armazenar o arquivo de configuração, obtendo uma leitura rápida e fácil para execuções posteriores.

In [7]:
!gsutil cp iris_classifier_pipeline.json gs://garrido-dsml/pipelines/iris_classifier_pipeline.json

Copying file://iris_classifier_pipeline.json [Content-Type=application/json]...
/ [1 files][ 12.6 KiB/ 12.6 KiB]                                                
Operation completed over 1 objects/12.6 KiB.                                     


In [8]:
from kfp.v2.google.client import AIPlatformClient

TIMESTAMP = datetime.now().strftime('%Y%m%d%H%M%S')
JOB_ID = 'iris-classifier-kfp-pipeline-{timestamp}'.format(timestamp=TIMESTAMP)
COMPILED_PIPELINE_PATH = 'gs://garrido-dsml/pipelines/iris_classifier_pipeline.json'

api_client = AIPlatformClient(project_id=PROJECT_ID,
                           region=REGION)

pipeline_run_name = api_client.create_run_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    job_id=JOB_ID,
    enable_caching=False,
    service_account='mlops-services@ml-demos-garrido.iam.gserviceaccount.com',
    pipeline_root=PIPELINE_ROOT,
)

