In [1]:
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Input, Output, component, Dataset)
from typing import NamedTuple

In [3]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "db-dtypes",
        "pyarrow",
    ],
)
def process_data(
    project: str,
    source_x_train_table: str,
    features_table: str,
    data: Output[Dataset],
):
    """Keep only the selected feature columns of a BigQuery table and save them as Parquet.

    Args:
        project: GCP project used for the BigQuery client.
        source_x_train_table: Fully-qualified BigQuery table with the input rows.
        features_table: Fully-qualified BigQuery table whose ``string_field_0``
            column lists the feature columns to keep, in order.
        data: Output Dataset artifact. The Parquet file is written to
            ``data.path + ".parquet"`` (note the suffix).

    Raises:
        ValueError: if ``features_table`` yields no feature names.
        KeyError: if a listed feature is not a column of ``source_x_train_table``.
    """
    # KFP lightweight components execute this function's source on its own
    # inside the container, so imports must live in the body.
    from google.cloud import bigquery

    client = bigquery.Client(project=project)

    x_train = client.query(
        f"SELECT * FROM `{source_x_train_table}`"
    ).to_dataframe()

    # Only string_field_0 is read, so avoid scanning (and billing) other columns.
    feature_names = client.query(
        f"SELECT string_field_0 FROM `{features_table}`"
    ).to_dataframe()["string_field_0"].tolist()
    if not feature_names:
        raise ValueError(f"features table {features_table!r} returned no feature names")

    # Selecting by list both drops unused columns and fixes the column order.
    # NOTE(review): nothing is written at data.path itself; consumers of this
    # artifact must read the ".parquet"-suffixed path.
    x_train[feature_names].to_parquet(f"{data.path}.parquet", engine="pyarrow", index=False)

In [70]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "db-dtypes",
        "scikit-learn",
        "joblib",
        "pandas-gbq",
        "google-cloud-storage",
        "pytz",
    ],
)
def prediction(
    project: str,
    source_x_train_table: str,
    features_table: str,
    table_id: str,
    path_model: str,
):
    """Score a BigQuery table with a joblib model stored in GCS and append the results.

    Args:
        project: GCP project for the BigQuery client and the pandas_gbq write.
        source_x_train_table: Fully-qualified BigQuery table with the rows to score.
        features_table: Fully-qualified BigQuery table whose ``string_field_0``
            column lists the feature columns (and their order) fed to the model.
        table_id: Destination BigQuery table; rows are appended
            (``if_exists="append"``).
        path_model: ``gs://<bucket>/<blob>`` URI of the joblib-serialized model.

    The appended frame has the columns ``prediction``, ``creation_user`` (the
    BigQuery SESSION_USER), ``process_date`` (today's date in America/Lima) and
    ``load_date`` (naive ``datetime.now()`` of the container).

    Raises:
        ValueError: if ``features_table`` yields no feature names, or if
            ``path_model`` has no ``<bucket>/<blob>`` part.
    """
    # KFP lightweight components execute this function's source on its own
    # inside the container, so every import and helper must live in the body.
    from datetime import datetime
    from io import BytesIO

    import pandas as pd
    import pandas_gbq
    from google.cloud import bigquery
    from google.cloud import storage
    from joblib import load
    from pytz import timezone

    TZ = timezone("America/Lima")
    GCS_PREFIX = "gs://"

    def load_model_from_gcs(gcs_uri):
        """Download ``gs://<bucket>/<blob>`` and deserialize it with joblib."""
        if gcs_uri.startswith(GCS_PREFIX):
            gcs_uri = gcs_uri[len(GCS_PREFIX):]
        bucket_name, _, blob_name = gcs_uri.partition("/")
        if not bucket_name or not blob_name:
            raise ValueError(f"path_model must look like gs://<bucket>/<blob>, got {path_model!r}")

        blob = storage.Client().bucket(bucket_name).blob(blob_name)
        # NOTE(security): joblib.load unpickles the payload and can execute
        # arbitrary code; only point path_model at a trusted bucket.
        return load(BytesIO(blob.download_as_bytes()))

    client = bigquery.Client(project=project)

    x_train = client.query(
        f"SELECT * FROM `{source_x_train_table}`"
    ).to_dataframe()

    # Only string_field_0 is read, so avoid scanning (and billing) other columns.
    feature_names = client.query(
        f"SELECT string_field_0 FROM `{features_table}`"
    ).to_dataframe()["string_field_0"].tolist()
    if not feature_names:
        raise ValueError(f"features table {features_table!r} returned no feature names")

    # Selecting by list also enforces the column order the model expects.
    x_train = x_train[feature_names]

    classifier = load_model_from_gcs(path_model)
    predictions = pd.DataFrame(classifier.predict(x_train), columns=["prediction"])

    user_id = client.query("SELECT SESSION_USER()").to_dataframe().iloc[0, 0]

    # NOTE(review): load_date is naive container-local time (presumably UTC),
    # while process_date is the calendar date in America/Lima, so near midnight
    # they can disagree. Both are kept as before so the column types written to
    # BigQuery do not change.
    load_time = datetime.now()
    process_date = datetime.now(TZ).date()

    predictions["creation_user"] = user_id
    # A scalar datetime.date broadcasts to an object column of date values.
    predictions["process_date"] = process_date
    predictions["load_date"] = pd.to_datetime(load_time)

    pandas_gbq.to_gbq(predictions, table_id, if_exists="append", project_id=project)

    print("prediction done")
    

    

