In [10]:
import pandas as pd

# Load the target CSV and normalise SalePrice to integers.
# The cleaning step strips every '.' before casting to int, i.e. it treats '.'
# as a thousands separator (e.g. "208.500" -> 208500). SalePrice is therefore
# read as *text*: if pandas parsed it as a number first, "208.500" would become
# the float 208.5 (-> "2085") and, because the whole column turns float,
# "208500" would become 208500.0 (-> "2085000"), silently corrupting targets.
df = pd.read_csv('data/y_train/ytrain.csv', dtype={'SalePrice': str})

# regex=False: '.' is a literal separator here, not the regex wildcard.
df['SalePrice'] = df['SalePrice'].str.replace('.', '', regex=False).astype(int)

df.to_csv('data/pipeline_train_model_v1-1/ytrain.csv', index=False)

In [11]:
from google.cloud import bigquery
import os

# Upload every CSV in `folder_path` to BigQuery, one table per file, named after
# the file stem (e.g. xtrain.csv -> <project_id>.<dataset_id>.xtrain).
project_id = "<project_id>"
dataset_id = "<dataset>"
folder_path = "data/pipeline_train_model_v1-1"

# Bind the client to the destination project explicitly instead of relying on
# whatever default project the ambient credentials resolve to.
client = bigquery.Client(project=project_id)

# Shared load settings. WRITE_TRUNCATE replaces each table's contents on every
# run; the load-job default (WRITE_APPEND) would duplicate all rows whenever
# this cell is re-executed, which breaks Restart & Run All.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# sorted(): os.listdir order is arbitrary; keep uploads deterministic.
for file_name in sorted(os.listdir(folder_path)):
    if not file_name.endswith(".csv"):
        continue

    table_name = os.path.splitext(file_name)[0]
    table_id = f"{project_id}.{dataset_id}.{table_name}"
    csv_file_path = os.path.join(folder_path, file_name)

    # Stream the local CSV into a BigQuery load job.
    with open(csv_file_path, "rb") as source_file:
        load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    print(f"Starting job {load_job.job_id} for {file_name}")
    load_job.result()  # Block until the load finishes; raises if the job failed.
    print(f"Job {load_job.job_id} finished for {file_name}")

    destination_table = client.get_table(table_id)
    print(f"Loaded {destination_table.num_rows} rows into {table_id}.")


Starting job 4b8c31dd-0927-4458-8496-952dfbc583ff for xtrain.csv
Job 4b8c31dd-0927-4458-8496-952dfbc583ff finished for xtrain.csv
Loaded 1314 rows into <project_id>.<dataset>.xtrain.
Starting job b70a4d54-c1c0-4d0a-8f63-fcdb729ff9bb for ytrain.csv
Job b70a4d54-c1c0-4d0a-8f63-fcdb729ff9bb finished for ytrain.csv
Loaded 1314 rows into <project_id>.<dataset>.ytrain.
Starting job 02cbc373-836b-49b3-a16a-bebc68df7bc1 for selected_features.csv
Job 02cbc373-836b-49b3-a16a-bebc68df7bc1 finished for selected_features.csv
Loaded 36 rows into <project_id>.<dataset>.selected_features.


In [47]:
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component,Dataset)

In [48]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "db-dtypes",
        "pyarrow",
    ],
)
def process_data(
    project: str,
    source_x_train_table: str,
    features_table: str,
    dataset: Output[Dataset],
):
    """Keep only the selected feature columns of the training table and save them as Parquet.

    Args:
        project: GCP project used to run the BigQuery queries.
        source_x_train_table: Fully qualified ``project.dataset.table`` id of
            the training-features table.
        features_table: Fully qualified id of a one-column table whose rows are
            the names of the feature columns to keep.
        dataset: Output artifact. The Parquet file is written to
            ``f"{dataset.path}.parquet"``; ``train_model`` reads that exact path.

    Raises:
        KeyError: if a listed feature is not a column of the training table.
    """
    # KFP ships only this function's source into the container, so every
    # import the component needs must live inside the body.
    from google.cloud import bigquery

    client = bigquery.Client(project=project)

    # NOTE(review): these SELECTs have no ORDER BY, so BigQuery does not
    # guarantee row order. train_model pairs these rows with the target table
    # purely by position, so X and y may be misaligned. TODO: add a shared row
    # key to both tables and ORDER BY it here and in train_model.
    x_train = client.query(f"SELECT * FROM `{source_x_train_table}`").to_dataframe()
    features_df = client.query(f"SELECT * FROM `{features_table}`").to_dataframe()

    # The feature list is a single column. It was previously read by the name
    # `string_field_0` (what BigQuery schema autodetect assigns to a headerless
    # string column); selecting it positionally gives the same values and also
    # works if the column got its name from a header row.
    feature_names = features_df.iloc[:, 0].tolist()

    missing = [name for name in feature_names if name not in x_train.columns]
    if missing:
        raise KeyError(f"Features not found in {source_x_train_table}: {missing}")

    x_train[feature_names].to_parquet(f"{dataset.path}.parquet", engine="pyarrow", index=False)

