In [14]:
import kfp
from google.cloud import aiplatform
from typing import NamedTuple
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component, Dataset)
from google.cloud import storage
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
import os

from google_cloud_pipeline_components.v1 import bigquery as gcc_bq

In [12]:
# BigQuery job labels as JSON text; override with the PIPELINE_LABEL environment variable.
PIPELINE_LABEL = os.environ.get("PIPELINE_LABEL", '{"a": "a", "b": "b"}')

In [4]:
# Fully qualified stored-procedure name; override with PRC_COBRANZAS_GENERAR_HDCARTERA.
PRC_TEST = os.environ.get("PRC_COBRANZAS_GENERAR_HDCARTERA", "trim-odyssey-390415.github3.cencus_filter_by_age")

In [None]:
# string.Template SQL for generate_query(): ${input_age} is the procedure argument and
# ${PRC_TEST} the fully qualified procedure name. Using the placeholder (instead of a
# hardcoded name) makes the PRC_COBRANZAS_GENERAR_HDCARTERA override of PRC_TEST take effect.
QUERY_SP_1 = '''
    DECLARE input_age INT64 DEFAULT ${input_age};
    CALL `${PRC_TEST}` (input_age); 
'''

In [None]:
def generate_query(query_text: str, **replacements) -> str:
    """Fill ``$name`` / ``${name}`` placeholders in a SQL template.

    Args:
        query_text: SQL text using :class:`string.Template` placeholder syntax.
        **replacements: Values keyed by placeholder name; keys that do not
            appear in the template are ignored.

    Returns:
        The SQL text with every placeholder replaced by ``str(value)``.

    Raises:
        KeyError: If the template references a placeholder not in ``replacements``.
        ValueError: If the template contains a malformed placeholder.
    """
    from string import Template

    template = Template(query_text)
    return template.substitute(replacements)

In [7]:
def query_job_configuration(query: str, project: str, labels: "dict | str", location: str = "US") -> dict:
    """Build the keyword arguments for a BigQuery query-job component.

    Args:
        query: SQL text to run.
        project: GCP project ID that owns the job.
        labels: Job labels, either as a dict or as a JSON-encoded object string
            (e.g. the raw value of an environment variable such as PIPELINE_LABEL).
        location: BigQuery location for the job. Defaults to ``"US"``.

    Returns:
        A dict with ``project``, ``labels`` (always a dict), ``query`` and
        ``location`` keys.

    Raises:
        ValueError: If ``labels`` is a string that is not valid JSON or does not
            decode to a JSON object.
    """
    import json

    if isinstance(labels, str):
        # Labels read from the environment are JSON text; the query op needs a mapping.
        labels = json.loads(labels)  # json.JSONDecodeError is a ValueError subclass
        if not isinstance(labels, dict):
            raise ValueError("labels must decode to a JSON object, got %r" % type(labels).__name__)

    return {
        "project": project,
        "labels": labels,
        "query": query,
        "location": location,
    }

In [8]:
@component
def error_op(msg: str):
    """Fail the pipeline task, reporting ``msg`` as the error.

    Args:
        msg: Human-readable reason for the failure.

    Raises:
        RuntimeError: Always, carrying ``msg``.
    """
    # ``raise(msg)`` on a plain str raises TypeError ("exceptions must derive from
    # BaseException") and hides the message; wrap it in a real exception instead.
    raise RuntimeError(msg)

In [9]:
@component(packages_to_install=["google-cloud-bigquery"])
def validate_data(
    validate_table: str,
) -> NamedTuple(
    "Outputs",
    [
        ("condition", str)
    ],
):
    """Check whether a BigQuery table's metadata can be fetched.

    Args:
        validate_table: Table ID passed to ``bigquery.Client.get_table``
            (e.g. ``project.dataset.table``).

    Returns:
        A one-field ``Outputs`` tuple whose ``condition`` is the string ``"true"``
        when ``get_table`` succeeds and ``"false"`` when it raises. The value is a
        string because the pipeline compares it against string literals.
    """
    from google.cloud import bigquery

    client = bigquery.Client()

    try:
        client.get_table(validate_table)
    except Exception as exc:
        # Deliberately best-effort: any failure routes the pipeline to its
        # "false" branch, but log the cause so it shows up in the task logs.
        print(f"Validation of table {validate_table!r} failed: {exc!r}")
        condition = "false"
    else:
        condition = "true"

    return (condition,)

