In [None]:
!pip install kfp google-cloud-pipeline-components --upgrade
!pip install google-cloud-aiplatform
!pip install --upgrade google-cloud-storage

In [36]:
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Markdown, HTML

# Defina a imagem base que contém as bibliotecas necessárias.
# Você pode criar uma imagem customizada ou usar uma pré-existente
# que tenha as dependências instaladas.
BASE_IMAGE = 'python:3.9'
PACKAGES_TO_INSTALL = [
    'google-cloud-bigquery[pandas]',
    'pandas',
    'statsmodels',
    'scikit-learn', # Usado para joblib
    'db-dtypes',
    'tabulate',
    'google-cloud-aiplatform',
    'google-cloud-storage'
]

PROJECT_ID = "project-poc-purple"
BUCKET_NAME=f"gs://000-pipelines-root_{PROJECT_ID}"

# Create bucket
PIPELINE_ROOT = f"{BUCKET_NAME}/insurance_risk_pricing_pipeline/"

#=========================================================================================
# COMPONENTE 1: Carregar Dados do BigQuery
#=========================================================================================
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def load_data_from_bq(
    project_id: str,
    location: str,
    bq_table: str,
    output_dataset: Output[Dataset]
):
    """Carrega dados de uma tabela do BigQuery e salva como um arquivo CSV."""
    from google.cloud import bigquery
    import pandas as pd

    client = bigquery.Client(project=project_id, location=location)

    print(f"Lendo dados da tabela {bq_table}...")
    sql_query = f"SELECT * FROM `{bq_table}`"
    df = client.query(sql_query).to_dataframe()

    print("Ajuste formato colunas")
    colunas_para_converter = df.select_dtypes(include='Int64').columns
    for col in colunas_para_converter:
        df[col] = df[col].astype('int64')

    print("✅ Sucesso! Dados carregados no DataFrame.")
    print(df.head())

    # Salva o dataframe como um arquivo CSV, que é o artefato de saída.
    df.to_csv(output_dataset.path, index=False)
    print(f"Dataset salvo em: {output_dataset.path}")

#=========================================================================================
# COMPONENTE 2: Treinar Modelo de Frequência (Poisson)
#=========================================================================================
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def train_frequency_model(
    input_dataset: Input[Dataset],
    target_column: str, # 'qtd_colisao_parcial' ou 'qtd_colisao_total'
    output_model: Output[Model]
):
    """Treina um modelo GLM Poisson para frequência de sinistros."""
    import pandas as pd
    import numpy as np
    import statsmodels.api as sm
    import joblib

    df = pd.read_csv(input_dataset.path)

    print(f"Treinando modelo de frequência para a coluna: {target_column}")

    features_freq = ['classe_bonus', 'idade', 'rns']
    X_freq = sm.add_constant(df[features_freq])

    model = sm.GLM(
        endog=df[target_column],
        exog=X_freq,
        family=sm.families.Poisson(),
        offset=np.log(df['exposicao'])
    )
    results = model.fit()

    # Salva o modelo treinado usando joblib
    joblib.dump(results, output_model.path)
    print(f"Modelo de frequência salvo em: {output_model.path}")
    print(results.summary())

#=========================================================================================
# COMPONENTE 3: Treinar Modelo de Severidade (Gamma)
#=========================================================================================
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def train_severity_model(
    input_dataset: Input[Dataset],
    output_model: Output[Model]
):
    """Treina um modelo GLM Gamma para severidade de sinistros parciais."""
    import pandas as pd
    import statsmodels.api as sm
    import joblib

    df = pd.read_csv(input_dataset.path)

    print("Treinando modelo de severidade...")

    df_com_sinistro = df[df['valor_colisao_parcial'] > 0].copy()
    y_sev = df_com_sinistro['valor_colisao_parcial'] / df_com_sinistro['qtd_colisao_parcial']

    features_sev = ['classe_bonus', 'idade', 'rns', 'valor_veiculo']
    X_sev_com_sinistro = sm.add_constant(df_com_sinistro[features_sev])

    model = sm.GLM(
        endog=y_sev,
        exog=X_sev_com_sinistro,
        family=sm.families.Gamma(link=sm.families.links.Log())
    )
    results = model.fit()
    print(results.summary())

    joblib.dump(results, output_model.path)
    print(f"Modelo de severidade salvo em: {output_model.path}")
    print(results.summary())

