In [None]:
import math
import os
from types import SimpleNamespace

import joblib
import numpy as np
import optuna
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import yaml
from pyspark.ml import PipelineModel
from pyspark.ml.classification import (
    LogisticRegression, DecisionTreeClassifier, RandomForestClassifier,
    GBTClassifier, LinearSVC, NaiveBayes, MultilayerPerceptronClassifier,
    FMClassifier, OneVsRest)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from sklearn.calibration import calibration_curve
from sklearn.metrics import roc_curve, auc, precision_recall_curve
from synapse.ml.lightgbm import LightGBMClassifier

In [None]:
# Compute evaluation metrics for one estimator
def evaluate_model(model, train_df, test_df, variavel_resposta):
    """Fit ``model`` on ``train_df``, predict ``test_df`` and compute evaluation metrics.

    Args:
        model: Spark ML estimator (anything with ``fit(df)`` returning a transformer).
        train_df: training DataFrame with a ``features`` column and the label column.
        test_df: DataFrame the fitted model is scored on.
        variavel_resposta: name of the label column.

    Returns:
        tuple(dict, DataFrame): ``(metrics, predictions)``.
        ``metrics`` holds the following keys:
          - "accuracy", "weightedPrecision", "weightedRecall" and "f1", computed with
            ``MulticlassClassificationEvaluator``.
          - "AUC" (areaUnderROC on ``rawPrediction``) and "KS" (via ``calculate_ks``),
            only when the test labels contain exactly two distinct values.
          - "ConfusionMatrix": collected ``(label, prediction, count)`` rows, ordered by
            label, then prediction.
    """
    predictions = model.fit(train_df).transform(test_df)

    multiclass_evaluator = MulticlassClassificationEvaluator(labelCol=variavel_resposta, predictionCol="prediction")
    metrics = {
        metric_name: multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: metric_name})
        for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
    }

    # Binary-only metrics are computed only when exactly two label values appear.
    is_binary = predictions.select(variavel_resposta).distinct().count() == 2
    if is_binary:
        roc_evaluator = BinaryClassificationEvaluator(labelCol=variavel_resposta, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        metrics["AUC"] = roc_evaluator.evaluate(predictions)
        metrics["KS"] = calculate_ks(predictions, variavel_resposta)

    metrics["ConfusionMatrix"] = (
        predictions
        .groupBy(variavel_resposta, "prediction")
        .count()
        .orderBy(variavel_resposta, "prediction")
        .collect()
    )
    return metrics, predictions

# Compute the Kolmogorov-Smirnov (KS) statistic
def calculate_ks(predictions, variavel_resposta):
    """Return the KS statistic of a binary classifier's predictions.

    Rows are ranked by descending positive-class score. The score is element 1 of the
    ``probability`` vector, or of ``rawPrediction`` for models that emit no
    probabilities (e.g. LinearSVC). KS only depends on the ranking, so raw margins are
    valid scores. KS is ``max(TPR - FPR)`` over all score thresholds. TPR and FPR are the
    cumulative fractions of positives and negatives ranked at or above the threshold, so
    the result lies in [-1, 1].

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column; labels are assumed to be 0/1.

    Returns:
        float: the KS statistic, or ``nan`` when only one class is present (KS is
        undefined then).
    """
    score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
    # Rank by the positive-class entry explicitly; ordering by the vector column itself
    # does not rank rows by positive-class score.
    scored = predictions.select(
        vector_to_array(F.col(score_col))[1].alias("_score"),
        F.col(variavel_resposta).cast("double").alias("_label"),
    )

    totals = scored.agg(F.sum("_label").alias("n_pos"), F.count(F.lit(1)).alias("n_rows")).first()
    n_pos = float(totals["n_pos"] or 0.0)
    n_neg = float(totals["n_rows"]) - n_pos
    if n_pos == 0 or n_neg == 0:
        return float("nan")

    # The default frame of an ordered window is RANGE UNBOUNDED PRECEDING..CURRENT ROW.
    # Tied scores therefore share one cumulative value, i.e. one threshold.
    ranking = Window.orderBy(F.desc("_score"))
    ks = (
        scored
        .withColumn("tpr", F.sum("_label").over(ranking) / F.lit(n_pos))
        .withColumn("fpr", F.sum(1.0 - F.col("_label")).over(ranking) / F.lit(n_neg))
        .agg(F.max(F.col("tpr") - F.col("fpr")))
        .first()[0]
    )
    return float(ks)

# Persist the model and its hyperparameters
def save_model_and_params(model, params, model_name, save_dir):
    """Save ``model`` and ``params`` under ``save_dir``, overwriting previous artifacts.

    Writes ``<save_dir>/<model_name>_model`` (Spark ML format) and
    ``<save_dir>/<model_name>_params.yaml``.

    Args:
        model: Spark ML model/estimator. Objects exposing ``write()`` (MLWritable) are
            saved with overwrite enabled; otherwise ``model.save(path)`` is used.
        params: mapping of hyperparameters, dumped as YAML.
        model_name: prefix for both artifact names.
        save_dir: target directory; created if it does not exist.

    NOTE(review): the YAML goes through the local filesystem (``open``) while the model
    goes through Spark's filesystem layer. A ``/dbfs/...`` path may resolve to different
    locations in the two layers on Databricks — confirm both artifacts land together.
    """
    os.makedirs(save_dir, exist_ok=True)

    model_path = os.path.join(save_dir, f"{model_name}_model")
    if hasattr(model, "write"):
        # Plain ``save`` raises if the path already exists; overwrite keeps re-runs working.
        model.write().overwrite().save(model_path)
    else:
        model.save(model_path)

    params_path = os.path.join(save_dir, f"{model_name}_params.yaml")
    with open(params_path, "w") as file:
        yaml.dump(params, file)
    print(f"Modelo e hiperparâmetros salvos em: {save_dir}")

# Plot the confusion matrix
def plot_confusion_matrix(confusion_matrix):
    """Render a confusion-matrix heatmap with plotly.

    Args:
        confusion_matrix: list of 3-item rows ``(actual, predicted, count)``, e.g. the
            collected output of ``groupBy(label, "prediction").count()``. Every value seen
            as actual or predicted becomes an axis label, sorted ascending.
    """
    observed = [entry[0] for entry in confusion_matrix]
    predicted = [entry[1] for entry in confusion_matrix]
    axis_labels = sorted(set(observed + predicted))
    slot = {value: position for position, value in enumerate(axis_labels)}

    counts = np.zeros((len(axis_labels), len(axis_labels)))
    for entry in confusion_matrix:
        counts[slot[entry[0]], slot[entry[1]]] = entry[2]

    heatmap = px.imshow(
        counts,
        labels=dict(x="Predicted", y="Actual", color="Count"),
        x=axis_labels,
        y=axis_labels,
        text_auto=True,
        color_continuous_scale="Blues",
    )
    heatmap.update_layout(title="Confusion Matrix", xaxis_title="Predicted", yaxis_title="Actual")
    heatmap.show()

# Plot the ROC curve
def plot_roc_curve(predictions, variavel_resposta):
    """Plot the ROC curve of a binary classifier's predictions.

    The positive-class score is element 1 of ``probability``. For models that emit no
    probabilities (e.g. LinearSVC), element 1 of ``rawPrediction`` is used instead. ROC
    depends only on the score ranking, so the curve and its AUC remain valid.

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column (binary labels expected by ``roc_curve``).
    """
    score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
    pdf = predictions.select(score_col, variavel_resposta).toPandas()
    scores = np.array([vec[1] for vec in pdf[score_col]])
    true_labels = pdf[variavel_resposta]

    fpr, tpr, _ = roc_curve(true_labels, scores)
    roc_auc = auc(fpr, tpr)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=fpr, y=tpr, mode="lines", name=f"ROC curve (area = {roc_auc:.2f})", line=dict(color="darkorange", width=2)))
    fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode="lines", name="Random (area = 0.5)", line=dict(color="navy", width=2, dash="dash")))
    fig.update_layout(title="Receiver Operating Characteristic (ROC)", xaxis_title="False Positive Rate", yaxis_title="True Positive Rate")
    fig.show()

