In [1]:
# !pip install --upgrade google-cloud-aiplatform \
#                         google-cloud-storage \
#                         kfp \
#                         google-cloud-pipeline-components

In [2]:
!python -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 2.11.0


In [3]:
# Google Cloud project used as the pipeline's default `project` argument and by `aiplatform.init`.
PROJECT_ID = "mimetic-card-436014-r9"

# !gcloud config set project {PROJECT_ID}

In [4]:
# Region used as the pipeline's default `region` argument and by `aiplatform.init`.
REGION = "europe-west4"

In [5]:
# !gcloud auth login

## Create a Cloud Storage bucket

In [6]:
# Cloud Storage bucket under which the pipeline root is placed.
BUCKET_URI = "gs://price_lingerie_predict_eu"

In [7]:
# !gsutil mb -l {REGION} {BUCKET_URI}

In [8]:
# Root location handed to `@dsl.pipeline(pipeline_root=...)`.
PIPELINE_ROOT = BUCKET_URI + "/pipeline/"

In [9]:
# Service account passed to `job.run(service_account=...)` when the pipeline is submitted.
SERVICE_ACCOUNT = "93305744778-compute@developer.gserviceaccount.com"

## Otro

## Librerías

In [10]:
from typing import NamedTuple
import typing
import json
# `kfp.v2` is a deprecated alias namespace in the KFP 2.x SDK (importing it
# emits a DeprecationWarning); the same names live in the top-level `kfp` package.
from kfp import dsl
from kfp.dsl import (Artifact,
                     Dataset,
                     Input,
                     Model,
                     Output,
                     Metrics,
                     component,
                     OutputPath,
                     InputPath)

from kfp import compiler
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

  from kfp.v2 import dsl


## Leer data y dividir entrenamiento/prueba

