# Passo 4: Modelagem e Experimentação (MLflow)

## Objetivo
Testar diversos algoritmos e hiperparâmetros usando Cross-Validation e registrar os experimentos no MLflow.

**Etapas:**
1. Setup e Leitura dos Dados
2. Feature Engineering (Pipeline)
3. Definição dos Modelos e Grids
4. Loop de Treinamento e Log no MLflow
5. Análise do Melhor Modelo

In [None]:
import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, month, year, when
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor, LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import datetime

# Initialize Spark (reuses an already-running session when there is one)
spark = (
    SparkSession.builder
    .appName("RetailPriceFeatures")
    .config("spark.sql.warehouse.dir", "spark-warehouse")
    .getOrCreate()
)

# MLflow Setup (Adjust experiment name for Databricks)
experiment_name = "/Shared/RetailPrice_Elasticity_Experiment"
try:
    mlflow.set_experiment(experiment_name)
except Exception as exc:
    # Best effort: the notebook keeps running without the named experiment,
    # but the reason is reported instead of being silently discarded.
    # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
    print("Experiment might already exist or local context.")
    print(f"  (mlflow.set_experiment failed: {exc!r})")

# Load Data
try:
    df = spark.table("retail_price_clean")
except Exception as exc:
    # Fallback for local run without hive: read the parquet export via pandas.
    print(f"Table 'retail_price_clean' unavailable ({exc!r}); loading local parquet instead.")
    import pandas as pd
    df = spark.createDataFrame(pd.read_parquet('../data/retail_price_clean.parquet'))

print(f"Rows: {df.count()}")

In [None]:
# --- FEATURE ENGINEERING ---
# (Same logic as the previous notebook, to keep the features consistent)

# 1. Lags: previous row's quantity and unit price within each product,
#    ordered by date.
window_spec = Window.partitionBy("product_id").orderBy("date")
df = df.withColumn("lag_qty_1", lag("qty", 1).over(window_spec)) \
       .withColumn("lag_price_1", lag("unit_price", 1).over(window_spec))

# The first row of every product has no previous row, so its lags are null.
# NOTE(review): fill(0) is applied to the whole frame, not just the lag
# columns, so nulls in any other numeric column become 0 as well.
df = df.na.fill(0)

# 2. Derived features: price gap to competitor 1 and calendar month.
df = df.withColumn("price_diff_comp1", col("unit_price") - col("comp_1")) \
       .withColumn("month", month("date"))

# 3. Encoding stages (unfitted; they are fitted inside each model pipeline).
#    handleInvalid="keep" puts labels unseen during fit into an extra bucket
#    instead of raising at transform time.
indexer_prod = StringIndexer(inputCol="product_id", outputCol="product_id_idx", handleInvalid="keep")
indexer_cat = StringIndexer(inputCol="product_category_name", outputCol="product_category_idx", handleInvalid="keep")
encoder_cat = OneHotEncoder(inputCols=["product_category_idx"], outputCols=["product_category_vec"])

# 4. Assembler: numeric columns + indexed product id + one-hot category.
# NOTE(review): 'lag_price' is not created in this cell; it is presumably
# already a column of the source table - confirm.
num_features = [
    'freight_price', 'unit_price', 'product_name_lenght', 'product_description_lenght',
    'product_photos_qty', 'product_weight_g', 'product_score', 'customers',
    'weekday', 'weekend', 'holiday', 'month', 'year', 'volume',
    'comp_1', 'ps1', 'fp1', 'comp_2', 'ps2', 'fp2',
    'comp_3', 'ps3', 'fp3', 'lag_price',
    'lag_qty_1', 'lag_price_1', 'price_diff_comp1'
]

assembler = VectorAssembler(
    inputCols=num_features + ["product_id_idx", "product_category_vec"],
    outputCol="features",
    handleInvalid="keep"
)

# Split Train/Test chronologically: the earliest ~80% of distinct dates train,
# the remaining dates are the holdout.
dates = df.select("date").distinct().orderBy("date").collect()
if len(dates) < 2:
    # With 0 dates the index below raises IndexError; with 1 date the training
    # split would be empty. Fail here with an explicit message instead.
    raise ValueError(
        f"Need at least 2 distinct dates for a chronological split, got {len(dates)}."
    )
split_idx = int(len(dates) * 0.8)
split_date = dates[split_idx][0]

# Both splits are reused by every cross-validation fit and evaluation below;
# cache them so the window/lag lineage is not recomputed on each action.
train_df = df.filter(col("date") < split_date).cache()
test_df = df.filter(col("date") >= split_date).cache()

print(f"Train: {train_df.count()}, Test: {test_df.count()}")