In [49]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "google-cloud-storage",
        "pandas",
        "pyarrow",
        "scikit-learn",
        "joblib",
        "db-dtypes",
    ],
)
def train_model(
    project: str,
    source_y_train_table: str,
    inputd: Input[Dataset],
    bucket_name: str = "<bucket>",
    model_blob_name: str = "demo/data/model/model.joblib",
):
    """Fit a Lasso regressor on the processed features and upload it to Cloud Storage.

    Args:
        project: GCP project used for the BigQuery and Cloud Storage clients.
        source_y_train_table: Fully qualified ``project.dataset.table`` id of
            the target table.
        inputd: Dataset artifact produced by ``process_data``; the features are
            read from ``f"{inputd.path}.parquet"``.
        bucket_name: GCS bucket that receives the serialized model.
        model_blob_name: Object path inside ``bucket_name`` for the model file.
    """
    # All imports live inside the body: KFP runs only this function's source.
    # google-cloud-storage and pyarrow are listed explicitly above instead of
    # relying on them arriving as transitive dependencies.
    from google.cloud import bigquery
    from google.cloud import storage
    import joblib
    import pandas as pd
    from sklearn.linear_model import Lasso

    bq_client = bigquery.Client(project=project)

    # NOTE(review): no ORDER BY; rows are paired with the features positionally
    # and BigQuery does not guarantee row order, so X/y alignment is not
    # guaranteed. TODO: order both inputs by a shared key.
    y_train = bq_client.query(f"SELECT * FROM `{source_y_train_table}`").to_dataframe()

    # Features written by process_data (note the extra ".parquet" suffix).
    x_train = pd.read_parquet(f"{inputd.path}.parquet")

    # Hyper-parameters kept exactly as in the original experiment.
    lin_model = Lasso(alpha=0.001, random_state=0)
    lin_model.fit(x_train, y_train)

    # Serialize locally, then copy the file to GCS.
    model_filename = "lasso_model.joblib"
    joblib.dump(lin_model, model_filename)

    storage_client = storage.Client(project=project)
    blob = storage_client.bucket(bucket_name).blob(model_blob_name)
    blob.upload_from_filename(model_filename)

In [50]:
@kfp.dsl.pipeline(
    name="pipeline-training-model",
    description="intro",
    pipeline_root="gs://<bucket>/demo",
)
def main_pipeline(
    project: str,
    source_x_train_table: str,
    source_y_train_table: str,
    features_table: str,
    gcp_region: str = "us-central1",
):
    # Two-step DAG: PROCESS_DATA writes the selected-feature dataset and
    # TRAIN_MODEL consumes it. `gcp_region` is accepted but no step uses it.
    process_task = process_data(
        project=project,
        source_x_train_table=source_x_train_table,
        features_table=features_table,
    )
    process_task.set_display_name("PROCESS_DATA")

    train_task = train_model(
        project=project,
        source_y_train_table=source_y_train_table,
        inputd=process_task.output,
    )
    # Explicit ordering; consuming process_task.output already implies it.
    train_task.after(process_task)
    train_task.set_display_name("TRAIN_MODEL")

In [51]:
# Serialize main_pipeline into the pipeline spec file pipeline_training.json.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="pipeline_training.json",
    pipeline_func=main_pipeline,
)

In [52]:
# Default project and region for subsequent Vertex AI SDK calls in this session.
aiplatform.init(location="us-central1", project="<project_id>")

In [53]:
# Runtime parameters for main_pipeline: the BigQuery tables it reads.
pipeline_params = {
    "project": "<project_id>",
    "source_x_train_table": "<project_id>.<dataset>.xtrain",
    "source_y_train_table": "<project_id>.<dataset>.ytrain",
    "features_table": "<project_id>.<dataset>.selected_features",
}

# Submit the compiled spec to Vertex AI Pipelines. Caching is disabled so every
# step re-executes even when its inputs are unchanged.
job = aiplatform.PipelineJob(
    display_name="pipeline de prueba",
    template_path="pipeline_training.json",
    enable_caching=False,
    project="<project_id>",
    location="us-central1",
    parameter_values=pipeline_params,
)

print('submit pipeline job ...')
job.submit(service_account="dev-dp-ml-vertex@<project_id>.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/<project_number>/locations/us-central1/pipelineJobs/pipeline-training-model-20240624041020
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/<project_number>/locations/us-central1/pipelineJobs/pipeline-training-model-20240624041020')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-training-model-20240624041020?project=<project_number>
