In [5]:
from kfp import dsl
from kfp.dsl import Output, Dataset

# PRIMEIRO COMPONENTE --> LOAD DATA
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "gcsfs"]
)
def load_train_data(
    train_path: str,
    train_csv: Output[Dataset]
):
    import pandas as pd
    df = pd.read_csv(train_path)
    df.to_csv(train_csv.path, index=False)

In [6]:
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Artifact


# SEGUNDO COMPONENTE DIVISÃO EM X E Y
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "scikit-learn", "gcsfs"]
)
def preprocess_encode(
    train_csv: Input[Dataset],       # entrada: CSV do componente anterior
    X_csv: Output[Dataset],          # saída: features
    y_csv: Output[Dataset],          # saída: target
    encoders_json: Output[Artifact]  # saída: dicionário dos encoders
):
    import pandas as pd
    from sklearn.preprocessing import LabelEncoder
    import json

    print(f"Lendo dataset: {train_csv.path}")
    df = pd.read_csv(train_csv.path)

    target = "accident_risk"
    categorical_cols = [
        "road_type", "lighting", "weather", "time_of_day",
        "holiday", "school_season", "public_road", "road_signs_present"
    ]

    X = df.drop(columns=[target])
    y = df[[target]]

    encoders = {}
    for col in categorical_cols:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col].astype(str))
        encoders[col] = dict(
            zip(le.classes_.tolist(), le.transform(le.classes_).tolist())
        )

    print("Salvando arquivos de saída...")
    X.to_csv(X_csv.path, index=False)
    y.to_csv(y_csv.path, index=False)
    with open(encoders_json.path, "w") as f:
        json.dump(encoders, f)

    print("✅ Pré-processamento concluído!")

In [7]:
# TERCEIRO COMPONENTE DIVISÃO DOS ARQUIVOS

from kfp import dsl
from kfp.dsl import Input, Output, Dataset

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "scikit-learn", "gcsfs"]
)
def train_val_split(
    X_csv: Input[Dataset],
    y_csv: Input[Dataset],
    X_train_csv: Output[Dataset],
    X_val_csv: Output[Dataset],
    y_train_csv: Output[Dataset],
    y_val_csv: Output[Dataset],
    test_size: float = 0.2,
    random_state: int = 42,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    print(f"Lendo dados de entrada:")
    print(f" - X: {X_csv.path}")
    print(f" - y: {y_csv.path}")

    X = pd.read_csv(X_csv.path)
    y = pd.read_csv(y_csv.path)

    print(f"Tamanho inicial: X={X.shape}, y={y.shape}")

    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    print(f"Salvando artefatos de saída...")
    X_train.to_csv(X_train_csv.path, index=False)
    X_val.to_csv(X_val_csv.path, index=False)
    y_train.to_csv(y_train_csv.path, index=False)
    y_val.to_csv(y_val_csv.path, index=False)

    print(f"✅ Divisão concluída! Treino: {X_train.shape}, Validação: {X_val.shape}")

In [11]:
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Artifact, Model, Metrics

# --- DEFINIÇÃO DOS COMPONENTES ---
# ... (código existente) ...
import os
import json
import numpy as np
import pandas as pd
import joblib
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
# Removida a importação de 'google.cloud.aiplatform' pois não é mais necessária


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "xgboost", "scikit-learn", "joblib", "gcsfs"]
)
def train_model(
    X_train_csv: Input[Dataset],
    y_train_csv: Input[Dataset],
    model_output: Output[Model],
    train_metrics: Output[Metrics],  # Alterado de dsl.OutputPath(str)
    n_estimators: int = 200,
    learning_rate: float = 0.1,
    max_depth: int = 5,
    subsample: float = 0.8,
    colsample_bytree: float = 0.8,
    random_state: int = 42,
):
    """
    Treina um modelo XGBoost Regressor e salva o artefato do modelo
    e as métricas de treino.
    """
    # --- IMPORTAÇÕES NECESSÁRIAS DENTRO DO COMPONENTE ---
    import os
    import json
    import numpy as np
    import pandas as pd
    import joblib
    from xgboost import XGBRegressor
    from sklearn.metrics import mean_squared_error, r2_score
    # ---------------------------------------------------

    try:
        print("Iniciando o componente train_model...")

        # 1) lê dados
        print(f"Lendo X_train de: {X_train_csv.path}")
        X_train = pd.read_csv(X_train_csv.path)
        print(f"Lendo y_train de: {y_train_csv.path}")
        y_train = pd.read_csv(y_train_csv.path).values.ravel()
        print("Dados de treino lidos com sucesso.")

        # 2) treina
        print("Iniciando o treinamento do modelo XGBRegressor...")
        model = XGBRegressor(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            subsample=subsample,
            colsample_bytree=colsample_bytree,
            random_state=random_state,
            n_jobs=-1,
        )
        model.fit(X_train, y_train)
        print("Treinamento concluído.")

        # 3) salva modelo
        model_save_path = model_output.path + "/model.joblib"
        
        # Garante que o diretório de saída exista antes de salvar
        print(f"Garantindo que o diretório de saída exista: {model_output.path}")
        os.makedirs(model_output.path, exist_ok=True) # <-- CORREÇÃO 1

        print(f"Salvando o modelo em: {model_save_path}")
        joblib.dump(model, model_save_path)
        print("Modelo salvo com sucesso.")

        # 4) métricas simples no treino (só para tracking)
        print("Calculando métricas de treino...")
        y_pred_tr = model.predict(X_train)
        rmse_tr = float(np.sqrt(mean_squared_error(y_train, y_pred_tr)))
        r2_tr = float(r2_score(y_train, y_pred_tr))
        
        metrics_data = {"rmse_train": rmse_tr, "r2_train": r2_tr}
        print(f"Métricas de treino: {metrics_data}")

        # Garante que o diretório para o arquivo de métricas exista
        # Usa .path pois agora é um Artefato
        print(f"Garantindo que o diretório de métricas exista: {os.path.dirname(train_metrics.path)}")
        os.makedirs(os.path.dirname(train_metrics.path), exist_ok=True)

        # Usa .path pois agora é um Artefato
        with open(train_metrics.path, "w") as f:
            json.dump(metrics_data, f, indent=4)
        
        print("Métricas de treino salvas. Componente train_model concluído.")

    except Exception as e:
        print(f"Erro no componente train_model: {e}")
        raise