#=========================================================================================
# COMPONENTE 4: Predizer e Calcular Prêmio
#=========================================================================================
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def predict_from_registered_models(
    project_id: str,
    location: str,
    input_dataset: Input[Dataset],
    model_freq_parcial_resource_name: str,
    model_freq_total_resource_name: str,
    model_sev_parcial_resource_name: str,
    output_predictions: Output[Dataset]
):
    """Carrega modelos registrados, aplica predições e calcula o prêmio."""
    import pandas as pd
    import numpy as np
    import joblib
    import statsmodels.api as sm
    from google.cloud import aiplatform, storage
    import os

    df = pd.read_csv(input_dataset.path)
    aiplatform.init(project=project_id, location=location)

    def download_and_load_model(model_resource_name: str) -> any:
        """Baixa um modelo via Model Registry e o carrega."""
        print(f"Carregando modelo do resource name: {model_resource_name}")
        model = aiplatform.Model(model_name=model_resource_name)
        model_artifact_dir = model.uri
        print(f"URI do artefato do modelo: {model_artifact_dir}")

        model_file_gcs_path = f"{model_artifact_dir}/model.joblib"
        local_model_file = f"/tmp/{model_resource_name.split('/')[-1]}.joblib"

        path_parts = model_file_gcs_path.replace("gs://", "").split('/')
        bucket_name = path_parts[0]
        object_name = '/'.join(path_parts[1:])

        storage_client = storage.Client(project=project_id)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(object_name)
        blob.download_to_filename(local_model_file)
        print(f"Modelo baixado para {local_model_file}")

        return joblib.load(local_model_file)

    # Carrega os modelos a partir do Registry
    results_freq_parcial = download_and_load_model(model_freq_parcial_resource_name)
    results_freq_total = download_and_load_model(model_freq_total_resource_name)
    results_sev_parcial = download_and_load_model(model_sev_parcial_resource_name)
    print("Modelos carregados com sucesso do Registry.")

    # Predição da Frequência (versão corrigida, sem o np.exp extra)
    df['freq_parcial_pred'] = results_freq_parcial.predict(sm.add_constant(df[['classe_bonus', 'idade', 'rns']]))
    df['freq_total_pred'] = results_freq_total.predict(sm.add_constant(df[['classe_bonus', 'idade', 'rns']]))

    # Predição da Severidade (versão corrigida)
    df['sev_parcial_pred'] = results_sev_parcial.predict(sm.add_constant(df[['classe_bonus', 'idade', 'rns', 'valor_veiculo']]))

    # Cálculo do Prêmio de Risco
    df['premio_risco_pred'] = (
        df['freq_parcial_pred'] * df['sev_parcial_pred'] +
        df['freq_total_pred'] * df['valor_veiculo']
    )

    print("Cálculos finalizados. Salvando predições.")
    df.to_csv(output_predictions.path, index=False)
    print(df[['valor_veiculo', 'freq_parcial_pred', 'sev_parcial_pred', 'freq_total_pred', 'premio_risco_pred']].head())


