In [1]:
USER_FLAG = "--user"

In [2]:
!pip3 install {USER_FLAG} google-cloud-aiplatform --upgrade
!pip3 install {USER_FLAG} kfp==2.2.0 --upgrade



In [5]:
import matplotlib.pyplot as plt
import pandas as pd

#from kfp.v2 import compiler, dsl
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath

from google.cloud import aiplatform

# We'll use this namespace for metadata querying
from google.cloud import aiplatform_v1

In [6]:
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  fabian260497


In [7]:
BUCKET_URI="gs://bucket-fabian-2"

In [8]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_URI}/vertexai_pipeline_prueba/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://bucket-fabian-2/vertexai_pipeline_prueba/'

In [9]:
#First Component in the pipeline to fetch data from big query.
@component
def get_data_bigquery(
    bq_table: str,
    output_data_path: OutputPath("Dataset")
):
    from google.cloud import bigquery
    import pandas as pd

    bqclient = bigquery.Client()
    table = bigquery.TableReference.from_string(
        bq_table
    )
    rows = bqclient.list_rows(
        table
    )
    dataframe = rows.to_dataframe(
        create_bqstorage_client=True,
    )
    dataframe.to_csv(output_data_path)

In [22]:
@component
def extract_csv_from_gcs(
    bucket_name: str,
    file_path: str,
    secret_sa: str,
    output_data_path: OutputPath("Dataset")
):
    import io
    import pandas as pd
    from google.cloud import storage
    
    # Cargar las credenciales del secreto
    with open(secret_sa, "r") as secret_file:
        secret_data = json.load(secret_file)

    # Reemplaza 'mi-bucket' con el nombre de tu bucket y 'archivo/datos.csv' con la ruta del archivo en el bucket
    bucket_name = bucket_name #"bucket-fabian-2"
    blob_name = file_path #'data_prueba/iris_dataset_2.csv'

    #credenciales_json = 'ruta-a-tu-archivo-de-credenciales.json'
    #storage_client = storage.Client.from_service_account_json(credenciales_json)

    #storage_client = storage.Client()
    storage_client = storage.Client.from_service_account_info(secret_data)

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Lee los datos del Blob en un objeto bytes
    datos_bytes = blob.download_as_bytes()

    # Convierte los datos bytes en un DataFrame de pandas
    data_frame = pd.read_csv(io.BytesIO(datos_bytes))
    data_frame.to_csv(output_data_path)

In [11]:
#Second component in the pipeline to train the classification model using decision Trees or Randomforest
@component
def training_classmod(
    data: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    
    from sklearn.metrics import roc_curve
    from sklearn.model_selection import train_test_split
    from joblib import dump
    from sklearn.metrics import confusion_matrix
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    
    # 1. Cargar el conjunto de datos Iris
    iris_df=pd.read_csv(data1.path)
    
    # 2. Preprocesamiento de datos
    X = iris_df.drop("target", axis=1)
    y = iris_df["target"]
    

    # Dividir los datos en conjuntos de entrenamiento y prueba
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=10)
    
    #Entrenar el modelo
    model_classifier = DecisionTreeClassifier()
    #model_classifier = RandomForestClassifier()
    model_classifier.fit(X_train,y_train)
    
    #Evaluar el modelo
    y_pred=model_classifier.predict(X_test)
    accuracy = model_classifier.score(X_test,y_test)
    print('accuracy is:',accuracy)
    
    metrics.log_metric("accuracy",(accuracy * 100.0))
    metrics.log_metric("model", "Decision tree")
    #metrics.log_metric("model", "RandomForest")
    
    #Guardamos el modelo entrenado
    dump(model_classifier, model.path + ".joblib")