In [12]:
@component(
    packages_to_install=["pandas",
                         "google-cloud-storage",
                         "scikit-learn==1.5.2"],
    base_image="python:3.9",

)
def get_data_storage(
    path: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
) -> NamedTuple("GetDataOutput",[("product_category_dict",str),("brand_name_dict",str),("color_dict",str)]):
    """Download a CSV from Cloud Storage, split it and integer-encode three columns.

    The `product_category`, `brand_name` and `color` columns are replaced by
    integer codes. The codes are learned from the training split only; values
    that appear only in the test split are encoded as -1 instead of NaN.

    Args:
        path: `gs://bucket/object` location of the source CSV.
        dataset_train: Output artifact; the training split is written to
            `<dataset_train.path>.csv`.
        dataset_test: Output artifact; the test split is written to
            `<dataset_test.path>.csv`.

    Returns:
        Three JSON strings with the value -> code mapping of
        `product_category`, `brand_name` and `color`, in that order.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from google.cloud import storage
    import json

    # Code assigned to values that have no entry in the training-split mapping.
    UNSEEN_CODE = -1
    CATEGORICAL_COLUMNS = ("product_category", "brand_name", "color")

    def encode(series, mapping):
        # Values without a mapping entry would become NaN; give them an explicit code.
        return series.map(mapping).fillna(UNSEEN_CODE).astype(int)

    client = storage.Client()
    bucket_name, blob_name = path.replace("gs://", "").split("/", 1)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    temp_file = "/tmp/temp.csv"
    blob.download_to_filename(temp_file)

    df = pd.read_csv(temp_file)

    # Fixed seed so repeated runs produce the same split.
    train, test = train_test_split(df, test_size=0.3, random_state=42)
    # Work on explicit copies so the column assignments below do not write
    # into frames derived from `df` (avoids SettingWithCopyWarning).
    train = train.copy()
    test = test.copy()

    mappings = {}
    for column in CATEGORICAL_COLUMNS:
        mapping = {value: code
                   for code, value in enumerate(train[column].unique().tolist())}
        train[column] = encode(train[column], mapping)
        test[column] = encode(test[column], mapping)
        mappings[column] = mapping

    # Downstream components read `<artifact path>.csv`, so keep that suffix.
    train.to_csv(dataset_train.path + ".csv", index=False)
    test.to_csv(dataset_test.path + ".csv", index=False)

    return (json.dumps(mappings["product_category"]),
            json.dumps(mappings["brand_name"]),
            json.dumps(mappings["color"]))
    

## Entrenamiento

In [13]:
@component(
    packages_to_install=["scikit-learn==1.3.2",
                         "numpy==1.22.4",
                         "pandas"],
    base_image="python:3.10",
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
):
    """Fit a random-forest regressor on the training CSV and store it with joblib.

    Args:
        dataset: Input artifact; the training data is read from
            `<dataset.path>.csv` and must contain a `price` column (the target).
        model: Output artifact; the fitted estimator is dumped to
            `<model.path>.joblib`, and `train_score` / `framework` are recorded
            in its metadata.
    """
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor

    training_frame = pd.read_csv(dataset.path + ".csv")
    features = training_frame.drop(columns='price')
    target = training_frame.price

    hyperparameters = dict(
        max_depth=30,
        min_samples_split=6,
        n_estimators=200,
        random_state=42,
    )
    regressor = RandomForestRegressor(**hyperparameters)
    regressor.fit(features, target)

    # Score is computed on the same rows the model was fitted on.
    model.metadata["train_score"] = float(regressor.score(features, target))
    model.metadata["framework"] = 'RF'

    joblib.dump(regressor, model.path + ".joblib")


## Evaluación

In [14]:
@component(
    packages_to_install=["scikit-learn==1.3.2",
                         "numpy==1.22.4",
                         "pandas"],
    base_image="python:3.10",
)
def eval_model(
    test_set: Input[Dataset],
    rf_model: Input[Model],
    metrics: Output[Metrics]
) -> NamedTuple("EvalModelOutput",[("deploy",str)]):
    """Score the trained model on the test CSV and log RMSE, MAE and R2.

    Args:
        test_set: Input artifact; the test data is read from
            `<test_set.path>.csv` and must contain a `price` column.
        rf_model: Input artifact; the estimator is loaded from
            `<rf_model.path>.joblib`.
        metrics: Output artifact that receives the `rmse`, `mae` and `r2` values.

    Returns:
        A one-element tuple with the deployment flag, which is always the
        string 'true'.
    """
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import pandas as pd
    import joblib

    data = pd.read_csv(test_set.path + '.csv')
    # joblib.load unpickles the file; only load artifacts produced by this pipeline.
    model = joblib.load(rf_model.path + ".joblib")

    X_test = data.drop(columns='price')
    y_test = data.price
    y_pred = model.predict(X_test)

    # Take the square root explicitly instead of passing `squared=False`, which
    # newer scikit-learn releases deprecate and then remove.
    rmse = float(mean_squared_error(y_test, y_pred) ** 0.5)
    mae = float(mean_absolute_error(y_test, y_pred))
    r2 = float(r2_score(y_test, y_pred))

    metrics.log_metric('rmse', rmse)
    metrics.log_metric('mae', mae)
    metrics.log_metric('r2', r2)

    # NOTE(review): the flag does not depend on the metrics above, so the
    # deployment condition in the pipeline always passes -- confirm this is intended.
    deploy = 'true'

    return (deploy,)

## Despliegue

In [15]:
@component(
    packages_to_install=["google-cloud-aiplatform",
                         "kfp",
                         "scikit-learn==1.3.2"],
    base_image="python:3.10",
)
def depl_model(
    model: Input[Model],
    project: str,
    region: str,
    serving_container_image_uri: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    An endpoint whose display name is "lingerieprice-endpoint" is reused when
    one exists (the most recently created one); otherwise a new one is created.

    Args:
        model: Input artifact of the trained model; the parent directory of
            `model.uri` is uploaded as the model's artifact directory.
        project: Google Cloud project in which to upload and deploy.
        region: Vertex AI location in which to upload and deploy.
        serving_container_image_uri: Container image used to serve the model.
        vertex_endpoint: Output artifact; its URI is set to the endpoint's
            resource name.
        vertex_model: Output artifact; its URI is set to the uploaded model's
            resource name.
    """
    from google.cloud import aiplatform
    import os
    aiplatform.init(project=project, location=region)

    DISPLAY_NAME = "lingerieprice"
    ENDPOINT_NAME = "lingerieprice-endpoint"

    # train_model writes `<model.path>.joblib` next to the artifact path, so the
    # parent directory is the one that holds the serialized estimator.
    path, _ = os.path.split(model.uri)

    upload_model = aiplatform.Model.upload(
        display_name = DISPLAY_NAME,
        artifact_uri = path,
        serving_container_image_uri = serving_container_image_uri,
    )

    def crear_endpoint():
        # Reuse the newest endpoint with the expected display name, or create one.
        endpoints = aiplatform.Endpoint.list(
                filter=f'display_name="{ENDPOINT_NAME}"',
                order_by="create_time desc",
                project=project,
                location=region)

        if len(endpoints) > 0:
            return endpoints[0]
        else:
            return aiplatform.Endpoint.create(
                display_name=ENDPOINT_NAME,
                project=project,
                location=region)

    endpoint = crear_endpoint()

    # NOTE(review): no traffic_percentage / traffic_split is passed; confirm how
    # traffic is assigned when the endpoint already serves an earlier model.
    upload_model.deploy(
        machine_type = "n1-standard-4",
        endpoint = endpoint,
        min_replica_count=1,
        max_replica_count=3,
        deployed_model_display_name = DISPLAY_NAME
    )

    vertex_endpoint.uri = endpoint.resource_name
    # Model.deploy() returns the endpoint, not the model, so the model's
    # resource name must be taken from the uploaded Model object.
    vertex_model.uri = upload_model.resource_name


