In [95]:
import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, Condition
from kfp.registry import RegistryClient
from typing import NamedTuple
# Report the versions loaded in *this* kernel. Shelling out to `python3` / `pip3`
# can hit a different interpreter than the one running the notebook.
from importlib.metadata import version as _dist_version
print('KFP SDK version: {}'.format(kfp.__version__))
print('google-cloud-aiplatform=={}'.format(_dist_version('google-cloud-aiplatform')))

KFP SDK version: 2.5.0
google-cloud-aiplatform==1.51.0


In [96]:
PROJECT_ID = "demoespecialidadgcp"
REGION = "us-central1"
BUCKET_URI = f"gs://demo-2-black-friday"
SERVICE_ACCOUNT = "502688298240-compute@developer.gserviceaccount.com"
PIPELINE_ROOT = f"{BUCKET_URI}/pipelines"
DATASET_ID = "demo_2_black_friday"
TABLE_TRAIN = "raw_train"
TABLE_TEST = "raw_test"

! gcloud config set project {PROJECT_ID}
BQ_REGION = REGION.split("-")[0].upper()

Updated property [core/project].


In [97]:
# Pass the configured region explicitly so the SDK does not silently fall back to its default location.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [98]:
@component(
    packages_to_install=["google-cloud-bigquery[pandas]==3.15.0"],
    base_image="python:3.10"
)
def export_datasets(
    project_id: str,
    dataset_id: str,
    table_train: str,
    table_test: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
):
    """Read the train/test BigQuery tables, preprocess them together and export CSVs.

    Both tables are concatenated so the same cleaning is applied to each, then
    split back using a temporary ``source`` marker column. The CSV files are
    written next to the artifact paths, with a ``.csv`` suffix appended.

    Args:
        project_id: The Project ID.
        dataset_id: The BigQuery Dataset ID. Must be pre-created in the project.
        table_train: The BigQuery train table name.
        table_test: The BigQuery test table name.

    Returns:
        dataset_train: The Dataset artifact with exported CSV file.
        dataset_test: The Dataset artifact with exported CSV file.
    """
    from google.cloud import bigquery
    import pandas as pd

    client = bigquery.Client(project=project_id)

    def _read_table(table):
        # Load a whole table into a DataFrame. Backticks keep the fully
        # qualified id valid even when a part contains characters such as '-'.
        table_id = f"{project_id}.{dataset_id}.{table}"
        query = f"SELECT * FROM `{table_id}`"
        return client.query(query=query).result().to_dataframe()

    # Tag each row with its origin so the frames can be separated again later.
    df_train = _read_table(table_train).assign(source='train')
    df_test = _read_table(table_test).assign(source='test')

    dataset = pd.concat([df_train, df_test])

    # Strip the open-ended "+" suffix from the bucketed columns.
    dataset['Age'] = dataset['Age'].astype(str).str.replace('55+', '55', regex=False)
    dataset['Stay_In_Current_City_Years'] = (
        dataset['Stay_In_Current_City_Years']
        .astype(str)
        .str.replace('4+', '4', regex=False)
        .astype('int')
    )

    # Assign the filled column back instead of a chained `fillna(inplace=True)`,
    # which is not guaranteed to modify the parent frame.
    # The median is computed over the combined train + test rows.
    dataset['Product_Category_2'] = dataset['Product_Category_2'].fillna(
        dataset['Product_Category_2'].median()
    )

    dataset = dataset.drop(
        columns=[
            'Product_Category_3',
            'User_ID',
            'Product_ID',
            'Gender',
            'City_Category',
            'Marital_Status',
        ]
    )

    # Non-mutating drops: avoids modifying a slice of `dataset` in place.
    train = dataset.loc[dataset['source'] == 'train'].drop(columns='source')
    test = dataset.loc[dataset['source'] == 'test'].drop(columns='source')

    train.to_csv(dataset_train.path + '.csv', index=False)
    test.to_csv(dataset_test.path + '.csv', index=False)


# Model Version 1

