### Load Config File

In [3]:
import yaml
from box import ConfigBox


# Load pipeline settings once. Later cells read them by attribute access
# (e.g. config.pipeline.name, config.packages.AIPLATFORM) via ConfigBox.
# safe_load is used so YAML tags cannot construct arbitrary Python objects.
# An explicit encoding keeps parsing independent of the machine's locale.
with open("config.yaml", "r", encoding="utf-8") as ymlfile:
    raw_config = yaml.safe_load(ymlfile)

# An empty file yields None and a list yields a sequence; both would only fail
# later with an opaque Box error, so reject them here with a clear message.
if not isinstance(raw_config, dict):
    raise ValueError("config.yaml must contain a YAML mapping at the top level")

config = ConfigBox(raw_config)

### Imports

In [28]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from typing import NamedTuple

from kfp.dsl import component
from kfp.dsl import pipeline
from kfp import compiler
from kfp.dsl import (Artifact, Model, Dataset, Input, Output, Metrics)

### Components

#### Preprocess

In [29]:
@component(
    base_image="python:3.11-slim",
    packages_to_install=[
        config.packages.BIGQUERY,
        config.packages.PANDAS,
        config.packages.DB_TYPES,
    ],
)
def fetch_big_data_table(
    project_id: str,
    dataset_id: str,
    table_id: str,
    dataset_artifact: Output[Dataset]
):
    """Read an entire BigQuery table and write it to the output artifact as CSV.

    Args:
        project_id: GCP project used both for the BigQuery client and as the
            project part of the fully qualified table name.
        dataset_id: BigQuery dataset that contains the table.
        table_id: Table whose rows are all selected.
        dataset_artifact: Output artifact; the rows are written to its local
            path as CSV without the pandas index column.
    """
    import logging
    from google.cloud import bigquery

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    log.info('Fetching big query data')
    bq_client = bigquery.Client(project=project_id)

    # The table reference is interpolated into the SQL text rather than bound
    # as a query parameter.
    # NOTE(review): values come from pipeline parameters; validate them if they
    # can ever originate from untrusted input.
    query = f"""
        SELECT *
        FROM `{project_id}.{dataset_id}.{table_id}`
        """
    log.info(f'Query: {query}')

    table_frame = bq_client.query(query).to_dataframe()
    log.info('Data fetched')

    table_frame.to_csv(dataset_artifact.path, index=False)
    

In [30]:
@component(
    base_image="us-central1-docker.pkg.dev/pebolas-sandbox/sample-model/prepare_data:latest"
)
def prepare_data(
    input_data: Input[Dataset],
    scaler_artifact: Output[Artifact],
    dataset_artifact: Output[Dataset]
):
    """Impute, encode and scale the raw table, saving the data and the scaler.

    Steps, in order:
      * fill missing Age with the column mean and missing Embarked with its mode;
      * drop Cabin, Name, Ticket and PassengerId;
      * map Sex to 0 (male) / 1 (female);
      * one-hot encode Embarked and Pclass, dropping the first level of each;
      * standardize Age and Fare with a StandardScaler fitted on this data.

    Args:
        input_data: CSV dataset produced by the fetch step.
        scaler_artifact: Output; receives the fitted StandardScaler via joblib.
        dataset_artifact: Output; receives the transformed table as CSV
            without the pandas index column.
    """
    import logging

    import joblib
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    log.info('Preparing data')
    frame = pd.read_csv(input_data.path, index_col=False)

    # Impute gaps before any column is dropped or encoded.
    frame["Age"] = frame["Age"].fillna(frame["Age"].mean())
    frame["Embarked"] = frame["Embarked"].fillna(frame["Embarked"].mode()[0])

    frame = frame.drop(columns=["Cabin", "Name", "Ticket", "PassengerId"])
    frame["Sex"] = frame["Sex"].map({"male": 0, "female": 1})

    # A single get_dummies call over both columns produces the same dummy
    # columns, in the same order, as encoding Embarked and then Pclass.
    frame = pd.get_dummies(frame, columns=["Embarked", "Pclass"], drop_first=True)

    scaled_columns = ["Age", "Fare"]
    scaler = StandardScaler()
    frame[scaled_columns] = scaler.fit_transform(frame[scaled_columns])
    log.info('Data prepared')

    log.info('Saving artifacts')
    joblib.dump(scaler, scaler_artifact.path)
    frame.to_csv(dataset_artifact.path, index=False)
    log.info('Artifacts saved')

    
    

