In [21]:
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component, Dataset)
from google.cloud import storage
from typing import NamedTuple
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

from google_cloud_pipeline_components.v1 import bigquery as gcc_bq
import os

In [None]:
# Fully-qualified name of the BigQuery procedure invoked by the query template;
# can be overridden through the PRC_COBRANZAS environment variable.
PRC_COBRANZAS = os.environ.get(
    "PRC_COBRANZAS",
    "mlops12-469915.github.cencus_filter_by_age",
)
# Labels handed to the BigQuery job configuration, stored as a JSON-encoded
# string; can be overridden through the PIPELINE_LABEL environment variable.
PIPELINE_LABEL = os.environ.get(
    "PIPELINE_LABEL",
    '{"a": "a", "b": "b"}',
)

In [34]:
# BigQuery script template with ${...} placeholders:
#   ${input_age}     -> default value of the declared INT64 variable
#   ${PRC_COBRANZAS} -> name of the procedure that is called with that variable
QUERY_SP_1='''
    DECLARE input_age INT64 DEFAULT ${input_age};
    CALL `${PRC_COBRANZAS}` (input_age);
'''

In [35]:
def generate_query(query_text: str, **replacements) -> str:
    """Render a ``$``-style template into a final query string.

    Args:
        query_text: Template text containing ``$name`` / ``${name}`` placeholders.
        **replacements: Value for each placeholder, keyed by placeholder name.

    Returns:
        The template text with every placeholder substituted.

    Raises:
        KeyError: If a placeholder has no matching keyword argument.
        ValueError: If the template contains a malformed placeholder.
    """
    from string import Template

    template = Template(query_text)
    return template.substitute(replacements)

In [36]:
def query_job_configuration(query: str, project: str, labels: dict, location: str = 'US') -> dict:
    """Assemble the keyword arguments for a BigQuery query-job operator.

    Args:
        query: SQL text to run.
        project: Project in which the job runs.
        labels: Job labels, given either as a mapping or as a JSON object
            encoded in a string (for example a value read from an environment
            variable). A JSON string is decoded so the returned configuration
            carries a mapping; a blank string means "no labels". Any other
            value is passed through untouched.
        location: Job location. Defaults to ``'US'``.

    Returns:
        A dict with the keys ``project``, ``labels``, ``query`` and ``location``.

    Raises:
        ValueError: If ``labels`` is a non-blank string that is not a JSON object.
    """
    import json

    if isinstance(labels, str):
        if not labels.strip():
            labels = {}
        else:
            try:
                decoded = json.loads(labels)
            except json.JSONDecodeError as exc:
                raise ValueError(f"labels is not valid JSON: {labels!r}") from exc
            # Labels are key/value pairs, so only a JSON object is meaningful.
            if not isinstance(decoded, dict):
                raise ValueError(f"labels must decode to a JSON object, got: {labels!r}")
            labels = decoded

    return {
        "project": project,
        "labels": labels,
        "query": query,
        "location": location,
    }

In [37]:
@component(packages_to_install=["google-cloud-bigquery"])
def validate_data(
    source_x_train_table: str,
) -> NamedTuple(
    "Outputs",
    [
        ("condition", str)
    ],
):
    """Check whether a BigQuery table can be fetched.

    Args:
        source_x_train_table: Table identifier passed to ``Client.get_table``.

    Returns:
        A one-field output ``condition``: ``"true"`` when the table lookup
        succeeds, ``"false"`` when the lookup raises for any reason.
    """
    import logging

    from google.cloud import bigquery

    client = bigquery.Client()

    try:
        client.get_table(source_x_train_table)
        condition = "true"
    except Exception as exc:
        # Every lookup failure maps to "false"; record the cause so a missing
        # table can be told apart from, e.g., a permission or auth problem.
        logging.warning("Could not fetch table %s: %r", source_x_train_table, exc)
        condition = "false"

    return (condition,)

In [38]:
@component
def error_op(msg: str):
    """Fail the pipeline step with the given message.

    Args:
        msg: Human-readable reason for the failure.

    Raises:
        RuntimeError: Always, carrying ``msg``.
    """
    # Raising a plain ``str`` is itself an error (TypeError: exceptions must
    # derive from BaseException) and hides the message, so wrap it in an
    # exception type.
    raise RuntimeError(msg)