#=========================================================================================
# COMPONENTE 5: Gerar Relatório de Sumarização
#=========================================================================================
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def generate_summary_report(
    predictions_dataset: Input[Dataset],
    output_report: Output[Markdown]
):
    """Calcula métricas sumarizadas e gera um relatório de comparação."""
    import pandas as pd
    import numpy as np

    df = pd.read_csv(predictions_dataset.path)

    # Métricas Realizadas
    soma_exposicao = df['exposicao'].sum()
    soma_qtd_parcial = df['qtd_colisao_parcial'].sum()
    soma_valor_parcial = df['valor_colisao_parcial'].sum()
    soma_qtd_total = df['qtd_colisao_total'].sum()

    freq_parcial_real = soma_qtd_parcial / soma_exposicao
    sev_parcial_real = soma_valor_parcial / soma_qtd_parcial if soma_qtd_parcial > 0 else 0
    freq_total_real = soma_qtd_total / soma_exposicao
    premio_risco_real = freq_parcial_real * sev_parcial_real + freq_total_real * np.average(df['valor_veiculo'], weights=df['exposicao'])

    # Métricas Previstas
    premio_risco_prev_total = (df['premio_risco_pred'] * df['exposicao']).sum()
    premio_risco_prev_medio = premio_risco_prev_total / soma_exposicao

    soma_qtd_parcial_pred = (df['freq_parcial_pred'] * df['exposicao']).sum()
    soma_valor_parcial_pred = (df['freq_parcial_pred'] * df['exposicao'] * df['sev_parcial_pred']).sum()
    soma_qtd_total_pred = (df['freq_total_pred'] * df['exposicao']).sum()

    freq_parcial_prev = soma_qtd_parcial_pred / soma_exposicao
    sev_parcial_prev = soma_valor_parcial_pred / soma_qtd_parcial_pred if soma_qtd_parcial_pred > 0 else 0
    freq_total_prev = soma_qtd_total_pred / soma_exposicao
    premio_risco_prev = freq_parcial_prev * sev_parcial_prev + freq_total_prev * np.average(df['valor_veiculo'], weights=df['exposicao'])

    sumario = pd.DataFrame({
        'Métrica': [
            'Frequência de Colisão Parcial',
            'Severidade de Colisão Parcial',
            'Frequência de Colisão Total',
            'Prêmio de Risco (Puro Médio)'
        ],
        'Valor Realizado': [
            freq_parcial_real,
            sev_parcial_real,
            freq_total_real,
            premio_risco_real
        ],
        'Valor Previsto pelo Modelo': [
            freq_parcial_prev,
            sev_parcial_prev,
            freq_total_prev,
            premio_risco_prev
        ]
    })

    # Gerando o relatório em Markdown
    markdown_report = "# Relatório de Comparação Real x Previsto\n\n"
    markdown_report += sumario.to_markdown(index=False)

    with open(output_report.path, 'w') as f:
        f.write(markdown_report)

    print("Relatório gerado com sucesso!")
    print("RELATÓRIO DE COMPARAÇÃO REAL X PREVISTO")
    print("="*75)
    print(sumario.to_string(index=False))
    print("="*75)

#=========================================================================================
# COMPONENTE 6: Registrar Modelo no Vertex AI
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=PACKAGES_TO_INSTALL
)
def upload_and_register_model(
    project_id: str,
    location: str,
    model_display_name: str,
    model_artifact: Input[Model],
) -> str:  # MUDANÇA 1: Declara o retorno de uma string
    """Faz o upload de um artefato de modelo e retorna seu resource name."""
    from google.cloud import aiplatform, storage
    import os

    SERVING_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-1:latest"
    aiplatform.init(project=project_id, location=location)

    gcs_path = model_artifact.uri
    path_parts = gcs_path.replace("gs://", "").split('/')
    bucket_name = path_parts[0]
    object_prefix = '/'.join(path_parts[1:])
    model_upload_dir = os.path.dirname(object_prefix)
    new_file_name = "model.joblib"
    new_object_path = f"{model_upload_dir}/{new_file_name}"

    storage_client = storage.Client(project=project_id)
    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(object_prefix)
    source_bucket.copy_blob(source_blob, source_bucket, new_object_path)
    print(f"Artefato do modelo copiado para: gs://{bucket_name}/{new_object_path}")

    artifact_uri_for_upload = f"gs://{bucket_name}/{model_upload_dir}"
    print(f"Registrando o modelo '{model_display_name}' do diretório {artifact_uri_for_upload}...")

    model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=artifact_uri_for_upload,
        serving_container_image_uri=SERVING_IMAGE,
        description=f"Modelo de precificação de risco: {model_display_name}",
    )
    model.wait()

    resource_name = model.resource_name
    print(f"✅ Modelo registrado! Resource Name: {resource_name}")

    # MUDANÇA 2: Retorna a string diretamente
    return resource_name