# Plot the Precision-Recall curve
def plot_pr_curve(predictions, variavel_resposta):
    """Plot the precision-recall curve of a binary classifier's predictions.

    The positive-class score is element 1 of ``probability``, falling back to element 1
    of ``rawPrediction`` for models without probabilities (e.g. LinearSVC). The PR curve
    depends only on the score ranking. The legend area is the trapezoidal ``auc`` over
    (recall, precision).

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column (binary labels expected).
    """
    score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
    pdf = predictions.select(score_col, variavel_resposta).toPandas()
    scores = np.array([vec[1] for vec in pdf[score_col]])
    true_labels = pdf[variavel_resposta]

    precision, recall, _ = precision_recall_curve(true_labels, scores)
    pr_auc = auc(recall, precision)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=recall, y=precision, mode="lines", name=f"PR curve (area = {pr_auc:.2f})", line=dict(color="blue", width=2)))
    fig.update_layout(title="Precision-Recall Curve", xaxis_title="Recall", yaxis_title="Precision")
    fig.show()

# Plot the distribution of predicted probabilities per class
def plot_probability_distribution(predictions, variavel_resposta):
    """Overlay histograms of the positive-class probability for label 1 vs label 0.

    Requires a ``probability`` column. For models that do not emit one (e.g. LinearSVC),
    a message is printed and nothing is plotted, instead of raising on the missing column.

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column; labels are assumed to be 0/1.
    """
    if "probability" not in predictions.columns:
        print("Este modelo não gera probabilidades; distribuição de probabilidades não plotada.")
        return

    pdf = predictions.select("probability", variavel_resposta).toPandas()
    prob = np.array([vec[1] for vec in pdf["probability"]])
    true_labels = pdf[variavel_resposta].to_numpy()

    fig = go.Figure()
    fig.add_trace(go.Histogram(x=prob[true_labels == 1], name="Classe Positiva", opacity=0.75, marker_color="green"))
    fig.add_trace(go.Histogram(x=prob[true_labels == 0], name="Classe Negativa", opacity=0.75, marker_color="red"))
    fig.update_layout(title="Distribuição de Probabilidades", xaxis_title="Probabilidade da Classe Positiva", yaxis_title="Frequência", barmode="overlay")
    fig.show()