## Pipeline

In [18]:
PIPELINE_ROOT

'gs://price_lingerie_predict_eu/pipeline/'

In [19]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name= "pipeline-pricelignerie-2"
)
def pipeline(
    path: str = 'gs://price_lingerie_predict_eu/data/DATA_MODELO.csv',
    project: str = PROJECT_ID,
    region: str = REGION,
    serving_container_image_uri: str = 'europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest'

):
    """Chain data preparation, training, evaluation and conditional deployment.

    Args:
        path: `gs://` location of the source CSV passed to `get_data_storage`.
        project: Google Cloud project passed to the deployment step.
        region: Vertex AI location passed to the deployment step.
        serving_container_image_uri: Serving image passed to the deployment step.
    """
    data_op = get_data_storage(path=path)
    train_model_op = train_model(dataset=data_op.outputs['dataset_train'])
    model_evaluation_op = eval_model(
        test_set=data_op.outputs['dataset_test'],
        rf_model=train_model_op.outputs['model']
    )

    # `dsl.Condition` is deprecated in the KFP 2.x SDK; `dsl.If` is its replacement.
    with dsl.If(
        model_evaluation_op.outputs['deploy'] == "true",
        name="deploy"
    ):
        depl_model(
            model=train_model_op.outputs['model'],
            project=project,
            region=region,
            serving_container_image_uri=serving_container_image_uri
        )

  with dsl.Condition(


## Compilar y ejecutar pipeline

In [20]:
# Compile the pipeline function into the spec file that the PipelineJob is built from.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="model_pipeline.json")

In [21]:
# Point the Vertex AI SDK at the project and region configured at the top of the notebook.
aiplatform.init(
    location=REGION,
    project=PROJECT_ID,
)

In [22]:
# Build the pipeline job from the compiled spec, with step caching enabled.
job = pipeline_jobs.PipelineJob(
    template_path="model_pipeline.json",
    display_name="lingerieprice-pipeline",
    enable_caching=True,
)

In [23]:
# Submit the pipeline job so that it runs under SERVICE_ACCOUNT.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/93305744778/locations/europe-west4/pipelineJobs/pipeline-pricelignerie-2-20250205225347
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/93305744778/locations/europe-west4/pipelineJobs/pipeline-pricelignerie-2-20250205225347')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west4/pipelines/runs/pipeline-pricelignerie-2-20250205225347?project=93305744778
PipelineJob projects/93305744778/locations/europe-west4/pipelineJobs/pipeline-pricelignerie-2-20250205225347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/93305744778/locations/europe-west4/pipelineJobs/pipeline-pricelignerie-2-20250205225347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/93305744778/locations/europe-west4/pipelineJobs/pipeline-pricelignerie-2-20250205225347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob proje

------