In [1]:
from kfp import dsl
from kfp.dsl import component
from kfp.v2 import compiler
from google.cloud import aiplatform
from google.cloud import storage
import yaml
import json

def load_config(gcs_path):
    """Download a YAML configuration object from Cloud Storage and parse it.

    Args:
        gcs_path: Location of the YAML object, e.g.
            ``gs://my-bucket/path/config.yaml``. The ``gs://`` scheme is
            optional; ``my-bucket/path/config.yaml`` is accepted as well.

    Returns:
        dict: The parsed YAML document.

    Raises:
        ValueError: If ``gcs_path`` does not contain both a bucket and an
            object name, or if the YAML document is not a mapping.
    """
    # Strip the scheme only when it is an actual prefix; str.replace would
    # also remove a "gs://" that happens to appear inside the object name.
    scheme = "gs://"
    location = gcs_path[len(scheme):] if gcs_path.startswith(scheme) else gcs_path

    # Validate before touching the network so a malformed path produces a
    # readable error instead of "not enough values to unpack".
    bucket_name, _, blob_name = location.partition("/")
    if not bucket_name or not blob_name:
        raise ValueError(
            f"Expected a path like 'gs://<bucket>/<object>', got {gcs_path!r}"
        )

    client = storage.Client()
    blob = client.get_bucket(bucket_name).blob(blob_name)
    config = yaml.safe_load(blob.download_as_text())

    # An empty or scalar YAML file would otherwise surface later as a
    # confusing TypeError when the caller indexes into the result.
    if not isinstance(config, dict):
        raise ValueError(
            f"Config at {gcs_path!r} must be a YAML mapping, got {type(config).__name__}"
        )
    return config

@component
def get_model_name(config: dict, experiment_id: str = "1") -> str:
    """Build the name of the BigQuery ML model for a given experiment.

    The result follows the ``<model_name>_exp_<experiment_id>`` pattern that
    ``create_bq_query`` uses when it creates the model.

    Args:
        config: Pipeline configuration; must contain the key ``model_name``.
        experiment_id: Suffix identifying the experiment. Defaults to ``"1"``,
            which reproduces the previously hard-coded behavior.

    Returns:
        The experiment-specific model name.
    """
    return f"{config['model_name']}_exp_{experiment_id}"

@component
def create_bq_query(config: dict, experiment_id: str) -> str:
    """Render the BigQuery ML ``CREATE OR REPLACE MODEL`` statement for one experiment.

    Args:
        config: Pipeline configuration. Reads ``project_id``, ``dataset_id``,
            ``model_name``, ``model_options`` (``model_type``, ``batch_size``,
            ``hidden_units``, ``activation_fn``, ``optimizer``),
            ``hyperparameter_tuning`` (``algorithm``, ``num_trials``),
            ``input_features`` and ``input_table``.
        experiment_id: Suffix appended to the model name as ``_exp_<id>``.

    Returns:
        The SQL text; it is not executed here.
    """
    # Pull every setting into a local first so the template below stays readable.
    project = config['project_id']
    dataset = config['dataset_id']
    model_base = config['model_name']

    options = config['model_options']
    model_type = options['model_type']
    batch_size = options['batch_size']
    hidden_units = options['hidden_units']

    tuning = config['hyperparameter_tuning']
    algorithm = tuning['algorithm']
    num_trials = tuning['num_trials']

    activation_fn = options['activation_fn']
    optimizer = options['optimizer']

    # Selected columns are emitted as a comma-separated list.
    features = ', '.join(config['input_features'])
    input_table = config['input_table']

    return f"""
    CREATE OR REPLACE MODEL `{project}.{dataset}.{model_base}_exp_{experiment_id}`
    OPTIONS(
        MODEL_TYPE='{model_type}',
        BATCH_SIZE={batch_size},
        HIDDEN_UNITS=HPARAM_CANDIDATES([STRUCT({hidden_units})]),
        HPARAM_TUNING_OBJECTIVES=['MEAN_SQUARED_ERROR'],
        HPARAM_TUNING_ALGORITHM='{algorithm}',
        NUM_TRIALS={num_trials},
        ACTIVATION_FN = HPARAM_CANDIDATES({activation_fn}),
        OPTIMIZER = HPARAM_CANDIDATES({optimizer})
    ) AS (
     SELECT
       {features}
     FROM
       `{project}.{dataset}.{input_table}`
    );
    """