# Plot feature importances
def plot_feature_importance(model, feature_names):
    """Bar-plot ``model.featureImportances`` in descending order.

    Args:
        model: fitted model; only objects with a ``featureImportances`` attribute (a Spark
            vector exposing ``toArray()``) are plotted.
        feature_names: names aligned with the importance vector's entries.

    Models without ``featureImportances`` get a printed message instead of a plot.
    """
    if not hasattr(model, "featureImportances"):
        print("Este modelo não suporta importância de features.")
        return

    ranked = pd.DataFrame(
        {"Feature": feature_names, "Importance": model.featureImportances.toArray()}
    ).sort_values(by="Importance", ascending=False)
    px.bar(ranked, x="Feature", y="Importance", title="Importância das Features").show()

# Plot the calibration (reliability) curve
def plot_calibration_curve(predictions, variavel_resposta, n_bins=10):
    """Plot observed positive fraction vs mean predicted probability, per bin.

    Calibration only makes sense for probabilities. For models without a ``probability``
    column (e.g. LinearSVC, whose raw margins are not in [0, 1]), a message is printed
    and nothing is plotted.

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column (binary labels expected).
        n_bins: number of bins passed to ``calibration_curve`` (default 10, as before).
    """
    if "probability" not in predictions.columns:
        print("Este modelo não gera probabilidades; curva de calibração não plotada.")
        return

    pdf = predictions.select("probability", variavel_resposta).toPandas()
    prob = np.array([vec[1] for vec in pdf["probability"]])
    true_labels = pdf[variavel_resposta]

    fraction_of_positives, mean_predicted_value = calibration_curve(true_labels, prob, n_bins=n_bins)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=mean_predicted_value, y=fraction_of_positives, mode="lines+markers", name="Curva de Calibração", line=dict(color="blue", width=2)))
    fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode="lines", name="Ideal", line=dict(color="red", width=2, dash="dash")))
    fig.update_layout(title="Curva de Calibração", xaxis_title="Probabilidade Média Prevista", yaxis_title="Fração de Positivos Reais")
    fig.show()