In [12]:
@component
def model_deployment(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    deployed_model = aiplatform.Model.upload(
        display_name="custom-model-pipeline", #nombre del modelo en Vertex AI
        artifact_uri = model.uri.replace("model", ""),  #URI del artefacto del modelo
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest" #especifica la imagen del contenedor de servicio que se utilizará para el despliegue del modelo. En este caso, se utiliza una imagen de contenedor de predicción de scikit-learn.
    )
    endpoint = deployed_model.deploy(machine_type="n1-standard-1") #despliegue de modelo. El modelo cargado se despliega como un punto final (endpoint) que puede utilizarse para realizar inferencias (predicciones).

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name #las URI de recursos del endpoint
    vertex_model.uri = deployed_model.resource_name #modelo desplegado en Vertex AI

In [13]:
@component
def generate_predictions(
    input_data: Input[Dataset],
    model: Input[Model],
    output_predictions: Output[Dataset]
):
    import pandas as pd
    from joblib import load  # Para cargar el modelo
    from sklearn.preprocessing import LabelEncoder

    # Cargar el modelo desde Vertex AI
    deployed_model = load(model.path + ".joblib")

    # Realizar las predicciones en tus datos de entrada (input_data)
    input_df = pd.read_csv(input_data.path)
    df=input_df.head(50) # usamos la misma data pero solo los 50 primeros datos
    
    predictions_df=df.drop("target", axis=1) #obtenemos solo los features
    
    predictions_y = deployed_model.predict(predictions_df) #hacemos las predicciones
    
    #concatenamos las predicciones a las otras columnas para crear un dataframe y subirlo a bigquery
    predictions_df['predicciones'] = predictions_y
    
    predictions_df.to_csv(output_predictions.path, index=False)

In [14]:
@component
def upload_to_bigquery(
    input_data: Input[Dataset],  # Input de tipo Dataset
    secret_file_path:str,
    bq_project: str,            # Proyecto de BigQuery
    bq_dataset: str,            # Conjunto de datos de BigQuery
    bq_table: str,              # Nombre de la tabla de BigQuery
):
    from google.cloud import bigquery
    import json
    
    # Cargar las credenciales del secreto
    with open(secret_file_path, "r") as secret_file:
        secret_data = json.load(secret_file)
    
    # Inicializa el cliente de BigQuery
    #bq_client = bigquery.Client()
    bq_client = bigquery.Client.from_service_account_info(secret_data)

    # Obtén la ruta del conjunto de datos y la tabla de BigQuery
    dataset_uri = input_data.uri
    table_uri = f"bq://{bq_project}.{bq_dataset}.{bq_table}"

    # Lee el esquema de la tabla de BigQuery desde un archivo JSON o una cadena
    schema=[
        bigquery.SchemaField("sepal_length", bigquery.enums.SqlTypeNames.NUMERIC, mode="NULLABLE"),
        bigquery.SchemaField("sepal_width", bigquery.enums.SqlTypeNames.NUMERIC, mode="NULLABLE"),
        bigquery.SchemaField("petal_length", bigquery.enums.SqlTypeNames.NUMERIC, mode="NULLABLE"),
        bigquery.SchemaField("petal_width", bigquery.enums.SqlTypeNames.NUMERIC, mode="NULLABLE"),
        bigquery.SchemaField("target", bigquery.enums.SqlTypeNames.NUMERIC, mode="NULLABLE"),
    ]
    try:
        schema = json.loads(bq_schema)
    except json.JSONDecodeError:
        raise ValueError("El esquema debe estar en formato JSON válido.")

    # Carga los datos en la tabla de BigQuery
    job_config = bigquery.LoadJobConfig(schema=schema)
    load_job = bq_client.load_table_from_uri(dataset_uri, table_uri, job_config=job_config)

    # Espera a que se complete la carga
    load_job.result()

In [25]:
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="custom-pipeline"
)
def pipeline(
    #bq_table: str = "",
    bucket_name: str = "",
    file_path: str = "",
    output_data_path: str = "data.csv",
    bq_project: str = "",           # Proyecto de BigQuery
    bq_dataset: str = "",            # Conjunto de datos de BigQuery
    bq_table: str = "",              #Tabla de BQ
    secret_file_path: str = "",
    project: str = PROJECT_ID,
    region: str = REGION
):  
    #dataset_task = get_data_bigquery(bq_table=bq_table)
    #step 1 = Leer los datos desde el bucket de GCS.
    dataset_task = extract_csv_from_gcs(bucket_name=bucket_name, file_path=file_path, secret_sa=secret_file_path)
    
    #No es necesario realizar limpieza alguna
    
    #step 2 = Divide los datos en conjuntos de entrenamiento y prueba, entrena un modelo de machine learning
    model_task = training_classmod(data=dataset_task.output)
    
    #step 3 = Despliega el modelo
    deploy_task = model_deployment(model=model_task.outputs["model"],project=project,region=region)
    
    #step 4 = usaremos el modelo para hacer predicciones
    dataset_task_2 = extract_csv_from_gcs(bucket_name= bucket_name,file_path= file_path) #datos para las predicciones
    predict_task = generate_predictions(input_data=dataset_task_2.output,model=deploy_task.outputs["vertex_model"])
    
    #step 5 = Almacenar las predicciones en bigquery
    load_task=upload_to_bigquery(input_data=predict_task.output,secret_file_path=secret_file_path,bq_project=bq_project,bq_dataset=bq_dataset,bq_table=bq_table)
     