In [12]:
@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "xgboost", "scikit-learn", "joblib", "gcsfs"]
)
def evaluate_model(
    model_input: Input[Model],
    X_val_csv: Input[Dataset],
    y_val_csv: Input[Dataset],
    eval_metrics: Output[Metrics],      # Alterado de dsl.OutputPath(str)
    predictions_csv: Output[Dataset],  # Alterado de dsl.OutputPath(str)
):
    """
    Avalia um modelo treinado em dados de validação.
    """
    # --- IMPORTAÇÕES NECESSÁRIAS DENTRO DO COMPONENTE ---
    import os
    import json
    import numpy as np
    import pandas as pd
    import joblib
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

    try:
        print("Iniciando o componente evaluate_model...")

        # Carrega o modelo
        model_load_path = model_input.path + "/model.joblib"
        print(f"Carregando modelo de: {model_load_path}")
        model = joblib.load(model_load_path)
        print("Modelo carregado com sucesso.")

        # Carrega dados de validação
        print(f"Lendo X_val de: {X_val_csv.path}")
        X_val = pd.read_csv(X_val_csv.path)
        print(f"Lendo y_val de: {y_val_csv.path}")
        y_val = pd.read_csv(y_val_csv.path).values.ravel()
        print("Dados de validação lidos com sucesso.")

        # Avaliação
        print("Iniciando predições no conjunto de validação...")
        y_pred = model.predict(X_val)
        rmse = float(np.sqrt(mean_squared_error(y_val, y_pred)))
        mae = float(mean_absolute_error(y_val, y_pred))
        r2 = float(r2_score(y_val, y_pred))
        
        metrics_data = {"rmse": rmse, "mae": mae, "r2": r2}
        print(f"Métricas de avaliação: {metrics_data}")

        # Garante que o diretório para o arquivo de métricas exista
        # Usa .path pois agora é um Artefato
        print(f"Garantindo que o diretório de métricas exista: {os.path.dirname(eval_metrics.path)}")
        os.makedirs(os.path.dirname(eval_metrics.path), exist_ok=True)

        # Usa .path pois agora é um Artefato
        with open(eval_metrics.path, "w") as f:
            json.dump(metrics_data, f, indent=4)
        print("Métricas de avaliação salvas.")

        # Garante que o diretório para o arquivo de predições exista
        # Usa .path pois agora é um Artefato
        print(f"Garantindo que o diretório de predições exista: {os.path.dirname(predictions_csv.path)}")
        os.makedirs(os.path.dirname(predictions_csv.path), exist_ok=True)

        print(f"Salvando predições em: {predictions_csv.path}")
        # Usa .path pois agora é um Artefato
        pd.DataFrame({"y_true": y_val, "y_pred": y_pred}).to_csv(predictions_csv.path, index=False)
        print("Predições salvas. Componente evaluate_model concluído.")

    except Exception as e:
        
        raise