# Plot the KS curve
def plot_ks_curve(predictions, variavel_resposta):
    """Plot cumulative TPR, FPR and their gap (KS) along the score ranking.

    Rows are ranked by descending positive-class score: element 1 of ``probability``, or
    of ``rawPrediction`` for models without probabilities. TPR/FPR are cumulative
    *fractions* of positives/negatives, so both curves end at 1. The KS statistic is the
    maximum of the gap curve and is shown in its legend.

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column; labels are assumed to be 0/1.
    """
    score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
    pdf = predictions.select(score_col, variavel_resposta).toPandas()
    scores = np.array([vec[1] for vec in pdf[score_col]], dtype=float)
    labels = pdf[variavel_resposta].to_numpy(dtype=float)

    # Rank by the positive-class entry explicitly; ordering by the vector column itself
    # does not rank rows by positive-class score.
    labels_ranked = labels[np.argsort(-scores, kind="stable")]
    n_pos = labels_ranked.sum()
    n_neg = labels_ranked.size - n_pos
    if n_pos == 0 or n_neg == 0:
        print("Curva KS requer exemplos das duas classes.")
        return

    tpr = np.cumsum(labels_ranked) / n_pos
    fpr = np.cumsum(1.0 - labels_ranked) / n_neg
    ks = tpr - fpr
    rank = np.arange(1, labels_ranked.size + 1)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=rank, y=tpr, mode="lines", name="TPR (True Positive Rate)", line=dict(color="blue", width=2)))
    fig.add_trace(go.Scatter(x=rank, y=fpr, mode="lines", name="FPR (False Positive Rate)", line=dict(color="red", width=2)))
    fig.add_trace(go.Scatter(x=rank, y=ks, mode="lines", name=f"KS (max = {ks.max():.3f})", line=dict(color="green", width=2)))
    fig.update_layout(title="Curva KS", xaxis_title="Ranking das Probabilidades", yaxis_title="Taxa")
    fig.show()

# Plot the lift curve
def plot_lift_curve(predictions, variavel_resposta):
    """Plot lift versus the fraction of the population selected by descending score.

    The positive-class score is element 1 of ``probability``, or of ``rawPrediction``
    for models without probabilities. Lift at depth k is the positive rate in the top k
    rows divided by the overall positive rate:
    ``(positives in top k / k) / (total positives / total rows)``. A random ranking stays
    at 1 (dashed baseline). Previously the curve showed raw cumulative positive counts
    under a vector-based ordering.

    Args:
        predictions: DataFrame produced by a fitted Spark ML classifier.
        variavel_resposta: label column; labels are assumed to be 0/1.
    """
    score_col = "probability" if "probability" in predictions.columns else "rawPrediction"
    pdf = predictions.select(score_col, variavel_resposta).toPandas()
    scores = np.array([vec[1] for vec in pdf[score_col]], dtype=float)
    labels = pdf[variavel_resposta].to_numpy(dtype=float)

    labels_ranked = labels[np.argsort(-scores, kind="stable")]
    n_rows = labels_ranked.size
    n_pos = labels_ranked.sum()
    if n_rows == 0 or n_pos == 0:
        print("Curva de Lift requer ao menos um exemplo positivo.")
        return

    depth = np.arange(1, n_rows + 1)
    lift = (np.cumsum(labels_ranked) / depth) / (n_pos / n_rows)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=depth / n_rows, y=lift, mode="lines", name="Lift Curve", line=dict(color="blue", width=2)))
    fig.add_trace(go.Scatter(x=[0, 1], y=[1, 1], mode="lines", name="Aleatório", line=dict(color="red", width=2, dash="dash")))
    fig.update_layout(title="Curva de Lift", xaxis_title="Fração da População", yaxis_title="Lift")
    fig.show()