In [71]:
@kfp.dsl.pipeline(
    name="pipeline-training-model",
    description="intro",
    pipeline_root="gs://vertex_mlops"
)
def main_pipeline(
    project: str,
    source_x_train_table: str,
    features_table: str,
    table_id: str,
    path_model: str,
    gcp_region: str = "us-central1",
):
    """Single-step Vertex AI pipeline that runs the ``prediction`` component.

    Every parameter except ``gcp_region`` is forwarded unchanged to
    ``prediction``. ``gcp_region`` is accepted but not currently used.
    """
    # NOTE(review): the pipeline name says "training" although this pipeline
    # only scores. It is left as-is because it becomes the PipelineJob
    # resource-name prefix.
    predict_step = prediction(
        project=project,
        source_x_train_table=source_x_train_table,
        features_table=features_table,
        table_id=table_id,
        path_model=path_model,
    )
    predict_step.set_display_name("PREDICTION_MODEL")
    

In [72]:
from kfp.v2 import compiler as v2_compiler


# Serialize main_pipeline into a pipeline-spec JSON file in the working
# directory; that file is what gets uploaded to GCS and submitted.
v2_compiler.Compiler().compile(
    package_path="pipeline_prediction_model.json",
    pipeline_func=main_pipeline,
)

In [73]:
from google.cloud import storage

In [74]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to ``gs://<bucket_name>/<destination_blob_name>``.

    Args:
        bucket_name: Name of the target GCS bucket (without ``gs://``).
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to create/overwrite in the bucket.

    Prints a confirmation message once the upload call returns.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"Archivo {source_file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

# Define the variables: where the compiled pipeline package goes in GCS.
bucket_name = "vertex_mlops"
destination_blob_name = "pipeline_prediction/pipeline_prediction_model.json"
pipeline_file = "pipeline_prediction_model.json"
# Upload the file (the PipelineJob template_path points at this object).
upload_to_gcs(bucket_name, pipeline_file, destination_blob_name)

Archivo pipeline_prediction_model.json subido a pipeline_prediction/pipeline_prediction_model.json en el bucket vertex_mlops.


In [67]:
from google.cloud import aiplatform

In [68]:
aiplatform.init(location="us-central1", project="trim-odyssey-390415")  # session-wide defaults for aiplatform calls

In [77]:


# Build a PipelineJob from the compiled spec uploaded to GCS, with caching
# disabled so every submission re-runs the prediction step.
job = aiplatform.PipelineJob(
    display_name="pipeline de prediccion del modelo",
    template_path="gs://vertex_mlops/pipeline_prediction/pipeline_prediction_model.json",
    enable_caching=False,
    project="trim-odyssey-390415",
    location="us-central1",
    parameter_values={
        "project": "trim-odyssey-390415",
        "source_x_train_table": "trim-odyssey-390415.laybarm.xtrain",
        "features_table": "trim-odyssey-390415.laybarm.selected_features",
        "path_model": "gs://vertex_mlops/demo/data/model/model.joblib",
        "table_id": "trim-odyssey-390415.laybarm.predictions",
    },
)

print('submit pipeline job ...')

# The service account is the first positional parameter of submit(); naming it
# makes the intent explicit.
job.submit(service_account="dev-mlops-vertex@trim-odyssey-390415.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/668142834453/locations/us-central1/pipelineJobs/pipeline-training-model-20240817161222
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/668142834453/locations/us-central1/pipelineJobs/pipeline-training-model-20240817161222')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-training-model-20240817161222?project=668142834453