#=========================================================================================
# DEFINIÇÃO DA PIPELINE ATUALIZADAV2
#=========================================================================================
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name='Pipeline de Precificação de Risco com Registro de Modelo',
    description='Pipeline que treina, registra e usa modelos para calcular o prêmio de risco.'
)
def risk_pricing_pipeline2(
    project_id: str = 'project-poc-purple',
    location: str = 'us-central1',  # Região para o Vertex AI (BQ pode estar em outra)
    bq_location: str = 'US', # Localização do dataset no BigQuery
    bq_table: str = 'project-poc-purple.demos.dados_apolices_v1',
    model_name_prefix: str = 'risk-pricing-auto-v2'
):
    # Passo 1: Carregar os dados
    load_task = load_data_from_bq(
        project_id=project_id,
        location=bq_location,
        bq_table=bq_table
    )
     # Definição de recursos a serem alocados para esta tarefa
    load_task.set_cpu_limit("2").set_memory_limit("8G")

    # Passo 2: Treinar os 3 modelos (em paralelo)
    train_freq_parcial_task = train_frequency_model(
        input_dataset=load_task.outputs['output_dataset'],
        target_column='qtd_colisao_parcial'
    )
    # Definição de recursos a serem alocados para esta tarefa
    train_freq_parcial_task.set_cpu_limit("2").set_memory_limit("8G")

    train_freq_total_task = train_frequency_model(
        input_dataset=load_task.outputs['output_dataset'],
        target_column='qtd_colisao_total'
    )
    # Definição de recursos a serem alocados para esta tarefa
    train_freq_total_task.set_cpu_limit("2").set_memory_limit("8G")

    train_sev_parcial_task = train_severity_model(
        input_dataset=load_task.outputs['output_dataset']
    )
    # Definição de recursos a serem alocados para esta tarefa
    train_sev_parcial_task.set_cpu_limit("2").set_memory_limit("8G")

    # Passo 3: Registrar os 3 modelos no Vertex AI (em paralelo)
    register_freq_parcial_task = upload_and_register_model(
        project_id=project_id,
        location=location,
        model_display_name=f"{model_name_prefix}-freq-parcial",
        model_artifact=train_freq_parcial_task.outputs['output_model']
    )
    # Definição de recursos a serem alocados para esta tarefa
    register_freq_parcial_task.set_cpu_limit("2").set_memory_limit("8G")

    register_freq_total_task = upload_and_register_model(
        project_id=project_id,
        location=location,
        model_display_name=f"{model_name_prefix}-freq-total",
        model_artifact=train_freq_total_task.outputs['output_model']
    )
    # Definição de recursos a serem alocados para esta tarefa
    register_freq_total_task.set_cpu_limit("2").set_memory_limit("8G")

    register_sev_parcial_task = upload_and_register_model(
        project_id=project_id,
        location=location,
        model_display_name=f"{model_name_prefix}-sev-parcial",
        model_artifact=train_sev_parcial_task.outputs['output_model']
    )
    # Definição de recursos a serem alocados para esta tarefa
    register_sev_parcial_task.set_cpu_limit("2").set_memory_limit("8G")

    # Passo 4: Fazer predições usando os 3 modelos REGISTRADOS
    predict_task = predict_from_registered_models(
        project_id=project_id,
        location=location,
        input_dataset=load_task.outputs['output_dataset'],
        model_freq_parcial_resource_name=register_freq_parcial_task.output,
        model_freq_total_resource_name=register_freq_total_task.output,
        model_sev_parcial_resource_name=register_sev_parcial_task.output
    )
    # Definição de recursos a serem alocados para esta tarefa
    predict_task.set_cpu_limit("2").set_memory_limit("8G")


    # Passo 5: Gerar relatório final
    generate_summary_report(
        predictions_dataset=predict_task.outputs['output_predictions']
    )
    # Definição de recursos a serem alocados para esta tarefa
    generate_summary_report.set_cpu_limit("2").set_memory_limit("8G")

In [37]:
#=========================================================================================
# COMPILAÇÃO DA PIPELINE
#=========================================================================================
from kfp.compiler import Compiler
Compiler().compile(
    pipeline_func=risk_pricing_pipeline2,
    package_path='risk_pricing_pipeline.yaml' # ou .json
)
print("Pipeline compilada com sucesso para 'risk_pricing_pipeline.yaml'")

Pipeline compilada com sucesso para 'risk_pricing_pipeline.yaml'


In [38]:
#=========================================================================================
# EXECUÇÃO DA PIPELINE
#=========================================================================================
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

PROJECT_ID = "project-poc-purple"
REGION = "us-central1"
aiplatform.init(project=PROJECT_ID, location=REGION)

start_pipeline = pipeline_jobs.PipelineJob(
  display_name="risk_pricing_pipeline-pipeline",
  template_path="risk_pricing_pipeline.yaml",
  enable_caching=False,
  location=REGION
)

start_pipeline.run()