# Hyperparameter optimization with Optuna
def optimize_model(model_name, train_df, test_df, variavel_resposta, seed, n_trials=10):
    """Tune ``model_name`` with Optuna's TPE sampler, maximizing accuracy.

    Each trial builds the estimator with sampled hyperparameters, fits it on
    ``train_df`` and scores accuracy on ``test_df``. Previously the fit/evaluate step was
    nested inside the OneVsRest branch, so every other model's trial returned None and
    the study failed.

    NOTE(review): ``main`` reports final metrics on this same ``test_df``, so those
    metrics are optimistically biased; a separate validation split would avoid that.

    Args:
        model_name: one of the supported estimator names listed in ``supported`` below.
        train_df: DataFrame with a ``features`` vector column and the label column.
        test_df: DataFrame used to score each trial.
        variavel_resposta: label column name.
        seed: seed for the TPE sampler and for estimators that accept one.
        n_trials: number of Optuna trials (default 10, as before).

    Returns:
        dict: ``study.best_params``. For MultilayerPerceptronClassifier it contains
        ``hiddenLayerSize`` (hidden-layer width), which is not a Spark param and must be
        translated into ``layers`` by the caller.

    Raises:
        ValueError: if ``model_name`` is not supported.
    """
    supported = (
        "LogisticRegression", "DecisionTreeClassifier", "RandomForestClassifier",
        "GBTClassifier", "LinearSVC", "NaiveBayes", "MultilayerPerceptronClassifier",
        "FMClassifier", "LightGBMClassifier", "OneVsRest",
    )
    if model_name not in supported:
        raise ValueError(f"Modelo não suportado para otimização: {model_name}")

    common = {"featuresCol": "features", "labelCol": variavel_resposta}

    num_features = num_classes = None
    if model_name == "MultilayerPerceptronClassifier":
        # The input layer must match the real feature-vector width and the output layer
        # the number of classes (labels assumed 0..k-1); this replaces the hard-coded [3, h, 2].
        num_features = train_df.select("features").first()[0].size
        num_classes = int(train_df.agg(F.max(variavel_resposta)).first()[0]) + 1

    def build_model(trial):
        """Instantiate the estimator with hyperparameters sampled from ``trial``."""
        if model_name == "LogisticRegression":
            return LogisticRegression(
                **common,
                regParam=trial.suggest_float("regParam", 0.01, 10.0, log=True),
                elasticNetParam=trial.suggest_float("elasticNetParam", 0.0, 1.0),
            )
        if model_name == "DecisionTreeClassifier":
            return DecisionTreeClassifier(
                **common,
                seed=seed,
                maxDepth=trial.suggest_int("maxDepth", 2, 10),
                minInstancesPerNode=trial.suggest_int("minInstancesPerNode", 1, 10),
            )
        if model_name == "RandomForestClassifier":
            return RandomForestClassifier(
                **common,
                seed=seed,
                numTrees=trial.suggest_int("numTrees", 10, 100),
                maxDepth=trial.suggest_int("maxDepth", 2, 10),
            )
        if model_name == "GBTClassifier":
            return GBTClassifier(
                **common,
                seed=seed,
                maxIter=trial.suggest_int("maxIter", 10, 100),
                maxDepth=trial.suggest_int("maxDepth", 2, 10),
            )
        if model_name == "LinearSVC":
            return LinearSVC(
                **common,
                regParam=trial.suggest_float("regParam", 0.01, 10.0, log=True),
                maxIter=trial.suggest_int("maxIter", 10, 100),
            )
        if model_name == "NaiveBayes":
            return NaiveBayes(
                **common,
                smoothing=trial.suggest_float("smoothing", 0.0, 10.0),
            )
        if model_name == "MultilayerPerceptronClassifier":
            return MultilayerPerceptronClassifier(
                **common,
                seed=seed,
                layers=[num_features, trial.suggest_int("hiddenLayerSize", 2, 10), num_classes],
                maxIter=trial.suggest_int("maxIter", 10, 100),
            )
        if model_name == "FMClassifier":
            return FMClassifier(
                **common,
                seed=seed,
                factorSize=trial.suggest_int("factorSize", 2, 10),
                regParam=trial.suggest_float("regParam", 0.01, 10.0, log=True),
            )
        if model_name == "LightGBMClassifier":
            return LightGBMClassifier(
                **common,
                seed=seed,
                numLeaves=trial.suggest_int("numLeaves", 10, 100),
                maxDepth=trial.suggest_int("maxDepth", 2, 10),
                learningRate=trial.suggest_float("learningRate", 0.01, 0.3, log=True),
            )
        # OneVsRest (the only remaining supported name).
        base_model = LogisticRegression(
            **common,
            regParam=trial.suggest_float("regParam", 0.01, 10.0, log=True),
            elasticNetParam=trial.suggest_float("elasticNetParam", 0.0, 1.0),
        )
        # OneVsRest reads label/feature columns from its own params (default "label"),
        # not from the wrapped classifier, so they must be set here as well.
        return OneVsRest(classifier=base_model, **common)

    evaluator = MulticlassClassificationEvaluator(labelCol=variavel_resposta, predictionCol="prediction", metricName="accuracy")

    def objective(trial):
        """Fit one sampled configuration and return its accuracy on ``test_df``."""
        predictions = build_model(trial).fit(train_df).transform(test_df)
        return evaluator.evaluate(predictions)

    study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=seed))
    study.optimize(objective, n_trials=n_trials)
    return study.best_params

