In [0]:
# Disabled experiment: everything in this cell is wrapped in one triple-quoted
# string literal, so none of it executes. The text is an earlier version of the
# GBT tuning that used CrossValidator with a ParamGridBuilder grid; it is kept
# for reference only.
'''from pyspark.ml import PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col, when
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator


# Carregar modelos e dados
slicer_model = PipelineModel.load("/FileStore/models/slicer_top10")
train_ready = spark.read.format("delta").load("/FileStore/data/train_ready")
val_ready = spark.read.format("delta").load("/FileStore/data/val_ready")

# Aplicar slicer
train_topk = slicer_model.transform(train_ready)
val_topk = slicer_model.transform(val_ready)

# Balanceamento leve
minority_df = train_topk.filter(col("label") == 1)
majority_df = train_topk.filter(col("label") != 1)
train_balanced = majority_df.sample(False, 0.01, seed=42).union(minority_df)

# Criar coluna de pesos (classe 1 com peso maior)
train_balanced = train_balanced.withColumn(
    "classWeightCol", when(col("label") == 1, 5.0).otherwise(1.0)
)

# GBT com weightCol
gbt = GBTClassifier(
    labelCol="label",
    featuresCol="features_topK",
    weightCol="classWeightCol",
    maxIter=20,
    maxDepth=5,
    seed=42
)

# Criar o grid de hiperparâmetros
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxIter, [10, 20, 30])  # Número de árvores
             .addGrid(gbt.maxDepth, [3, 5, 7])    # Profundidade máxima
             .addGrid(gbt.stepSize, [0.01, 0.1])  # Taxa de aprendizado
             .addGrid(gbt.subsamplingRate, [0.8, 1.0])  # Taxa de subamostragem
             .build())

# Definir o avaliador
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

# Configurar validação cruzada
crossval = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,  # Número de dobras para validação cruzada
    seed=42
)

# Treinar modelo
cv_model = crossval.fit(train_balanced)

# Obter o melhor modelo
best_model = cv_model.bestModel

# Exibir os melhores hiperparâmetros
print("\n✅ Melhores hiperparâmetros encontrados:")
print(f"maxIter: {best_model._java_obj.getMaxIter()}")
print(f"maxDepth: {best_model._java_obj.getMaxDepth()}")
print(f"stepSize: {best_model._java_obj.getStepSize()}")
print(f"subsamplingRate: {best_model._java_obj.getSubsamplingRate()}")

# Inferência
val_preds = best_model.transform(val_topk)

# Avaliação
#f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1").evaluate(val_preds)
#precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision").evaluate(val_preds)
#recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall").evaluate(val_preds)

# Avaliação do melhor modelo
f1 = evaluator.evaluate(val_preds)
precision = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision"
).evaluate(val_preds)
recall = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall"
).evaluate(val_preds)

print(f"\n✅ GBT com Pesos:")
print(f"F1-score:  {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")

# Matriz de confusão
preds_rdd = val_preds.select("prediction", "label").rdd.map(lambda r: (float(r[0]), float(r[1])))
metrics = MulticlassMetrics(preds_rdd)
print("\nConfusion Matrix:")
print(metrics.confusionMatrix().toArray())

# Guardar modelo
best_model.write().overwrite().save("/FileStore/models/gbt_top10_weighted_model")'''

In [0]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col, when
import itertools
import pandas as pd
from pyspark.sql import SparkSession

# Reuse the active SparkSession (or create one); the explicit initialisation
# was added by the original author to avoid a FutureWarning.
spark = SparkSession.builder.getOrCreate()

# Load the fitted slicer pipeline and the prepared train / validation sets.
slicer_model = PipelineModel.load("/FileStore/models/slicer_top10")
train_ready = spark.read.format("delta").load("/FileStore/data/train_ready")
val_ready = spark.read.format("delta").load("/FileStore/data/val_ready")

# Apply the slicer to both splits.
train_topk = slicer_model.transform(train_ready)
val_topk = slicer_model.transform(val_ready)

# Tunable balancing knobs.
MAJORITY_SAMPLE_FRACTION = 0.2  # share of label != 1 rows kept for training
POSITIVE_CLASS_WEIGHT = 2.0     # weight given to label == 1 rows (others get 1.0)

# Light rebalancing: keep every label-1 row plus a sample of the remaining rows.
# The split is taken from the sliced frame (train_topk) rather than train_ready:
# the classifier trained below reads "features_topK" and is validated on
# val_topk, so the training rows must go through the same slicer transform
# (presumably the step that adds "features_topK" - TODO confirm).
minority_df = train_topk.filter(col("label") == 1)
majority_df = train_topk.filter(col("label") != 1)

# NOTE(review): the class counts recorded under this cell (label 0.0: 168283,
# label 1.0: 501811) suggest label 1 is already the larger class after
# sampling, so up-weighting it may widen the skew instead of reducing it.
# The recorded output may be stale - confirm the intended fraction and weight.
train_balanced = (
    majority_df.sample(False, MAJORITY_SAMPLE_FRACTION, seed=42)
    .union(minority_df)
    .withColumn(
        "classWeightCol",
        when(col("label") == 1, POSITIVE_CLASS_WEIGHT).otherwise(1.0),
    )
)

# Cache the frames that are reused by every model fit / evaluation.
train_balanced.cache()
val_topk.cache()

