In [0]:
# Notebook: 02_model_training.py

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# 1. Definição da Tabela Bronze
BRONZE_TABLE = "dev_catalogue.staging_schema.bronze_insurance_costs"
SILVER_TABLE = "dev_catalogue.staging_schema.silver_insurance_features"

# Leitura da Tabela Delta
df_bronze = spark.read.table(BRONZE_TABLE)

print(f"DataFrame lido com sucesso da tabela Delta: {BRONZE_TABLE}")
df_bronze.show(5)

In [0]:
# Colunas Categóricas (features que precisam de encoding)
categorical_cols = ["sex", "smoker", "region"]

# Cria StringIndexers para as colunas categóricas
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
    for col in categorical_cols
]

# Cria OneHotEncoders para as colunas indexadas
encoders = [
    OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded", handleInvalid="keep")
    for col in categorical_cols
]

# Colunas Numéricas Originais (já prontas)
numerical_cols = ["age", "bmi", "children"]

# Colunas Finais Codificadas (as numéricas originais + as categóricas codificadas)
final_feature_cols = numerical_cols + [col + "_encoded" for col in categorical_cols]

# Cria o VectorAssembler para combinar todas as features em uma única coluna
vector_assembler = VectorAssembler(
    inputCols=final_feature_cols,
    outputCol="features"
)

# 3. Criação da Pipeline (SOMENTE DE TRANSFORMAÇÃO)
pipeline = Pipeline(stages=indexers + encoders + [vector_assembler])

# Aplica a Pipeline no DataFrame (ISSO CRIA 'features')
pipeline_model = pipeline.fit(df_bronze) 
df_with_features = pipeline_model.transform(df_bronze) # DataFrame temporário com 'features'

# 4. Selecionar e Salvar a Camada SILVER (APENAS COLUNAS BRUTAS + LABEL)
# Você deve retornar ao df_bronze ou df_with_features e selecionar APENAS as colunas que a Célula 3 vai precisar para TREINAR.

df_silver = df_bronze.select( # Use df_bronze, ou leia as colunas brutas do df_with_features
    F.col("charges").alias("label"), 
    *numerical_cols,
    *categorical_cols
)

# 5. Salvar a Camada SILVER
df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(SILVER_TABLE)

print("\nDataFrame SILVER (Pré-Processado) pronto:")
df_silver.show(5, truncate=False)

df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable(SILVER_TABLE)

print(f"\n✅ Camada SILVER salva com sucesso na Tabela Delta (Esquema Migrado): {SILVER_TABLE}")



In [0]:
# CÉLULA FINAL: Treinamento e Registro da PIPELINE COMPLETA (MLflow)

import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# IMPORTAÇÕES NECESSÁRIAS:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler # INCLUSÃO PARA DEFINIÇÃO DO PIPELINE
from mlflow.models.signature import infer_signature
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

# Configuração da Tabela SILVER (leitura)
SILVER_TABLE = "dev_catalogue.staging_schema.silver_insurance_features"
df_silver_raw = spark.read.table(SILVER_TABLE)

# --- NOVO TRECHO: Selecionar APENAS as colunas que a Pipeline precisa ---
cols_to_use = ["label", "age", "bmi", "children", "sex", "smoker", "region"]
df_silver = df_silver_raw.select(*cols_to_use)

(train_df, test_df) = df_silver.randomSplit([0.8, 0.2], seed=42)

# -------------------------------------------------------------
# --- DEFINIÇÕES DO PIPELINE (REINSERIDAS PARA AUTONOMIA) ---
# -------------------------------------------------------------
categorical_cols = ["sex", "smoker", "region"]
numerical_cols = ["age", "bmi", "children"]

indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
    for col in categorical_cols
]
encoders = [
    OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded", handleInvalid="keep")
    for col in categorical_cols
]
final_feature_cols = numerical_cols + [col + "_encoded" for col in categorical_cols]
vector_assembler = VectorAssembler(inputCols=final_feature_cols, outputCol="features")
# -------------------------------------------------------------

# --- INÍCIO DA EXECUÇÃO DO MLFLOW ---
EXPERIMENT_PATH = "/Users/ronaldo.rosario.ramos@gmail.com/Insurance_Cost_Prediction_Experiment"
MODEL_REGISTRY_NAME = "Insurance_Cost_LR_Model"
ARTIFACT_PATH = "spark_pipeline_model" # Alterado para refletir o registro da Pipeline
UC_VOLUME_TMP_PATH = "/Volumes/dev_catalogue/staging_schema/raw_data_volume/mlflow_temp"

mlflow.set_experiment(EXPERIMENT_PATH)

# -----------------------------------------------------------------
# --- AÇÃO CORRETIVA: LIMPAR CACHE (RESOLVE O OVERFLOW EXCEPTION) ---
# -----------------------------------------------------------------
print("Tentando limpar variáveis antigas para evitar Model Cache Overflow...")
try:
    if 'pipeline_model_fit' in locals():
        del pipeline_model_fit
except:
    pass
# -----------------------------------------------------------------

with mlflow.start_run(run_name="Treinamento_Pipeline_Final") as run:
    
    # 1. DEFINIÇÃO E TREINAMENTO DA PIPELINE COMPLETA
    lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10)
    
    # A Pipeline COMPLETA AGORA USA VARIÁVEIS DEFINIDAS NESTA CÉLULA!
    full_pipeline = Pipeline(stages=indexers + encoders + [vector_assembler, lr])
    
    # O fit treina indexers e o LinearRegressor em uma só etapa
    pipeline_model_fit = full_pipeline.fit(train_df) 
    
    # 2. Avaliação
    # O transform() no df_test aplica TODAS as etapas (transformação e previsão)
    predictions_df = pipeline_model_fit.transform(test_df)
    
    # Cálculo de métricas...
    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions_df, {evaluator.metricName: "rmse"})
    r2 = evaluator.evaluate(predictions_df, {evaluator.metricName: "r2"})
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    # 3. GERAÇÃO E DEFINIÇÃO DA ASSINATURA
    # A assinatura é inferida no DataFrame de predições (que possui 'features' e 'prediction')
    signature = infer_signature(
        model_input=predictions_df.select("features"),
        model_output=predictions_df.select("prediction")
    )

    # 4. REGISTRO CORRIGIDO (Registrando o PipelineModel Completo)
    mlflow.spark.log_model(
        spark_model=pipeline_model_fit, # <-- REGISTRA A PIPELINE COMPLETA E FUNCIONAL
        artifact_path=ARTIFACT_PATH,
        registered_model_name=MODEL_REGISTRY_NAME,
        dfs_tmpdir=UC_VOLUME_TMP_PATH, # <--- CORREÇÃO DE CAMINHO UC INCLUÍDA
        signature=signature            # <--- CORREÇÃO DE ASSINATURA UC INCLUÍDA
    )
    
    run_id = run.info.run_id
    
print("-" * 50)
print(f"✅ REGISTRO DA PIPELINE FINALIZADO!")
print(f"   - RMSE: {rmse:.2f} | A Pipeline completa está registrada na Versão mais recente.")
print("-" * 50)