TypeError: extract-csv-from-gcs() missing 1 required argument: secret_sa.

In [19]:
#Creamos la instancia del compilador de Vertex AI Pipelines
compiler.Compiler().compile(pipeline_func=pipeline, package_path="custom-pipeline-classifier.json") #->ruta donde se guardará el resultado de la compilación

ValueError: Unsupported pipeline_func type. Expected subclass of `base_component.BaseComponent` or `Callable` constructed with @dsl.pipeline decorator. Got: <class 'function'>

In [20]:
run1 = aiplatform.PipelineJob( #->creamos una instancia de un job de Pipeline
    display_name="custom-training-vertex-ai-pipeline", #Nombre del pipeline
    template_path="custom-pipeline-classifier.json", #la ruta del archivo json que se generó utilizando el compilador, contiene la definición del pipeline compilado
    job_id="custom-pipeline-7", #identificador único para el job del pipeline
    #parameter_values={"bq_table": "fabian260497.dataset_prueba.iris"},
    parameter_values={"bucket_name": "bucket-fabian-2","file_path":"data_prueba/iris_dataset_2.csv","bq_project":"fabian260497","bq_dataset":"dataset_prueba" ,"bq_table":"iris","secret_file_path":"etc/secrets/secret-sa.json"},
    enable_caching=True,)

ValueError: The pipeline parameter secret_file_path is not found in the pipeline job input definitions.

In [15]:
run1.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/290399324087/locations/us-central1/pipelineJobs/custom-pipeline-7
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/290399324087/locations/us-central1/pipelineJobs/custom-pipeline-7')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-pipeline-7?project=290399324087


In [22]:
"""import io
import pandas as pd
from google.cloud import storage

# Reemplaza 'mi-bucket' con el nombre de tu bucket y 'archivo/datos.csv' con la ruta del archivo en el bucket
bucket_name = "bucket-fabian-2"
blob_name = 'data_prueba/iris_dataset_2.csv'

#credenciales_json = 'ruta-a-tu-archivo-de-credenciales.json'
#storage_client = storage.Client.from_service_account_json(credenciales_json)

storage_client = storage.Client()

bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)

# Lee los datos del Blob en un objeto bytes
datos_bytes = blob.download_as_bytes()

# Convierte los datos bytes en un DataFrame de pandas
data_frame = pd.read_csv(io.BytesIO(datos_bytes))
df= data_frame.head(50)
df.to_csv("prueba.csv")"""