@component(packages_to_install=["google-cloud-storage==2.18.1","PyYAML==6.0.2"])
def load_config_component(gcs_path: str, project_id: str) -> dict:
    """Pipeline component that loads the YAML configuration from Cloud Storage.

    Args:
        gcs_path: Location of the YAML object, e.g.
            ``gs://my-bucket/path/config.yaml``. The ``gs://`` scheme is
            optional.
        project_id: Project used to create the Cloud Storage client.

    Returns:
        The parsed YAML document.

    Raises:
        ValueError: If ``gcs_path`` does not contain both a bucket and an
            object name, or if the YAML document is not a mapping.
    """
    # Imports live inside the function so the component body is self-contained.
    from google.cloud import storage
    import yaml

    # Strip the scheme only when it is an actual prefix; str.replace would
    # also remove a "gs://" that happens to appear inside the object name.
    scheme = "gs://"
    location = gcs_path[len(scheme):] if gcs_path.startswith(scheme) else gcs_path

    # Validate up front so a malformed path yields a readable error instead
    # of "not enough values to unpack".
    bucket_name, _, blob_name = location.partition("/")
    if not bucket_name or not blob_name:
        raise ValueError(
            f"Expected a path like 'gs://<bucket>/<object>', got {gcs_path!r}"
        )

    client = storage.Client(project=project_id)
    blob = client.get_bucket(bucket_name).blob(blob_name)
    config = yaml.safe_load(blob.download_as_text())

    # Downstream components index into the config, so reject an empty or
    # scalar document here with a clear message.
    if not isinstance(config, dict):
        raise ValueError(
            f"Config at {gcs_path!r} must be a YAML mapping, got {type(config).__name__}"
        )
    return config

@component(packages_to_install=["google-cloud-bigquery==3.24.0"])
def run_bq_query(query: str, project_id: str) -> str:
    """Pipeline component that runs a SQL statement in BigQuery.

    Args:
        query: SQL text to execute.
        project_id: Project used to create the BigQuery client.

    Returns:
        A status message that includes the BigQuery job ID.
    """
    from google.cloud import bigquery

    query_job = bigquery.Client(project=project_id).query(query)
    # Wait on the job's result before reporting completion.
    query_job.result()
    return f"Query completed. Job ID: {query_job.job_id}"

@component(packages_to_install=["google-cloud-bigquery==3.24.0"])
def detect_anomalies(config: dict, model_name: str, project_id: str) -> str:
    """Pipeline component that runs ``ML.DETECT_ANOMALIES`` and stores the output.

    The results table is created or replaced from the rows returned by
    ``ML.DETECT_ANOMALIES`` over the configured input table.

    Args:
        config: Pipeline configuration. Reads ``project_id``, ``dataset_id``,
            ``results_table``, ``input_table`` and
            ``detect_anomalies.contamination``.
        model_name: Name of the model inside the configured dataset.
        project_id: Project used to create the BigQuery client.

    Returns:
        A status message naming the results table.
    """
    from google.cloud import bigquery

    bq_client = bigquery.Client(project=project_id)

    # Every object lives in the same project/dataset, so build the shared
    # prefix once and reuse it.
    project = config['project_id']
    dataset = config['dataset_id']
    dataset_path = f"{project}.{dataset}"
    results_fqn = f"{dataset_path}.{config['results_table']}"
    contamination = config['detect_anomalies']['contamination']
    input_fqn = f"{dataset_path}.{config['input_table']}"

    sql = f"""
    CREATE OR REPLACE TABLE `{results_fqn}` AS (
    SELECT
      *
    FROM
      ML.DETECT_ANOMALIES(MODEL `{dataset_path}.{model_name}`,
        STRUCT({contamination} AS contamination),
        TABLE `{input_fqn}`)
    )
    """

    detection_job = bq_client.query(sql)
    detection_job.result()
    return f"Anomaly detection completed. Results stored in {results_fqn}"

@component(packages_to_install=["google-cloud-bigquery==3.24.0"])
def export_results_to_gcs(config: dict, project_id: str) -> str:
    """Pipeline component that extracts the results table to a CSV object in GCS.

    Args:
        config: Pipeline configuration. Reads ``gcs_bucket``,
            ``results_table``, ``project_id`` and ``dataset_id``.
        project_id: Project used to create the BigQuery client.

    Returns:
        A status message containing the destination URI.
    """
    from google.cloud import bigquery

    bq_client = bigquery.Client(project=project_id)

    # The object is named after the results table and written at the bucket root.
    gcs_uri = f"gs://{config['gcs_bucket']}/{config['results_table']}.csv"
    source_table = bigquery.DatasetReference(
        config['project_id'], config['dataset_id']
    ).table(config['results_table'])

    csv_config = bigquery.job.ExtractJobConfig()
    csv_config.destination_format = bigquery.DestinationFormat.CSV

    bq_client.extract_table(source_table, gcs_uri, job_config=csv_config).result()

    return f"Results exported to {gcs_uri}"

@dsl.pipeline(name="Hyperparameter Tuning Pipeline")
def pipeline_a(config_path: str, project_id: str):
    """Load the config, then build and run the model-creation query for experiments 1 and 2.

    Args:
        config_path: GCS path of the YAML configuration.
        project_id: Project passed to the config loader and the query runner.
    """
    loaded_config = load_config_component(gcs_path=config_path, project_id=project_id).output

    # One query-building task per experiment ...
    query_tasks = [
        create_bq_query(config=loaded_config, experiment_id=exp_id)
        for exp_id in ("1", "2")
    ]

    # ... each feeding its own BigQuery execution task.
    for query_task in query_tasks:
        run_bq_query(query=query_task.output, project_id=project_id)