In [None]:
# --- MODEL DEFINITIONS ---

# Evaluator shared by cross-validation and the holdout check: RMSE of the
# "prediction" column against the "qty" label.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="qty", predictionCol="prediction")

# NOTE(review): "product_id_idx" comes from a StringIndexer, so the tree-based
# learners below may treat it as a categorical feature. If the number of
# distinct products exceeds maxBins (default 32) fitting would fail - confirm
# against the data.

# 1. GBT Regressor - 2 x 2 grid over tree depth and boosting iterations.
gbt = GBTRegressor(labelCol="qty", featuresCol="features", seed=42)
grid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [3, 5])
    .addGrid(gbt.maxIter, [20, 50])
    .build()
)

# 2. Random Forest - 2 x 2 grid over forest size and tree depth.
rf = RandomForestRegressor(labelCol="qty", featuresCol="features", seed=42)
grid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

# 3. Linear Regression (Baseline) - 2 x 2 grid over regularisation strength
#    and elastic-net mixing.
lr = LinearRegression(labelCol="qty", featuresCol="features")
grid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build()
)

# (run label, unfitted estimator, param grid) for every candidate to train.
models_to_run = [
    ("GBT", gbt, grid_gbt),
    ("RandomForest", rf, grid_rf),
    ("LinearRegression", lr, grid_lr),
]

In [None]:
# --- TRAINING LOOP ---

def train_and_log(name, model, param_grid):
    """Tune one regressor with 3-fold cross-validation and log it to MLflow.

    Builds a pipeline from the module-level preprocessing stages plus
    ``model``, fits a ``CrossValidator`` over ``param_grid`` on the
    module-level ``train_df``, scores the best pipeline on ``test_df`` and
    records metrics, tuned hyperparameters and the fitted pipeline in a new
    MLflow run named ``Train_<name>``.

    Args:
        name: Label used for the run name and the ``model_type`` param.
        model: Unfitted estimator placed as the last pipeline stage.
        param_grid: Param maps to search, as built by ``ParamGridBuilder``.

    Returns:
        ``(name, rmse, best_model)`` where ``rmse`` is measured on ``test_df``
        and ``best_model`` is the best fitted pipeline found by the search.
    """
    with mlflow.start_run(run_name=f"Train_{name}"):
        print(f"Training {name}...")

        # Pipeline specific for this model; the preprocessing stage objects are
        # shared across calls and re-fitted inside every pipeline fit.
        pipeline = Pipeline(stages=[indexer_prod, indexer_cat, encoder_cat, assembler, model])

        # Cross Validator
        # NOTE(review): CrossValidator assigns rows to folds at random, while
        # the holdout split is chronological - confirm this is intended for
        # the lag-based features.
        cv = CrossValidator(estimator=pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=3, seed=42)

        # Fit
        cv_model = cv.fit(train_df)
        best_model = cv_model.bestModel

        # Metrics on Test Data (Holdout)
        predictions = best_model.transform(test_df)
        rmse = evaluator.evaluate(predictions)
        r2 = RegressionEvaluator(labelCol="qty", predictionCol="prediction", metricName="r2").evaluate(predictions)

        print(f"  -> RMSE: {rmse:.4f}, R2: {r2:.4f}")

        # Log MLflow
        mlflow.log_param("model_type", name)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)

        # Log the winning value of every hyperparameter that was tuned, read
        # from the last pipeline stage (the fitted model itself).
        # Going through getOrDefault(name) works for any estimator, so the
        # LinearRegression params are logged too. It also avoids
        # `model_stage.getNumTrees()`: on fitted tree ensembles PySpark exposes
        # `getNumTrees` as a property, so calling it raises TypeError.
        model_stage = best_model.stages[-1]
        tuned_param_names = sorted(
            {param.name for param_map in param_grid for param in param_map}
        )
        for param_name in tuned_param_names:
            # Skip grid entries that target a different pipeline stage.
            if model_stage.hasParam(param_name):
                mlflow.log_param(param_name, model_stage.getOrDefault(param_name))

        # Save Best Model Artifact
        mlflow.spark.log_model(best_model, "model")

        return name, rmse, best_model

# Train every candidate; each entry is the (name, rmse, best_model) tuple
# returned by train_and_log.
results = []
for candidate in models_to_run:
    results.append(train_and_log(*candidate))

# Find Winner: order by holdout RMSE ascending, so the first entry is the best.
results.sort(key=lambda entry: entry[1])
winner_name, winner_rmse, winner_model = results[0]

print(f"\nWINNER MODEL: {winner_name} with RMSE: {winner_rmse:.4f}")