In [13]:
# Imports necessários para a *definição* do componente (ficam fora)
from kfp.dsl import (
    Input, 
    Model, 
    Metrics, 
    component
)
@component(
    base_image="python:3.10",
    packages_to_install=[
        "google-cloud-aiplatform",  # SDK principal do Vertex AI
        "gcsfs",
        "pandas",
        "joblib"
    ]
)
def register_model_in_registry(
    model_input: Input[Model],
    eval_metrics: Input[Metrics],
    project_id: str,
    project_location: str,
    model_display_name: str,      # Nome do modelo no Registry (ex: "accident-risk-model")
):
    """
    Registra um modelo treinado no Vertex AI Model Registry,
    anexando as métricas de avaliação como labels.
    """
    
    # Imports necessários para a *execução* do componente (ficam dentro)
    import json
    from google.cloud import aiplatform

    try:
        print(f"Iniciando o registro do modelo: {model_display_name}")
        print(f"Projeto: {project_id}, Local: {project_location}")
        
        # 1. Inicializa o cliente do Vertex AI
        aiplatform.init(project=project_id, location=project_location)

        # 2. Lê as métricas do arquivo JSON
        # Usa .path pois agora é um Artefato
        print(f"Lendo métricas de: {eval_metrics.path}")
        with open(eval_metrics.path, "r") as f:
            metrics_data = json.load(f)
        
        # Converte métricas em 'labels'
        labels_for_registry = {}
        for key, value in metrics_data.items():
            # Garante que a chave seja compatível com labels (minúsculas, sem caracteres especiais)
            safe_key = ''.join(c for c in key.lower() if c.isalnum() or c == '_')
            # Garante que o valor seja compatível (string, substitui '.' por '_')
            label_value = str(value).replace('.', '_').lower()
            labels_for_registry[safe_key] = label_value

        print(f"Métricas convertidas para labels: {labels_for_registry}")

        # 3. Define o contêiner de predição
        PREBUILT_CONTAINER_URI = "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
        print(f"Usando contêiner de predição: {PREBUILT_CONTAINER_URI}")

        # 4. Faz o upload (registro) do modelo
        print(f"Fazendo upload do modelo a partir de: {model_input.path}")
        
        registered_model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_input.path,
            serving_container_image_uri=PREBUILT_CONTAINER_URI,
            description=f"Versão registrada via KFP pipeline.",
            labels=labels_for_registry
        )

        print(f"Modelo registrado com sucesso!")
        print(f"Nome do recurso: {registered_model.resource_name}")
        print(f"Versão: {registered_model.version_id}")

    except Exception as e:
        print(f"Erro ao registrar o modelo: {e}")
        raise

In [14]:
# CRIAÇÃO DO PIPELINE (VERSÃO ATUALIZADA)
@dsl.pipeline(
    name="accident-risk-pipeline",
    description="Pipeline com leitura, preprocessamento, split, treino, avaliação E REGISTRO"
)
def accident_risk_pipeline(
    train_path: str = "gs://road_accident/train.csv",
    project_id: str = "vertexai-457414", # Adicione seu Project ID
    project_location: str = "us-central1", # Adicione sua Localização (ex: "us-central1")
    model_name: str = "accident-risk-model" # Nome que aparecerá no Registry
):
    """
    Pipeline completo que treina, avalia e registra
    o modelo no Vertex AI Model Registry.
    """
    
    # NOTA: Esta pipeline assume que os componentes
    # 'load_train_data', 'preprocess_encode', e 'train_val_split'
    # estão definidos e importados em algum lugar.

    # 1) carregar dados
    load_task = load_train_data(train_path=train_path)
    load_task.set_caching_options(False)   # força reexecução do primeiro passo

    # 2) preprocessar e encodar
    preprocess_task = preprocess_encode(train_csv=load_task.outputs["train_csv"])

    # 3) split train/val
    split_task = train_val_split(
        X_csv=preprocess_task.outputs["X_csv"],
        y_csv=preprocess_task.outputs["y_csv"]
    )

    # 4) treinar modelo
    # Esta função está definida ACIMA neste arquivo
    train_task = train_model(
        X_train_csv=split_task.outputs["X_train_csv"],
        y_train_csv=split_task.outputs["y_train_csv"]
        # você pode expor hiperparâmetros aqui se quiser
    )

    # 5) avaliar modelo
    # Esta função está definida ACIMA neste arquivo
    evaluate_task = evaluate_model(
        model_input=train_task.outputs["model_output"],  # Do train_model
        X_val_csv=split_task.outputs["X_val_csv"],       # do split
        y_val_csv=split_task.outputs["y_val_csv"]        # do split
    )

    # 6) --- NOVO PASSO: REGISTRAR O MODELO ---
    # Esta função está definida ACIMA neste arquivo
    register_task = register_model_in_registry(
        model_input=train_task.outputs["model_output"],          # O modelo treinado
        eval_metrics=evaluate_task.outputs["eval_metrics"],  # O JSON com as métricas
        project_id=project_id,
        project_location=project_location,
        model_display_name=model_name
    )
    
    # Garante que o registro só ocorra APÓS a avaliação
    register_task.after(evaluate_task)



In [15]:
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=accident_risk_pipeline,
    package_path="accident_risk_pipeline.json"
)

In [None]:
from google.cloud import aiplatform

aiplatform.init(project="vertexai-457414", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="accident-risk-pipeline-v1",
    template_path="accident_risk_pipeline.json",
    pipeline_root="gs://road_accident/pipelines"
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/666534829864/locations/us-central1/pipelineJobs/accident-risk-pipeline-20251023114635
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/666534829864/locations/us-central1/pipelineJobs/accident-risk-pipeline-20251023114635')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/accident-risk-pipeline-20251023114635?project=666534829864
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/accident-risk-pipeline-20251023114635 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/accident-risk-pipeline-20251023114635 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/666534829864/locations/us-central1/pipelineJobs/accident-risk-pipeline-20251023114635 current state:
PipelineState.PIPELINE_STATE_RUNNING