@dsl.pipeline(name="Anomaly Detection Pipeline")
def pipeline_b(config_path: str, project_id: str):
    """Load the config, run anomaly detection, then export the results to GCS.

    Args:
        config_path: GCS path of the YAML configuration.
        project_id: Project passed to every component that talks to GCP.
    """
    loaded_config = load_config_component(gcs_path=config_path, project_id=project_id).output
    model_name = get_model_name(config=loaded_config).output

    detection = detect_anomalies(
        config=loaded_config,
        model_name=model_name,
        project_id=project_id,
    )

    # The export consumes no output of the detection task, so the ordering
    # has to be declared explicitly.
    export = export_results_to_gcs(config=loaded_config, project_id=project_id)
    export.after(detection)

def compile_and_run_pipeline(pipeline_func, pipeline_name, config_path, project_id, location):
    """Compile a pipeline to JSON and submit it as a Vertex AI PipelineJob.

    The compiled definition is written to ``<pipeline_name>.json`` (a relative
    path), and the job is associated with the experiment
    ``bqml-experiment-<pipeline_name>``.

    Args:
        pipeline_func: Pipeline function decorated with ``@dsl.pipeline``.
        pipeline_name: Used for the job display name, the compiled file name
            and the experiment name.
        config_path: GCS path of the YAML configuration; also passed to the
            pipeline as its ``config_path`` parameter.
        project_id: Project passed to Vertex AI and to the pipeline.
        location: Location passed to Vertex AI.
    """
    experiment_name = f"bqml-experiment-{pipeline_name}"
    experiment_description = "BigQuery ML Vertex AI Pipelines Demo"
    package_path = f"{pipeline_name}.json"

    # Read the config first: a bad path or missing key should fail before an
    # experiment is created or a pipeline file is written.
    config = load_config(config_path)
    pipeline_root = f"gs://{config['gcs_bucket']}/pipeline_root"

    aiplatform.init(
        project=project_id,
        location=location,
        experiment=experiment_name,
        experiment_description=experiment_description,
        experiment_tensorboard=False,
    )

    compiler.Compiler().compile(
        pipeline_func=pipeline_func,
        package_path=package_path,
    )

    job = aiplatform.PipelineJob(
        display_name=pipeline_name,
        template_path=package_path,
        pipeline_root=pipeline_root,
        parameter_values={'config_path': config_path, 'project_id': project_id},
        project=project_id,
        location=location,
    )
    job.submit(experiment=experiment_name)

  from kfp.v2 import compiler
  return component_factory.create_component_from_func(


In [2]:
# Run parameters shared by both pipeline submissions below.
# GCS URI of the YAML config: read locally by load_config and also passed to
# the pipelines as their `config_path` parameter.
config_path = "gs://avoxi_workshop_bucket/data_pipeline/bq_config.yaml"
project_id = "gurkomal-playground"  # Replace with your actual project ID
location = "us-central1"  # Replace with your desired location

In [4]:
# Compile and submit pipeline A, which issues CREATE OR REPLACE MODEL for the
# `<model_name>_exp_1` and `<model_name>_exp_2` models.
compile_and_run_pipeline(pipeline_a, "hyperparameter-tuning-pipeline", config_path, project_id, location)

Creating PipelineJob
PipelineJob created. Resource name: projects/506365831141/locations/us-central1/pipelineJobs/hyperparameter-tuning-pipeline-20240912041902
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/506365831141/locations/us-central1/pipelineJobs/hyperparameter-tuning-pipeline-20240912041902')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hyperparameter-tuning-pipeline-20240912041902?project=506365831141
Associating projects/506365831141/locations/us-central1/pipelineJobs/hyperparameter-tuning-pipeline-20240912041902 to Experiment: bqml-experiment-hyperparameter-tuning-pipeline


In [3]:
# Compile and submit pipeline B, which runs ML.DETECT_ANOMALIES against the
# `<model_name>_exp_1` model that pipeline A creates.
# NOTE(review): presumably pipeline A must have finished before this job
# reaches the detection step — confirm before running the cells back to back.
compile_and_run_pipeline(pipeline_b, "anomaly-detection-pipeline", config_path, project_id, location)

Creating PipelineJob
PipelineJob created. Resource name: projects/506365831141/locations/us-central1/pipelineJobs/anomaly-detection-pipeline-20240912041837
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/506365831141/locations/us-central1/pipelineJobs/anomaly-detection-pipeline-20240912041837')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/anomaly-detection-pipeline-20240912041837?project=506365831141
Associating projects/506365831141/locations/us-central1/pipelineJobs/anomaly-detection-pipeline-20240912041837 to Experiment: bqml-experiment-anomaly-detection-pipeline