In [39]:
@kfp.dsl.pipeline(
    name="pipeline-prediction-model",
    description="intro",
    pipeline_root="gs://testing-mlp12/financiera_oh_segmentacion/pipeline_prediction",
)
def main_pipeline(
    project: str,
    source_x_train_table: str,
    input_age: int,
    gcp_region: str = "us-central1",
):
    # Pipeline graph:
    #   1. validate that the source table can be fetched;
    #   2. on "false" fail through error_op, on "true" run the BigQuery procedure;
    #   3. an e-mail notification task is registered as the exit handler.
    # Note: gcp_region is accepted as a parameter but not referenced below.

    # The exit task is created first so the handler can wrap the steps below.
    email_on_exit = VertexNotificationEmailOp(recipients=["a.luisaybar@gmail.com"])
    email_on_exit.set_display_name('Notification Email')

    with dsl.ExitHandler(email_on_exit, name="Execute pipeline prediction"):
        table_check = validate_data(source_x_train_table=source_x_train_table)
        table_check.set_display_name('Validate Data')
        table_found = table_check.outputs['condition']

        # Branch taken when the table lookup reported "false".
        with dsl.Condition(table_found == "false", name="no-execute"):
            error_op(msg="No se logro validar las tablas de ingesta.")

        # Branch taken when the table lookup reported "true".
        with dsl.Condition(table_found == "true", name="execute"):
            # Render the procedure call, then wrap it in the job arguments.
            rendered_sql = generate_query(
                QUERY_SP_1,
                PRC_COBRANZAS=PRC_COBRANZAS,
                input_age=input_age,
            )
            bq_job_kwargs = query_job_configuration(rendered_sql, project, PIPELINE_LABEL)

            stored_proc_task = gcc_bq.BigqueryQueryJobOp(**bq_job_kwargs)
            stored_proc_task.set_display_name("BIGQUERY SP")

In [40]:
# Compile the pipeline function into a local JSON package file.
compiler.Compiler().compile(pipeline_func=main_pipeline, package_path="pipeline_prediction.json")



In [41]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print a confirmation.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"Archivo {source_file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

# Upload settings: local compiled file, target bucket, and object path inside it.
pipeline_file = "pipeline_prediction.json"
bucket_name = "testing-mlp12"
destination_blob_name = "financiera_oh_segmentacion/pipeline_prediction/pipeline_prediction.json"

# Upload the compiled pipeline package to the bucket.
upload_to_gcs(
    bucket_name=bucket_name,
    source_file_name=pipeline_file,
    destination_blob_name=destination_blob_name,
)

Archivo pipeline_prediction.json subido a financiera_oh_segmentacion/pipeline_prediction/pipeline_prediction.json en el bucket testing-mlp12.


In [42]:
# Initialise the Vertex AI SDK with the project and region used by this notebook.
aiplatform.init(
    project="mlops12-469915",
    location="us-central1",
)

In [43]:
# Describe a run of the compiled pipeline package stored in Cloud Storage.
job = aiplatform.PipelineJob(
    display_name="pipeline de prueba",
    template_path="gs://testing-mlp12/financiera_oh_segmentacion/pipeline_prediction/pipeline_prediction.json",
    enable_caching=True,
    project="mlops12-469915",
    location="us-central1",
    # Values for the pipeline function's parameters.
    parameter_values={
        "project": "mlops12-469915",
        "source_x_train_table": "mlops12-469915.mlops12.xtrain",
        "input_age": 60,
    },
    # Optional labels, currently disabled:
    # labels={"module": "ml", "application": "app", "chapter": "mlops", "company": "datapat", "environment": "dev", "owner": "xxxx"}
)

print("submit pipeline job ...")
# Submit the run under the given service account.
job.submit(service_account="mlops-process@mlops12-469915.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/534043014924/locations/us-central1/pipelineJobs/pipeline-prediction-model-20250924005450
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/534043014924/locations/us-central1/pipelineJobs/pipeline-prediction-model-20250924005450')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-prediction-model-20250924005450?project=534043014924