# Debug aid: show the per-class row counts of the training frame.
print("Contagem de classes em train_balanced:")
train_balanced.groupBy("label").count().show()

# Hyperparameter search space. Every key currently lists a single candidate
# value, so the Cartesian product below yields exactly one combination.
param_grid = dict(
    maxIter=[10],
    maxDepth=[10],
    stepSize=[0.5],
    subsamplingRate=[0.5],
)

# Grid keys (a live view on param_grid) and all combinations of candidate
# values, one tuple per combination, ordered like the keys.
keys = param_grid.keys()
combinations = [combo for combo in itertools.product(*param_grid.values())]

# Accumulator for the per-model result records.
results = list()

# Evaluator for the "f1" metric, comparing the "prediction" column with "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="f1", labelCol="label", predictionCol="prediction"
)

# Pick the DBFS directory that will receive the tuning CSVs.
#
# dbutils.fs resolves scheme-less paths against the DBFS root, so the previous
# "/dbfs/FileStore/..." value created dbfs:/dbfs/FileStore/... - the "/dbfs"
# prefix only belongs in local-file APIs (open(), pandas, ...). Explicit
# dbfs:/ URIs keep the results next to the /FileStore model and data locations
# this notebook already uses. Candidates are tried in order; a failure on the
# last one is allowed to propagate.
candidate_paths = [
    "dbfs:/FileStore/tuning_results",
    "dbfs:/tmp/tuning_results",  # fallback
]

output_path = None
for attempt, candidate in enumerate(candidate_paths):
    try:
        dbutils.fs.mkdirs(candidate)
    except Exception as e:
        if attempt == len(candidate_paths) - 1:
            raise
        print(f"Erro ao criar {candidate}: {str(e)}")
        continue

    output_path = candidate
    if attempt == 0:
        print(f"Diretório {output_path} criado ou já existe.")
    else:
        print(f"Usando caminho alternativo: {output_path}")
    break

# Manual grid search: fit one GBT per hyperparameter combination, score it on
# the validation split, and checkpoint the accumulated results after each fit.

# These evaluators do not depend on the hyperparameters, so they are built once
# instead of on every iteration.
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision"
)
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall"
)

for i, combo in enumerate(combinations):
    # Pair each grid key with this combination's value.
    params = dict(zip(keys, combo))

    print(f"\nTreinando modelo {i+1}/{len(combinations)} com parâmetros: {params}")

    # Forward the whole combination as keyword arguments so every key present
    # in param_grid reaches the estimator (keys must be GBTClassifier params).
    gbt = GBTClassifier(
        labelCol="label",
        featuresCol="features_topK",
        weightCol="classWeightCol",
        seed=42,
        **params
    )

    # Train the model.
    model = gbt.fit(train_balanced)

    # The predictions feed five separate Spark actions below; cache them so the
    # model is not re-applied to the validation set for each one.
    val_preds = model.transform(val_topk).cache()
    try:
        # Debug aid: peek at a few predictions.
        print(f"Amostra de previsões para modelo {i+1}:")
        val_preds.select("prediction", "label").show(5, truncate=False)

        # Validation metrics.
        f1 = evaluator.evaluate(val_preds)
        precision = precision_evaluator.evaluate(val_preds)
        recall = recall_evaluator.evaluate(val_preds)

        # Confusion matrix, converted to nested lists so it fits in a CSV cell.
        preds_rdd = val_preds.select("prediction", "label").rdd.map(
            lambda r: (float(r[0]), float(r[1]))
        )
        metrics = MulticlassMetrics(preds_rdd)
        confusion_matrix = metrics.confusionMatrix().toArray().tolist()
    finally:
        # Release the cached predictions before the next fit.
        val_preds.unpersist()

    # Record hyperparameters first, then metrics (this is the CSV column order).
    result = {
        **params,
        "f1": f1,
        "precision": precision,
        "recall": recall,
        "confusion_matrix": confusion_matrix
    }
    results.append(result)

    print(f"F1-score: {f1:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
    print(f"Confusion Matrix:\n{confusion_matrix}")

    # Checkpoint: write everything collected so far to the driver's local disk,
    # then copy that file to output_path.
    partial_path_local = f"/tmp/tuning_results_partial_{i}.csv"
    results_df = pd.DataFrame(results)
    results_df.to_csv(partial_path_local, index=False)
    dbutils.fs.cp(f"file:{partial_path_local}", f"{output_path}/tuning_results_partial_{i}.csv")
    print(f"Resultados parciais salvos em {output_path}/tuning_results_partial_{i}.csv")

# Persist the complete results table: write the CSV on the driver's local
# disk first, then copy it to output_path.
final_path_local = "/tmp/tuning_results_final.csv"
results_df = pd.DataFrame(results)
results_df.to_csv(final_path_local, index=False)
dbutils.fs.cp(
    f"file:{final_path_local}",
    f"{output_path}/tuning_results_final.csv",
)
print(f"\nResultados finais salvos em {output_path}/tuning_results_final.csv")



Contagem de classes em train_balanced:
+-----+------+
|label| count|
+-----+------+
|  0.0|168283|
|  1.0|501811|
+-----+------+

Diretório /dbfs/FileStore/tuning_results criado ou já existe.

Treinando modelo 1/1 com parâmetros: {'maxIter': 25, 'maxDepth': 10, 'stepSize': 0.5, 'subsamplingRate': 0.7}