In [99]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.4.2",
        "scipy==1.13.0",
        "xgboost==2.0.3"
    ],
    base_image="python:3.10"
)
def train_model(
    dataset_train: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
):
    """Training XGBoost Regressor model for demo-2-black-friday.

    Reads ``<dataset_train.path>.csv``, holds out 20% of the rows for
    validation, tunes an encoder -> scaler -> XGBRegressor pipeline with a
    randomized search (5-fold CV) and stores the best pipeline as
    ``model.joblib`` inside the model artifact directory.

    Args:
        dataset_train: The training dataset. Must contain the "Purchase" target column.

    Returns:
        model: The model artifact stores the model.joblib file.
        metrics: The metrics of the trained model.
    """
    import os

    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.metrics import mean_squared_error, r2_score
    from sklearn.model_selection import RandomizedSearchCV, train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OrdinalEncoder, StandardScaler
    from xgboost import XGBRegressor

    dataset = pd.read_csv(dataset_train.path + '.csv')

    X = dataset.drop("Purchase", axis=1)
    Y = dataset["Purchase"]

    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

    # remainder='passthrough' keeps every other column. The default ('drop')
    # would discard all features except 'Age' before they reach the model.
    # Unknown categories are mapped to -1 so a CV fold that misses one does not fail.
    preprocessor = ColumnTransformer(
        transformers=[
            (
                'cat',
                OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1),
                ['Age'],
            ),
        ],
        remainder='passthrough',
    )

    max_depth = [int(x) for x in np.linspace(start=5, stop=20, num=15)]
    # Numeric candidates (previously strings) so the hyperparameter has its documented type.
    learning_rate = [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0]
    min_child_weight = [int(x) for x in np.linspace(start=45, stop=70, num=15)]

    # Keys are prefixed with the pipeline step name so the search reaches the regressor.
    params = {
        "regressor__learning_rate": learning_rate,
        "regressor__max_depth": max_depth,
        "regressor__min_child_weight": min_child_weight,
        "regressor__gamma": [0.0, 0.1, 0.2, 0.3, 0.4],
        "regressor__colsample_bytree": [0.3, 0.4, 0.5, 0.7],
    }

    xgb = XGBRegressor(verbosity=0, random_state=42)

    regr = Pipeline([
        ('preprocessor', preprocessor),
        ('standard-scaler', StandardScaler()),
        ('regressor', xgb)
    ])

    xgb_cv = RandomizedSearchCV(regr, param_distributions=params, cv=5, random_state=42)
    xgb_cv.fit(X_train, Y_train)

    # Best pipeline, refit on the whole training split by the search.
    xgb_best = xgb_cv.best_estimator_

    Y_pred_xgb_best = xgb_best.predict(X_test)

    r2 = r2_score(Y_test, Y_pred_xgb_best)
    rmse = np.sqrt(mean_squared_error(Y_test, Y_pred_xgb_best))

    metrics.log_metric("Framework", "XGBoost")
    metrics.log_metric("Train_samples_size", len(X_train))
    metrics.log_metric("Validation_samples_size", len(X_test))
    metrics.log_metric("RMSE", float(round(rmse, 2)))
    metrics.log_metric("R2 score", float(round(r2, 2)))

    print("XGB regression:")
    print("RMSE:", rmse)
    print("R2 score:", r2)

    # Export the model to a file
    os.makedirs(model.path, exist_ok=True)
    joblib.dump(xgb_best, os.path.join(model.path, "model.joblib"))
    
    

# Model Version 2

In [100]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.4.2",
        "joblib==1.2.0",
        "xgboost==2.0.3"
    ],
    base_image="python:3.10"
)
def evaluate_model(
    dataset_test: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics]
) -> NamedTuple('EvaluationOutput', [('r2', float), ('rmse', float)]):
    """Score the stored model against a labelled dataset.

    Reads ``<dataset_test.path>.csv``, loads ``model.joblib`` from the model
    artifact directory, predicts "Purchase" and reports R2 and RMSE. Any
    failure is printed and then re-raised.

    Args:
        dataset_test: The dataset to evaluate on. Must contain the "Purchase" column.
        model: The trained model.

    Returns:
        metrics: The evaluation metrics of the model.
        r2: The R2 score of the model.
        rmse: The RMSE of the model.
    """
    from collections import namedtuple

    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_squared_error, r2_score
    from xgboost import XGBRegressor

    try:
        # Read the evaluation data written by the export step.
        with open(dataset_test.path + '.csv', "r") as csv_file:
            frame = pd.read_csv(csv_file)

        labels = frame["Purchase"]
        features = frame.drop(columns="Purchase")

        # Restore the fitted estimator and score the features.
        estimator = joblib.load(model.path + "/model.joblib")
        predicted = estimator.predict(features)

        r2 = r2_score(labels, predicted)
        rmse = np.sqrt(mean_squared_error(labels, predicted))

        metrics.log_metric("RMSE", round(rmse, 2))
        metrics.log_metric("R2 score", round(r2, 2))

        print("Evaluation results:")
        print("RMSE:", rmse)
        print("R2 score:", r2)

        # Plain Python floats so the values serialise as pipeline parameters.
        EvaluationOutput = namedtuple('EvaluationOutput', ['r2', 'rmse'])
        return EvaluationOutput(r2=float(r2), rmse=float(rmse))
    except Exception as e:
        print(f"Error during model evaluation: {str(e)}")
        raise



In [101]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "scikit-learn==1.4.2",
        "xgboost==2.0.3"
    ],
    base_image="python:3.10"
)
def inference(
    dataset_test: Input[Dataset],
    model: Input[Model],
    predictions: Output[Dataset]
):
    """Perform inference with the trained model on the test dataset.

    Reads ``<dataset_test.path>.csv``, predicts with the stored ``model.joblib``
    and writes the input rows plus a "Predicted_Purchase" column to
    ``<predictions.path>.csv``.

    Args:
        dataset_test: The testing dataset. The "Purchase" label column is optional.
        model: The trained model.

    Returns:
        predictions: The dataset with predictions.
    """
    import joblib
    import pandas as pd

    # Load the test dataset
    test_dataset = pd.read_csv(dataset_test.path + '.csv')

    # Data to score may legitimately be unlabeled, so a missing "Purchase"
    # column must not abort inference with a KeyError.
    X_test = test_dataset.drop(columns=["Purchase"], errors="ignore")

    # Load the trained model (xgboost must be installed for unpickling, but
    # does not need to be imported here).
    model_file = model.path + "/model.joblib"
    trained_model = joblib.load(model_file)

    # Predict with the model
    Y_pred = trained_model.predict(X_test)

    # Save predictions to the output
    predictions_df = test_dataset.copy()
    predictions_df["Predicted_Purchase"] = Y_pred
    predictions_df.to_csv(predictions.path + '.csv', index=False)