# Main entry point
def _candidate_models(variavel_resposta, seed, num_features, num_classes):
    """Return the ``(name, untuned estimator)`` pairs benchmarked by ``main``."""
    common = {"featuresCol": "features", "labelCol": variavel_resposta}
    return [
        ("LogisticRegression", LogisticRegression(**common)),
        ("DecisionTreeClassifier", DecisionTreeClassifier(**common, seed=seed)),
        ("RandomForestClassifier", RandomForestClassifier(**common, seed=seed)),
        ("GBTClassifier", GBTClassifier(**common, seed=seed)),
        ("LinearSVC", LinearSVC(**common)),
        ("NaiveBayes", NaiveBayes(**common)),
        ("MultilayerPerceptronClassifier", MultilayerPerceptronClassifier(
            **common, layers=[num_features, 5, num_classes], seed=seed)),
        ("FMClassifier", FMClassifier(**common, seed=seed)),
        ("LightGBMClassifier", LightGBMClassifier(**common, predictionCol="prediction", seed=seed)),
        # OneVsRest needs its own labelCol/featuresCol; it ignores the wrapped classifier's.
        ("OneVsRest", OneVsRest(classifier=LogisticRegression(**common), **common)),
    ]


def _build_tuned_model(model_name, best_params, variavel_resposta, seed, num_features, num_classes):
    """Instantiate ``model_name`` with the hyperparameters returned by ``optimize_model``."""
    common = {"featuresCol": "features", "labelCol": variavel_resposta}
    params = dict(best_params)  # copy: the caller keeps the untouched dict for saving
    if model_name == "LogisticRegression":
        return LogisticRegression(**common, **params)
    if model_name == "DecisionTreeClassifier":
        return DecisionTreeClassifier(**common, seed=seed, **params)
    if model_name == "RandomForestClassifier":
        return RandomForestClassifier(**common, seed=seed, **params)
    if model_name == "GBTClassifier":
        return GBTClassifier(**common, seed=seed, **params)
    if model_name == "LinearSVC":
        return LinearSVC(**common, **params)
    if model_name == "NaiveBayes":
        return NaiveBayes(**common, **params)
    if model_name == "MultilayerPerceptronClassifier":
        # ``hiddenLayerSize`` is an Optuna search variable, not a Spark param; passing it
        # through ``**params`` raised TypeError. Translate it into the ``layers`` spec.
        hidden = params.pop("hiddenLayerSize", 5)
        return MultilayerPerceptronClassifier(
            **common, layers=[num_features, hidden, num_classes], seed=seed, **params)
    if model_name == "FMClassifier":
        return FMClassifier(**common, seed=seed, **params)
    if model_name == "LightGBMClassifier":
        return LightGBMClassifier(**common, predictionCol="prediction", seed=seed, **params)
    if model_name == "OneVsRest":
        return OneVsRest(classifier=LogisticRegression(**common, **params), **common)
    raise ValueError(f"Modelo desconhecido: {model_name}")