In [31]:
@component(
    base_image="python:3.11-slim",
    packages_to_install=[
        config.packages.AIPLATFORM
    ]
)
def fetch_dataset(
    display_name: str,
    dataset_artifact: Input[Dataset],
) -> str:
    """Return the display name of a Vertex AI TabularDataset, creating it if absent.

    Args:
        display_name: Display name used to look the dataset up and, if it does
            not exist yet, to create it.
        dataset_artifact: Prepared CSV dataset; its GCS URI is the source for a
            newly created TabularDataset.

    Returns:
        The display name of the found or created dataset.
    """
    from google.cloud import aiplatform
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info(f"Fetching dataset {display_name}")
    # Quote the value: the documented filter form is display_name="...", and an
    # unquoted value is not a well-formed string literal.
    dataset_list = aiplatform.TabularDataset.list(
        filter=f'display_name="{display_name}"'
    )

    logger.info(f"Dataset list: {dataset_list}")
    if dataset_list:
        logger.info(f"Dataset {display_name} already exists")
        # NOTE(review): an existing dataset is reused as-is. The freshly prepared
        # dataset_artifact is not uploaded in this case, so training may see
        # stale data. If several datasets share the name, the first listed wins.
        data = dataset_list[0]
    else:
        logger.info(f"Creating dataset {display_name}")
        data = aiplatform.TabularDataset.create(
            display_name=display_name,
            gcs_source=dataset_artifact.uri
        )

    logger.info(f"Dataset {display_name} fetched")
    return data.display_name

#### Train

In [32]:
@component(
    base_image="python:3.11-slim",
    packages_to_install=[
        config.packages.AIPLATFORM
    ]
)
def fetch_model(
    display_name: str
) -> str:
    """Look up an existing Vertex AI Model by display name.

    Args:
        display_name: Display name of the model to search for.

    Returns:
        The resource name of the first matching model, or '' when none exists
        ('' is what train_model interprets as "no parent model").
    """
    from google.cloud import aiplatform
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info(f"Fetching model {display_name}")

    # Quote the value: the documented filter form is display_name="...".
    model_list = aiplatform.Model.list(
        filter=f'display_name="{display_name}"'
    )

    logger.info(f"Model list: {model_list}")

    if not model_list:
        logger.info("Model not found")
        return ''

    # NOTE(review): no order_by is passed, so with several same-named models the
    # first one listed becomes the parent; confirm that is acceptable.
    model = model_list[0]
    logger.info(f"Model found: {model}")
    return model.resource_name

In [33]:
@component(
    base_image="python:3.11-slim",
    packages_to_install=[
        config.packages.AIPLATFORM
    ]
)
def train_model(
    parent_model: str,
    dataset_name: str,
    training_job_name: str,
    train_image_uri: str,
    staging_bucket: str,
    model_serving_container_image_uri: str,
    machine_type: str = "n1-standard-4",
    max_iter: int = 1000,
    service_account: str = "guido-owner@pebolas-sandbox.iam.gserviceaccount.com",
) -> Model:
    """Run a custom-container training job on a Vertex AI TabularDataset.

    Args:
        parent_model: Resource name of the model to version under, or '' to
            register a new model.
        dataset_name: Display name of the TabularDataset to train on.
        training_job_name: Display name of the CustomContainerTrainingJob.
        train_image_uri: Training container image.
        staging_bucket: GCS bucket the training job stages to.
        model_serving_container_image_uri: Serving image registered with the
            model; it must serve /healthz and /predict.
        machine_type: Machine type for the training job.
        max_iter: Passed to the training container as --max_iter.
        service_account: Service account the training job runs as.
            NOTE(review): the default is a project-owner account; prefer a
            least-privilege account supplied by the pipeline.

    Returns:
        A Model artifact whose uri is the trained Vertex model's uri.

    Raises:
        ValueError: If no TabularDataset with ``dataset_name`` exists.
    """
    from google.cloud import aiplatform
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info(f"Fetching dataset {dataset_name}")
    # Quote the value: the documented filter form is display_name="...".
    datasets = aiplatform.TabularDataset.list(
        filter=f'display_name="{dataset_name}"'
    )
    if not datasets:
        raise ValueError(f"No Vertex AI TabularDataset named {dataset_name!r} was found")
    dataset = datasets[0]

    logger.info(f"Dataset found: {dataset}")

    logger.info(f"Training model {training_job_name}")

    job = aiplatform.CustomContainerTrainingJob(
        display_name=training_job_name,
        container_uri=train_image_uri,
        model_serving_container_image_uri=model_serving_container_image_uri,
        model_serving_container_health_route='/healthz',
        model_serving_container_predict_route='/predict',
        staging_bucket=staging_bucket
    )

    logger.info("Running training job")
    model = job.run(
        dataset=dataset,
        # '' (no existing model) means "register a new model", which the SDK
        # expresses as parent_model=None.
        parent_model=parent_model or None,
        machine_type=machine_type,
        args=[f"--max_iter={max_iter}"],
        service_account=service_account,
    )
    logger.info("Training job finished")
    return Model(uri=model.uri)

