In [7]:
%load_ext autoreload
%autoreload 2
import kfp.dsl as dsl
import os
from dotenv import load_dotenv
from typing import NamedTuple
from kfp import compiler
from kfp.dsl import component, OutputPath, Output, Dataset, Input, Model, Metrics

from google.cloud import aiplatform as aip

# Load settings from a .env file (via python-dotenv) into the process environment,
# then read the three values the pipeline needs. os.getenv yields None for any
# variable that is not set.
load_dotenv()
bucket, gcp_project, gcp_service_account = (
    os.getenv(env_name) for env_name in ("bucket", "gcp_project", "gcp_service_account")
)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [8]:
def train_model(df_train_path:Input[Dataset], df_test_path:Input[Dataset], model_path: Output[Model], metrics: Output[Metrics]) -> float:
    """Fit an XGBoost regressor on the train split and score it on the test split.

    Runs as a self-contained KFP lightweight component, so every dependency is
    imported inside the function body.

    Args:
        df_train_path: Input dataset artifact whose ``.path`` is a CSV with the
            feature columns plus the ``MEDV`` target column.
        df_test_path: Input dataset artifact with the same CSV layout, used for
            the final evaluation.
        model_path: Output model artifact; the fitted regressor is written to
            its ``.path`` with ``save_model``.
        metrics: Output metrics artifact; receives the test-set MSE.

    Returns:
        Mean squared error of the fitted model on the test split.
    """
    import numpy as np
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import mean_squared_error
    from sklearn.model_selection import cross_val_score

    label = "MEDV"
    train_df = pd.read_csv(df_train_path.path)
    test_df = pd.read_csv(df_test_path.path)

    features_train = train_df.drop(label, axis=1)
    target_train = train_df[label]
    features_test = test_df.drop(label, axis=1)
    target_test = test_df[label]

    model = xgb.XGBRegressor(objective="reg:squarederror", random_state=42)

    # 5-fold CV on the train split only. sklearn's scorer returns the *negated*
    # MSE ("higher is better"), so flip the sign before reporting.
    fold_mse = -cross_val_score(model, features_train, target_train, cv=5, scoring='neg_mean_squared_error')
    print("Cross-validation MSE scores:", fold_mse)
    print("Average cross-validation MSE:", np.mean(fold_mse))

    # Refit on the whole train split; this fitted model is both persisted and scored.
    model.fit(features_train, target_train)
    model.save_model(model_path.path)

    test_mse = mean_squared_error(target_test, model.predict(features_test))
    print("Test dataset MSE:", test_mse)
    metrics.log_metric('mse on test dataset', test_mse)
    return test_mse

def ingetst_data(df_train_path: Output[Dataset], df_test_path: Output[Dataset]):
    """Load the Boston housing data, split it 80/20 and write both splits as CSV.

    Runs as a self-contained KFP lightweight component, so all imports live
    inside the function body. The (misspelled) name is kept as-is because the
    component definition elsewhere in the notebook references it.

    Args:
        df_train_path: Output dataset artifact; the train split CSV is written
            to its ``.path``.
        df_test_path: Output dataset artifact; the test split CSV is written
            to its ``.path``.

    Each CSV holds the Boston feature columns plus a ``MEDV`` target column,
    written without the pandas index.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    try:
        from sklearn.datasets import load_boston
    except ImportError:
        # load_boston was deprecated in scikit-learn 1.0 and removed in 1.2, so
        # newer images raise ImportError here. Fall back to the original StatLib
        # copy, parsed as scikit-learn's deprecation notice recommends: every
        # record spans two physical lines after a 22-line header.
        # NOTE(review): this path needs outbound network access from the container.
        import numpy as np

        raw = pd.read_csv("http://lib.stat.cmu.edu/datasets/boston", sep=r"\s+", skiprows=22, header=None)
        X = np.hstack([raw.values[::2, :], raw.values[1::2, :2]])
        y = raw.values[1::2, 2]
        feature_names = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                         'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
    else:
        boston_data = load_boston()
        X, y = boston_data.data, boston_data.target
        feature_names = boston_data.feature_names

    # Fixed random_state so reruns produce identical train/test files.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Write each split as features + MEDV target column.
    for features, target, artifact in ((X_train, y_train, df_train_path), (X_test, y_test, df_test_path)):
        split_df = pd.DataFrame(features, columns=feature_names)
        split_df['MEDV'] = target
        split_df.to_csv(artifact.path, index=False)

# Create components for the ingestion and training functions.
# Both run on the same Vertex AI pre-built training image
# (https://cloud.google.com/vertex-ai/docs/training/pre-built-containers), so the
# image reference is defined once.
# NOTE(review): the ':latest' tag is not pinned, and train_model imports xgboost;
# confirm this TF image ships it, otherwise pass packages_to_install=["xgboost"].
BASE_IMAGE = 'europe-docker.pkg.dev/vertex-ai/training/tf-cpu.2-4:latest'
ingest_data_component = component(ingetst_data, base_image=BASE_IMAGE)
train_component = component(train_model, base_image=BASE_IMAGE)


# Pipeline definition (Kubeflow Pipelines SDK): ingest -> train.
@dsl.pipeline(
    name="ltv-train",
)
def add_pipeline():
    # Ingestion produces the train and test dataset artifacts.
    ingest_task = ingest_data_component()
    split_outputs = ingest_task.outputs

    # Training consumes both splits produced by the ingestion step.
    train_task = train_component(
        df_train_path=split_outputs['df_train_path'],
        df_test_path=split_outputs['df_test_path'],
    )

    # Caching is turned off for training only, so it re-executes on every run
    # even when its inputs are unchanged.
    train_task.set_caching_options(False)

# Compile the pipeline into a KFP pipeline-spec YAML file (not JSON); the same
# file name is used as template_path when the PipelineJob is created.
compiler.Compiler().compile(pipeline_func=add_pipeline, package_path="local_run.yaml")

In [9]:
# Create a Vertex AI pipeline run from the compiled spec and submit it.
# TODO: set an explicit job_id (e.g. job_id='test') in the future.
job = aip.PipelineJob(
    project=gcp_project,
    location="europe-west1",
    display_name="First kubeflow pipeline",
    template_path="local_run.yaml",
    pipeline_root=bucket,  # artifact root, taken from the "bucket" env variable
)
job.submit(service_account=gcp_service_account)



Creating PipelineJob
PipelineJob created. Resource name: projects/559395553199/locations/europe-west1/pipelineJobs/ltv-train-20230323142803
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/559395553199/locations/europe-west1/pipelineJobs/ltv-train-20230323142803')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/ltv-train-20230323142803?project=559395553199