def main(df, features_cols, variavel_resposta, metricas_disponiveis, metrica_vencedora, seed, save_dir):
    """Benchmark Spark ML classifiers, tune the best one with Optuna, save it and plot diagnostics.

    Steps:
      1. Assemble ``features_cols`` into a ``features`` vector and split 80/20 with ``seed``.
      2. Evaluate every candidate with ``evaluate_model``. A candidate that fails (e.g. a
         binary-only learner on multiclass labels) is reported and skipped.
      3. Pick the winner by ``metrica_vencedora`` and tune it with ``optimize_model``.
      4. Refit with the best hyperparameters, evaluate, save the *fitted* model and plot
         diagnostics for it.

    Args:
        df: input Spark DataFrame containing ``features_cols`` and the label column.
        features_cols: feature column names; each is assumed to be a scalar numeric column.
        variavel_resposta: label column; labels are assumed to be 0..k-1.
        metricas_disponiveis: metrics the caller allows as selection criteria.
        metrica_vencedora: metric used to pick the winner (must be in ``metricas_disponiveis``).
        seed: seed for the split, the seeded estimators and Optuna.
        save_dir: directory where the winning model and its hyperparameters are saved.

    Returns:
        pandas.DataFrame: one row per successfully evaluated model plus the optimized winner.

    Raises:
        ValueError: if ``metrica_vencedora`` is not in ``metricas_disponiveis``, or no model
            produced a usable value for it.
    """
    if metrica_vencedora not in metricas_disponiveis:
        raise ValueError(
            f"Métrica vencedora '{metrica_vencedora}' não está em metricas_disponiveis: {metricas_disponiveis}")

    assembler = VectorAssembler(inputCols=features_cols, outputCol="features")
    assembled_df = assembler.transform(df)
    train_df, test_df = assembled_df.randomSplit([0.8, 0.2], seed=seed)

    num_features = len(features_cols)
    num_classes = int(assembled_df.agg(F.max(variavel_resposta)).first()[0]) + 1

    # Evaluate every candidate model.
    results = []
    for model_name, estimator in _candidate_models(variavel_resposta, seed, num_features, num_classes):
        print(f"Avaliando {model_name}...")
        try:
            metrics, _ = evaluate_model(estimator, train_df, test_df, variavel_resposta)
        except Exception as exc:  # report and skip models that cannot handle this data
            print(f"Falha ao avaliar {model_name}: {exc}")
            continue
        results.append((model_name, metrics))
        print(f"{model_name} - Métricas: {metrics}")

    # Only models that produced a real (non-missing, non-NaN) value are comparable; e.g.
    # "AUC"/"KS" exist only for binary labels.
    comparable = [
        (name, metrics) for name, metrics in results
        if metrics.get(metrica_vencedora) is not None and not math.isnan(metrics[metrica_vencedora])
    ]
    if not comparable:
        raise ValueError(f"Nenhum modelo produziu a métrica '{metrica_vencedora}'.")
    winning_model_name, winning_metrics = max(comparable, key=lambda item: item[1][metrica_vencedora])
    print(f"Modelo vencedor: {winning_model_name} com {metrica_vencedora}: {winning_metrics[metrica_vencedora]}")

    print(f"Otimizando {winning_model_name} com Optuna...")
    best_params = optimize_model(winning_model_name, train_df, test_df, variavel_resposta, seed)
    print(f"Melhores hiperparâmetros para {winning_model_name}: {best_params}")

    tuned_estimator = _build_tuned_model(
        winning_model_name, best_params, variavel_resposta, seed, num_features, num_classes)

    print(f"Avaliando {winning_model_name} otimizado...")
    fitted_model = tuned_estimator.fit(train_df)
    # evaluate_model expects an estimator; a stand-in whose fit() returns the model trained
    # above means training happens once and the evaluated model is the one saved/plotted.
    optimized_metrics, optimized_predictions = evaluate_model(
        SimpleNamespace(fit=lambda _df: fitted_model), train_df, test_df, variavel_resposta)
    print(f"{winning_model_name} (Otimizado) - Métricas: {optimized_metrics}")

    # Save the fitted model (previously the unfitted estimator was saved).
    save_model_and_params(fitted_model, best_params, winning_model_name, save_dir)

    optimized_results = {
        "Modelo": f"{winning_model_name} (Otimizado)",
        **optimized_metrics,
        "Hiperparâmetros": best_params,
    }
    results.append(("Modelo Otimizado", optimized_results))

    # "Modelo" inside optimized_results overrides the generic label for that row.
    result_df = pd.DataFrame([{"Modelo": name, **metrics} for name, metrics in results])

    print("Resultados dos modelos:")
    print(result_df)

    # Plots for the optimized winner only.
    if "ConfusionMatrix" in optimized_metrics:
        plot_confusion_matrix(optimized_metrics["ConfusionMatrix"])
    if "AUC" in optimized_metrics:
        plot_roc_curve(optimized_predictions, variavel_resposta)
        plot_pr_curve(optimized_predictions, variavel_resposta)
        plot_probability_distribution(optimized_predictions, variavel_resposta)
        plot_calibration_curve(optimized_predictions, variavel_resposta)
        plot_ks_curve(optimized_predictions, variavel_resposta)
        plot_lift_curve(optimized_predictions, variavel_resposta)
    if winning_model_name in ["DecisionTreeClassifier", "RandomForestClassifier", "GBTClassifier"]:
        # featureImportances lives on the fitted model, not on the estimator.
        plot_feature_importance(fitted_model, features_cols)

    return result_df