#### Deploy

In [34]:
@component(
    base_image="python:3.11-slim",
    packages_to_install=[config.packages.AIPLATFORM],
)
def deploy(
    model_artifact: Input[Model],
    scaler_artifact: Input[Artifact],
):
    """Placeholder deployment step.

    Accepts the trained model artifact and the fitted scaler artifact but does
    not use either yet; it only prints a marker line.
    """
    # TODO: implement deployment. A previous draft ran batch prediction:
    # model.batch_predict(
    #     job_display_name="test_batch_predict",
    #     gcs_source="gs://pebolas-sandbox-vertex-staging-us-central1/test.jsonl",
    #     gcs_destination_prefix="gs://pebolas-sandbox-vertex-staging-us-central1/",
    #     machine_type="n1-standard-2",
    #     sync=True
    # )
    marker = "deploy :)"
    print(marker)

### Pipeline

In [35]:
@pipeline(
    name=config.pipeline.name,
    description=config.pipeline.description,
    # Join with exactly one "/" so the run root is correct whether or not the
    # configured root ends with a slash (plain concatenation silently merged
    # the two segments when it did not).
    pipeline_root=f"{config.pipeline.root.rstrip('/')}/{config.pipeline.name}",
)
def titaninc_pipeline(
    project_id: str,
    dataset_id: str,
    table_id: str,
    dataset_display_name: str,
    train_model_name: str,
    train_job_name: str,
    train_image_uri: str,
    staging_bucket: str,
    model_serving_container_image_uri: str,
):
    """Fetch data from BigQuery, prepare it, register it, train, then deploy.

    fetch_model does not depend on the data steps, so it may run in parallel
    with them. train_model waits for both the dataset and the parent-model
    lookup.
    """
    fetch_data = fetch_big_data_table(
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
    )

    prepare_data_task = prepare_data(
        input_data=fetch_data.outputs["dataset_artifact"],
    )

    fetch_dataset_task = fetch_dataset(
        display_name=dataset_display_name,
        dataset_artifact=prepare_data_task.outputs["dataset_artifact"],
    )

    fetch_model_task = fetch_model(
        display_name=train_model_name,
    )

    train_model_task = train_model(
        parent_model=fetch_model_task.output,
        dataset_name=fetch_dataset_task.output,
        training_job_name=train_job_name,
        train_image_uri=train_image_uri,
        staging_bucket=staging_bucket,
        model_serving_container_image_uri=model_serving_container_image_uri,
    )

    deploy(
        model_artifact=train_model_task.output,
        scaler_artifact=prepare_data_task.outputs["scaler_artifact"],
    )
    

In [36]:
# Point the Vertex AI SDK at the project and region from config.yaml before any
# pipeline job is created in this session.
aiplatform.init(project=config.project_id, location=config.location)

In [37]:
# Serialize the pipeline definition to config.pipeline.package_path, the same
# path used as template_path when the PipelineJob is built.
compiler.Compiler().compile(
    pipeline_func=titaninc_pipeline,
    package_path=config.pipeline.package_path,
)

In [38]:
# Runtime arguments for titaninc_pipeline: project_id comes from the top level
# of config.yaml; every other argument is read from config.parameters under
# the same name (attribute access, matching how the rest of the notebook reads config).
job = pipeline_jobs.PipelineJob(
    display_name=config.pipeline.name,
    template_path=config.pipeline.package_path,
    parameter_values={
        "project_id": config.project_id,
        **{
            name: getattr(config.parameters, name)
            for name in (
                "dataset_id",
                "table_id",
                "dataset_display_name",
                "train_model_name",
                "train_job_name",
                "train_image_uri",
                "staging_bucket",
                "model_serving_container_image_uri",
            )
        },
    },
)

In [39]:
job.run(sync=True)  # block until the pipeline run finishes (sync=True is the SDK default)

Creating PipelineJob
PipelineJob created. Resource name: projects/470842673491/locations/us-central1/pipelineJobs/titanic-pipeline-20241018133927
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/470842673491/locations/us-central1/pipelineJobs/titanic-pipeline-20241018133927')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/titanic-pipeline-20241018133927?project=470842673491
PipelineJob run completed. Resource name: projects/470842673491/locations/us-central1/pipelineJobs/titanic-pipeline-20241018133927