In [None]:
@kfp.dsl.pipeline(
    name="test", 
    description="Pipeline de prueba",
    pipeline_root="gs://laybarm_bk/demo"
)
def main_pipeline(
    project: str,
    validate_table: str,
    input_age_input: int,
    gcp_region: str = "us-central1",
):
    """Validate an input table, then run the configured stored procedure in BigQuery.

    All steps run inside an exit handler whose exit task sends a Vertex AI
    notification email.

    Args:
        project: GCP project ID passed to the BigQuery query job.
        validate_table: Table ID checked by ``validate_data`` before querying.
        input_age_input: Value substituted for ``${input_age}`` in ``QUERY_SP_1``.
        gcp_region: Currently not referenced by any task in this pipeline.
    """
    import json

    # PIPELINE_LABEL is JSON text read from the environment; the BigQuery op
    # expects its labels as a mapping, so decode it here.
    pipeline_labels = json.loads(PIPELINE_LABEL) if isinstance(PIPELINE_LABEL, str) else PIPELINE_LABEL

    notify_email_task = VertexNotificationEmailOp(recipients=["a.luisaybar@gmail.com"])
    notify_email_task.set_display_name('Notification Email')

    with dsl.ExitHandler(notify_email_task, name="Execute pipeline prediction"):

        validate_tables_job = validate_data(
            validate_table = validate_table
        )
        validate_tables_job.set_display_name('Validate Data')

        # Validation failed: fail the run with an explicit message.
        with dsl.Condition(
            validate_tables_job.outputs['condition']=="false",
            name="no-execute",
        ):
            error_op(msg="No se logro validar las tablas de ingesta.")

        # Validation succeeded: render the procedure call and run it in BigQuery.
        with dsl.Condition(
            validate_tables_job.outputs['condition']=="true",
            name="execute",
        ):

            query_prc_1 = generate_query(
                QUERY_SP_1, 
                PRC_TEST=PRC_TEST, 
                input_age=input_age_input
            )

            query_prc_1_job = query_job_configuration(query_prc_1, project, pipeline_labels)

            bq_prc_1_op = gcc_bq.BigqueryQueryJobOp(**query_prc_1_job)
            bq_prc_1_op.set_display_name("PRC_COBRANZAS_GENERAR_HDCARTERA")

In [18]:
# Compile the pipeline DSL into a local JSON package file.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=main_pipeline,
    package_path="pipeline_bigquery.json",
)

In [19]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print a confirmation.

    Args:
        bucket_name: Name of the target bucket (a bare name such as "laybarm_bk").
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to use for the uploaded file.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"Archivo {source_file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

# Cloud Storage destination for the compiled pipeline package.
bucket_name = "laybarm_bk"
destination_blob_name = "demo/pipeline_bigquery.json"
pipeline_file = "pipeline_bigquery.json"

# Upload the locally compiled package so the PipelineJob template path can reference it.
upload_to_gcs(
    bucket_name=bucket_name,
    source_file_name=pipeline_file,
    destination_blob_name=destination_blob_name,
)

Archivo pipeline_bigquery.json subido a demo/pipeline_bigquery.json en el bucket laybarm_bk.


In [20]:
# Set the default project and region for subsequent Vertex AI SDK calls.
aiplatform.init(
    location="us-central1",
    project="trim-odyssey-390415",
)

In [22]:
# Runtime values for the pipeline's input parameters.
run_parameters = {
    "project": "trim-odyssey-390415",
    "validate_table": "trim-odyssey-390415.github3.census_by_age",
    "input_age_input": 60,
}

# Build a run from the uploaded template (caching disabled) and submit it.
job = aiplatform.PipelineJob(
    display_name="bigquery test pipeline",
    template_path="gs://laybarm_bk/demo/pipeline_bigquery.json",
    enable_caching=False,
    project="trim-odyssey-390415",
    location="us-central1",
    parameter_values=run_parameters,
)

print('submit pipeline job ...')
job.submit(service_account="mlops-process@trim-odyssey-390415.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/668142834453/locations/us-central1/pipelineJobs/test-20250111061502
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/668142834453/locations/us-central1/pipelineJobs/test-20250111061502')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/test-20250111061502?project=668142834453