In [None]:
# Example usage
if __name__ == "__main__":
    seed = 42

    # Synthetic, reproducible binary dataset. The previous 4-row toy frame made the 80/20
    # random split degenerate (the test split could be empty or single-class).
    # Features are non-negative because NaiveBayes (multinomial by default) rejects
    # negative feature values.
    rng = np.random.default_rng(seed)
    n_rows = 200
    feature_values = rng.uniform(0.0, 10.0, size=(n_rows, 3))
    noise = rng.normal(0.0, 2.0, size=n_rows)
    targets = (feature_values.sum(axis=1) + noise > 15.0).astype(int)
    data = [
        (float(f1), float(f2), float(f3), int(label))
        for (f1, f2, f3), label in zip(feature_values, targets)
    ]
    columns = ["feature1", "feature2", "feature3", "target"]
    spark = SparkSession.builder.appName("Example").getOrCreate()
    df = spark.createDataFrame(data, columns)

    features_cols = ["feature1", "feature2", "feature3"]
    variavel_resposta = "target"
    metricas_disponiveis = ["accuracy", "weightedPrecision", "weightedRecall", "f1", "AUC", "KS"]
    metrica_vencedora = "accuracy"
    # Output location is configurable; the default keeps the original placeholder path.
    save_dir = os.environ.get("AUTOML_SAVE_DIR", "/dbfs/mnt/your_directory")

    result_df = main(df, features_cols, variavel_resposta, metricas_disponiveis, metrica_vencedora, seed, save_dir)