In [102]:
@dsl.pipeline(
    name="demo_2_black_friday",
)
def pipeline(
    PROJECT_ID: str,
    DATASET_ID: str,
    TABLE_TRAIN: str,
    TABLE_TEST: str,
):
    # Flow: export -> train -> evaluate -> (conditional) inference.
    export_dataset_task = export_datasets(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_train=TABLE_TRAIN,
        table_test=TABLE_TEST,
    )

    train_model_task = train_model(
        dataset_train=export_dataset_task.outputs["dataset_train"],
    )

    # NOTE(review): evaluation is fed the *training* artifact, which includes rows
    # the model was fitted on. Presumably the test table carries no "Purchase"
    # labels; if it does, switch this to outputs["dataset_test"]. TODO confirm.
    evaluate_model_task = evaluate_model(
        dataset_test=export_dataset_task.outputs["dataset_train"],
        model=train_model_task.outputs["model"],
    )

    # Quality gate: only produce predictions when the model reaches the R2
    # threshold. The previous `< 0.5` comparison ran inference exclusively for
    # models that missed the threshold.
    with dsl.Condition(evaluate_model_task.outputs['r2'] >= 0.5):
        inference_task = inference(
            dataset_test=export_dataset_task.outputs["dataset_test"],
            model=train_model_task.outputs["model"],
        )

  with dsl.Condition(evaluate_model_task.outputs['r2'] < 0.5):


In [103]:
import os

# Make sure the output folder exists so compilation also works on a fresh checkout.
os.makedirs("pipe_comps", exist_ok=True)
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipe_comps/demo_2_black_friday_pipeline.yaml")

In [45]:
@dsl.pipeline(name="demo_2_black_friday")
def pipeline(
    PROJECT_ID: str,
    DATASET_ID: str,
    TABLE_TRAIN: str,
    TABLE_TEST: str,
):
    # Reduced variant of the pipeline: export the data, then train. No
    # evaluation or inference step is wired in here.
    export_step = export_datasets(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_train=TABLE_TRAIN,
        table_test=TABLE_TEST,
    )

    # Training consumes the exported training CSV artifact.
    training_step = train_model(dataset_train=export_step.outputs["dataset_train"])


In [46]:
import os

# This cell compiles the reduced (export + train) pipeline defined just above.
# Write it to its own file: the previous path was the same YAML as the full
# pipeline, so a top-to-bottom run overwrote the spec that is submitted and
# uploaded further down.
os.makedirs("pipe_comps", exist_ok=True)
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipe_comps/demo_2_black_friday_train_only_pipeline.yaml")

In [104]:
# Runtime values for the pipeline's four input parameters.
pipeline_parameters = {
    'PROJECT_ID': PROJECT_ID,
    'DATASET_ID': DATASET_ID,
    'TABLE_TRAIN': TABLE_TRAIN,
    'TABLE_TEST': TABLE_TEST,
}

# Build the Vertex AI job from the compiled spec, with step caching enabled.
job = aiplatform.PipelineJob(
    display_name="demo_2_black_friday",
    template_path="pipe_comps/demo_2_black_friday_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    enable_caching=True,
)

# Submit the job under the configured service account.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/502688298240/locations/us-central1/pipelineJobs/demo-2-black-friday-20240717190023
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/502688298240/locations/us-central1/pipelineJobs/demo-2-black-friday-20240717190023')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/demo-2-black-friday-20240717190023?project=502688298240
PipelineJob projects/502688298240/locations/us-central1/pipelineJobs/demo-2-black-friday-20240717190023 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/502688298240/locations/us-central1/pipelineJobs/demo-2-black-friday-20240717190023 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/502688298240/locations/us-central1/pipelineJobs/demo-2-black-friday-20240717190023 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/502688298240/locations/us-

In [105]:
# Derive the Artifact Registry host from the shared config instead of repeating
# the region and project as literals (the original f-string had no placeholders).
client_registry = RegistryClient(host=f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/demo-2-black-friday")

In [106]:
# Description sent with the upload request as an extra header.
pipeline_description = "Pipeline para la transformacion de datos, entrenamiento de modelos, registro de metricas, predicciones y modelo, y despliegue del modelo"

# Upload the compiled spec and tag the new version as "latest".
templateName, versionName = client_registry.upload_pipeline(
    file_name="pipe_comps/demo_2_black_friday_pipeline.yaml",
    tags=["latest"],
    extra_headers={"description": pipeline_description},
)