# Initialisation de la session Spark

In [None]:
# spark.stop()

In [None]:
from pyspark.sql import SparkSession
import os
from pyspark.ml.evaluation import RegressionEvaluator

spark = (SparkSession.builder
    .appName("house_pricing_model")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Optimisations de performances
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.executor.cores", str(max(1, os.cpu_count() - 1)))
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "12g")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.2")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.shuffle.partitions", str(os.cpu_count() * 2))
    .config("spark.default.parallelism", str(os.cpu_count() * 2))
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Configuration des logs
    .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")
    .config("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")
    .config("spark.sql.warnings.ignore", "true")
    .config("spark.log.level", "ERROR")
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=false")
    .getOrCreate())

In [None]:
info_df = spark.read.option("multiline", "true").option("mode", "PERMISSIVE").json("s3a://ml-datasets/house_price_model/latest/info.json")
version = info_df.collect()[0]['redirect_to']

# Chemins des fichiers
base_path = f"s3a://ml-datasets/house_price_model/{version}/"

# Chargement des données

In [None]:
# Chargement des DataFrames
df_full = spark.read.parquet(base_path + "full_dataset.parquet")
df_train = spark.read.parquet(base_path + "train.parquet")
df_validation = spark.read.parquet(base_path + "validation.parquet")
df_test = spark.read.parquet(base_path + "test.parquet")

# Affichage rapide
df_full.printSchema()
print("Nombre de lignes dans le dataset train :", df_train.count())

# Prétraitement des données

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler

def target_encoding(df, categorical_col='code_postal', target_col='valeur_fonciere', smoothing=10):
    """
    Applique un target encoding sur une colonne catégorielle par rapport à la moyenne de la colonne cible.

    Paramètres:
    -----------
    df : DataFrame Spark
        Le DataFrame contenant les données
    categorical_col : str, optionnel (défaut='code_postal')
        Le nom de la colonne catégorielle à encoder
    target_col : str, optionnel (défaut='valeur_fonciere')
        Le nom de la colonne cible dont on calcule la moyenne
    smoothing : int, optionnel (défaut=10)
        Facteur de lissage pour éviter l'overfitting sur les catégories peu fréquentes

    Return :
    ---------
    DataFrame : DataFrame Spark avec une nouvelle colonne contenant l'encodage
    """
    # Calculer la moyenne globale de la colonne cible
    global_avg = df.select(F.avg(target_col)).collect()[0][0]

    # Calculer les statistiques par catégorie
    category_stats = df.groupBy(categorical_col).agg(
        F.avg(target_col).alias('category_avg'),
        F.count('*').alias('category_count')
    )

    # Appliquer le lissage bayésien (smoothing)
    category_stats = category_stats.withColumn(
        'encoded_value',
        (F.col('category_avg') * F.col('category_count') + global_avg * smoothing) /
        (F.col('category_count') + smoothing)
    )

    # Joindre l'encodage au DataFrame original
    encoded_col_name = f"{categorical_col}_encoded"
    encoded_df = df.join(
        category_stats.select(categorical_col, 'encoded_value'),
        on=categorical_col,
        how='left'
    ).withColumnRenamed('encoded_value', encoded_col_name)

    # Gérer les valeurs manquantes avec la moyenne globale
    encoded_df = encoded_df.fillna({encoded_col_name: global_avg})

    return encoded_df

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import SQLTransformer

def prepare_features_with_encoding(df, target_column='valeur_fonciere',
                                         categorical_cols=['code_postal'],
                                         exclude_columns=None):
    """
    Crée un pipeline de prétraitement avec target encoding pour les colonnes catégorielles.

    Paramètres:
    -----------
    df : DataFrame Spark
        Le DataFrame d'entraînement contenant les données
    target_column : str, optionnel (défaut='valeur_fonciere')
        Le nom de la colonne cible à prédire
    categorical_cols : list, optionnel (défaut=['code_postal'])
        Liste des colonnes catégorielles à encoder
    exclude_columns : list, optionnel
        Liste des colonnes à exclure du vecteur de caractéristiques

    Retourne:
    ---------
    Tuple : (DataFrame transformé, PipelineModel de prétraitement)
    """
    # Initialiser la liste d'exclusion
    if exclude_columns is None:
        exclude_columns = []

    # Ajouter la colonne cible à la liste d'exclusion
    if target_column not in exclude_columns:
        exclude_columns.append(target_column)

    # Étapes du pipeline
    stages = []

    # Créer les transformations pour chaque colonne catégorielle
    # for cat_col in categorical_cols:
    #     # Calculer la moyenne globale et les statistiques par catégorie en utilisant SQLTransformer
    #     encoded_col = f"{cat_col}_encoded"
    #
    #     # Étape 1: Calculer les statistiques globales et par catégorie
    #     stats_calculator = SQLTransformer(
    #         statement=f"""
    #         WITH global_stats AS (
    #             SELECT AVG({target_column}) as global_avg FROM __THIS__
    #         ),
    #         category_stats AS (
    #             SELECT
    #                 {cat_col},
    #                 AVG({target_column}) as category_avg,
    #                 COUNT(*) as category_count
    #             FROM __THIS__
    #             GROUP BY {cat_col}
    #         )
    #         SELECT
    #             t.*,
    #             COALESCE(
    #                 (cs.category_avg * cs.category_count + gs.global_avg * 10) / (cs.category_count + 10),
    #                 gs.global_avg
    #             ) as {encoded_col}
    #         FROM __THIS__ t
    #         CROSS JOIN global_stats gs
    #         LEFT JOIN category_stats cs ON t.{cat_col} = cs.{cat_col}
    #         """
    #     )
    #     stages.append(stats_calculator)

    # Identifier les colonnes numériques
    numeric_columns = [col for col, dtype in df.dtypes
                      if dtype in ['int', 'double', 'float']
                      and col not in exclude_columns]

    # Ajouter les colonnes encodées
    # encoded_columns = [f"{col}_encoded" for col in categorical_cols]

    # Toutes les colonnes à utiliser comme features
    # feature_columns = numeric_columns + encoded_columns
    feature_columns = numeric_columns
    print(f"Colonnes utilisées comme caractéristiques: {feature_columns}")

    # Ajouter l'étape d'assemblage des vecteurs
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features",
        handleInvalid="skip"
    )
    stages.append(assembler)

    # Créer et appliquer le pipeline
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(df)

    # Transformer le DataFrame d'entraînement
    df_transformed = pipeline_model.transform(df).select("features", target_column)

    return df_transformed, pipeline_model

In [None]:
preprocessing_model = prepare_features_with_encoding(df_train)[1]

df_train_features = preprocessing_model.transform(df_train).select("features", "valeur_fonciere")
df_validation_features = preprocessing_model.transform(df_validation).select("features", "valeur_fonciere")
df_test_features = preprocessing_model.transform(df_test).select("features", "valeur_fonciere")

# Création Baseline

In [None]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
def create_baselines_from_raw_df(df_train, df_validation, df_test, target_column="valeur_fonciere"):
    """
    Crée plusieurs modèles baseline directement à partir des données brutes (non vectorisées)

    Paramètres:
    -----------
    df_train : DataFrame Spark
        DataFrame d'entraînement brut avec toutes les colonnes originales
    df_validation : DataFrame Spark
        DataFrame de validation brut avec toutes les colonnes originales
    df_test : DataFrame Spark
        DataFrame de test brut avec toutes les colonnes originales
    target_column : str
        Nom de la colonne cible à prédire

    Retourne:
    ---------
    dict : Dictionnaire des performances des différents modèles baseline
    """
    results = {}

    # Vérifier que les colonnes nécessaires existent
    required_cols = ["surface_reelle_bati", "code_postal", target_column, "nombre_pieces_principales"]
    for col in required_cols:
        if col not in df_train.columns:
            raise ValueError(f"La colonne '{col}' est requise mais n'est pas présente dans le DataFrame")

    print("Schéma du DataFrame d'entraînement:")
    df_train.printSchema()

    # 1. Baseline #1: Moyenne globale
    global_mean = df_train.select(F.mean(target_column).alias("prediction")).collect()[0][0]
    print(f"Moyenne globale des prix: {global_mean:.2f}")

    # Prédiction avec la moyenne globale
    global_mean_predictions = df_validation.withColumn("prediction", F.lit(global_mean))

    # Évaluation
    evaluator = RegressionEvaluator(
        labelCol=target_column,
        predictionCol="prediction",
        metricName="rmse"
    )

    rmse = evaluator.evaluate(global_mean_predictions)
    mae = evaluator.setMetricName("mae").evaluate(global_mean_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(global_mean_predictions)

    results["moyenne_globale"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2
    }

    print(f"Baseline Moyenne Globale: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # 2. Baseline #2: Moyenne par code postal
    print("Calcul de la baseline par code postal...")
    postal_code_means = df_train.groupBy("code_postal").agg(
        F.mean(target_column).alias("postal_mean"),
        F.count("*").alias("count")
    )

    # Nombre de codes postaux uniques
    num_postal_codes = postal_code_means.count()
    print(f"Nombre de codes postaux uniques: {num_postal_codes}")

    # Joindre avec le dataset de validation
    postal_predictions = df_validation.join(
        postal_code_means,
        on="code_postal",
        how="left"
    )

    # Utiliser la moyenne globale pour les codes postaux non présents dans l'ensemble d'entraînement
    postal_predictions = postal_predictions.withColumn(
        "prediction",
        F.coalesce(F.col("postal_mean"), F.lit(global_mean))
    )

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(postal_predictions)
    mae = evaluator.setMetricName("mae").evaluate(postal_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(postal_predictions)

    results["moyenne_code_postal"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2
    }

    print(f"Baseline Moyenne par Code Postal: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # 3. Baseline #3: Régression linéaire sur la surface uniquement
    # Créer un vecteur avec une seule caractéristique
    surface_assembler = VectorAssembler(
        inputCols=["surface_reelle_bati"],
        outputCol="surface_feature",
        handleInvalid="skip"
    )

    train_surface = surface_assembler.transform(df_train)
    validation_surface = surface_assembler.transform(df_validation)

    # Entraîner une régression linéaire simple
    lr_surface = LinearRegression(
        featuresCol="surface_feature",
        labelCol=target_column,
        maxIter=10,
        regParam=0.0,
        elasticNetParam=0.0
    )

    lr_surface_model = lr_surface.fit(train_surface)
    validation_predictions = lr_surface_model.transform(validation_surface)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_predictions)
    mae = evaluator.setMetricName("mae").evaluate(validation_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(validation_predictions)

    results["regression_surface"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "coefficients": lr_surface_model.coefficients.toArray().tolist(),
        "intercept": lr_surface_model.intercept
    }

    print(f"Baseline Régression Surface: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")
    print(f"Équation: prix = {lr_surface_model.intercept:.2f} + {lr_surface_model.coefficients[0]:.2f} × surface")

    # 4. Baseline #4: Régression linéaire sur le nombre de pièces uniquement
    pieces_assembler = VectorAssembler(
        inputCols=["nombre_pieces_principales"],
        outputCol="pieces_feature",
        handleInvalid="skip"
    )

    train_pieces = pieces_assembler.transform(df_train)
    validation_pieces = pieces_assembler.transform(df_validation)

    # Entraîner une régression linéaire simple
    lr_pieces = LinearRegression(
        featuresCol="pieces_feature",
        labelCol=target_column,
        maxIter=10,
        regParam=0.0,
        elasticNetParam=0.0
    )

    lr_pieces_model = lr_pieces.fit(train_pieces)
    validation_predictions = lr_pieces_model.transform(validation_pieces)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_predictions)
    mae = evaluator.setMetricName("mae").evaluate(validation_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(validation_predictions)

    results["regression_pieces"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "coefficients": lr_pieces_model.coefficients.toArray().tolist(),
        "intercept": lr_pieces_model.intercept
    }

    print(f"Baseline Régression Nombre de Pièces: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")
    print(f"Équation: prix = {lr_pieces_model.intercept:.2f} + {lr_pieces_model.coefficients[0]:.2f} × nombre_pieces")

    # 5. Baseline #5: Régression linéaire sur surface et nombre de pièces
    multi_assembler = VectorAssembler(
        inputCols=["surface_reelle_bati", "nombre_pieces_principales"],
        outputCol="multi_features",
        handleInvalid="skip"
    )

    train_multi = multi_assembler.transform(df_train)
    validation_multi = multi_assembler.transform(df_validation)

    # Entraîner une régression linéaire multi-features
    lr_multi = LinearRegression(
        featuresCol="multi_features",
        labelCol=target_column,
        maxIter=10,
        regParam=0.0,
        elasticNetParam=0.0
    )

    lr_multi_model = lr_multi.fit(train_multi)
    validation_predictions = lr_multi_model.transform(validation_multi)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_predictions)
    mae = evaluator.setMetricName("mae").evaluate(validation_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(validation_predictions)

    results["regression_surface_pieces"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "coefficients": lr_multi_model.coefficients.toArray().tolist(),
        "intercept": lr_multi_model.intercept
    }

    print(f"Baseline Régression Surface + Pièces: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")
    print(f"Équation: prix = {lr_multi_model.intercept:.2f} + {lr_multi_model.coefficients[0]:.2f} × surface + {lr_multi_model.coefficients[1]:.2f} × pièces")

    # 6. Baseline #6: Régression linéaire sur toutes les features numériques
    # Identifier les colonnes numériques (hors target)
    numeric_cols = [col for col, dtype in df_train.dtypes
                  if dtype in ['int', 'double', 'float']
                  and col != target_column]

    print(f"Colonnes numériques utilisées pour la régression linéaire: {numeric_cols}")

    # Assembler toutes les features numériques
    all_features_assembler = VectorAssembler(
        inputCols=numeric_cols,
        outputCol="features",
        handleInvalid="skip"
    )

    train_all_features = all_features_assembler.transform(df_train)
    validation_all_features = all_features_assembler.transform(df_validation)

    # Entraîner une régression linéaire sur toutes les features
    lr_all = LinearRegression(
        featuresCol="features",
        labelCol=target_column,
        maxIter=10,
        regParam=0.01,
        elasticNetParam=0.0
    )

    lr_all_model = lr_all.fit(train_all_features)
    validation_predictions = lr_all_model.transform(validation_all_features)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_predictions)
    mae = evaluator.setMetricName("mae").evaluate(validation_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(validation_predictions)

    results["regression_lineaire"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "coefficients": lr_all_model.coefficients.toArray().tolist(),
        "intercept": lr_all_model.intercept
    }

    print(f"Baseline Régression Linéaire (toutes features): RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # 7. Baseline #7: Arbre de décision simple (profondeur limitée)
    dt = DecisionTreeRegressor(
        featuresCol="features",
        labelCol=target_column,
        maxDepth=3  # Profondeur très limitée pour éviter l'overfitting
    )

    dt_model = dt.fit(train_all_features)
    validation_predictions = dt_model.transform(validation_all_features)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_predictions)
    mae = evaluator.setMetricName("mae").evaluate(validation_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(validation_predictions)

    results["arbre_decision_simple"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2
    }

    print(f"Baseline Arbre de Décision Simple: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # 8. Baseline #8: Régression linéaire avec transformation logarithmique
    # Filtrer les valeurs négatives ou nulles (qui causeraient des problèmes avec log)
    train_log_filtered = df_train.filter(F.col(target_column) > 0)
    validation_log_filtered = df_validation.filter(F.col(target_column) > 0)

    # Appliquer la transformation log
    train_log = all_features_assembler.transform(
        train_log_filtered.withColumn("log_target", F.log1p(F.col(target_column)))
    )
    validation_log = all_features_assembler.transform(
        validation_log_filtered.withColumn("log_target", F.log1p(F.col(target_column)))
    )

    # Entraîner une régression linéaire sur le log des prix
    lr_log = LinearRegression(
        featuresCol="features",
        labelCol="log_target",
        maxIter=10,
        regParam=0.01,
        elasticNetParam=0.0
    )

    lr_log_model = lr_log.fit(train_log)
    validation_log_pred = lr_log_model.transform(validation_log)

    # Convertir les prédictions pour les ramener à l'échelle originale
    validation_pred = validation_log_pred.withColumn(
        "prediction",
        F.expm1(F.col("prediction"))
    ).select("prediction", target_column)

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(validation_pred)
    mae = evaluator.setMetricName("mae").evaluate(validation_pred)
    r2 = evaluator.setMetricName("r2").evaluate(validation_pred)

    results["regression_log_transformee"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "coefficients": lr_log_model.coefficients.toArray().tolist(),
        "intercept": lr_log_model.intercept
    }

    print(f"Baseline Régression Log-transformée: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # 9. Baseline #9: Prix au m² moyen (global) × surface
    # Calculer le prix moyen au m²
    price_per_sqm = df_train.filter(F.col("surface_reelle_bati") > 0).withColumn(
        "price_per_sqm", F.col(target_column) / F.col("surface_reelle_bati")
    ).select(F.mean("price_per_sqm")).collect()[0][0]

    print(f"Prix moyen au m²: {price_per_sqm:.2f}€")

    # Appliquer ce prix moyen aux surfaces du jeu de validation
    ppm_predictions = df_validation.withColumn(
        "prediction", F.col("surface_reelle_bati") * F.lit(price_per_sqm)
    )

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(ppm_predictions)
    mae = evaluator.setMetricName("mae").evaluate(ppm_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(ppm_predictions)

    results["prix_m2_moyen"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "price_per_sqm": price_per_sqm
    }

    print(f"Baseline Prix au m² moyen: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")
    print(f"Équation: prix = {price_per_sqm:.2f} × surface")

    # 10. Baseline #10: Prix au m² par code postal × surface
    # Calculer le prix moyen au m² par code postal
    price_per_sqm_by_postal = df_train.filter(F.col("surface_reelle_bati") > 0).withColumn(
        "price_per_sqm", F.col(target_column) / F.col("surface_reelle_bati")
    ).groupBy("code_postal").agg(
        F.mean("price_per_sqm").alias("postal_ppm"),
        F.count("*").alias("count")
    )

    # Calculer également la moyenne globale pour les codes postaux manquants

    # Joindre avec le jeu de validation
    ppm_postal_predictions = df_validation.join(
        price_per_sqm_by_postal,
        on="code_postal",
        how="left"
    ).withColumn(
        "postal_ppm",
        F.coalesce(F.col("postal_ppm"), F.lit(price_per_sqm))
    ).withColumn(
        "prediction",
        F.col("surface_reelle_bati") * F.col("postal_ppm")
    )

    # Évaluation
    rmse = evaluator.setMetricName("rmse").evaluate(ppm_postal_predictions)
    mae = evaluator.setMetricName("mae").evaluate(ppm_postal_predictions)
    r2 = evaluator.setMetricName("r2").evaluate(ppm_postal_predictions)

    results["prix_m2_par_code_postal"] = {
        "rmse": rmse,
        "mae": mae,
        "r2": r2
    }

    print(f"Baseline Prix au m² par code postal: RMSE = {rmse:.2f}, MAE = {mae:.2f}, R² = {r2:.4f}")

    # Résumé des performances
    print("\nRésumé des performances des modèles baseline:")
    for model, metrics in results.items():
        print(f"{model}: RMSE = {metrics['rmse']:.2f}, R² = {metrics['r2']:.4f}")

    return results

In [None]:
def visualize_baseline_results(results):
    """
    Visualise les performances des différents modèles baseline

    Paramètres:
    -----------
    results : dict
        Dictionnaire des performances des différents modèles
    """
    # Extraction des métriques pour chaque modèle
    models = list(results.keys())
    rmse_values = [results[model]["rmse"] for model in models]
    r2_values = [results[model]["r2"] for model in models]

    # Création d'un dataframe pour faciliter la visualisation
    performance_df = pd.DataFrame({
        "Modèle": models,
        "RMSE": rmse_values,
        "R²": r2_values
    })

    # Trier par RMSE croissant (meilleure performance)
    performance_df = performance_df.sort_values("RMSE")

    # Visualisation des RMSE
    plt.figure(figsize=(14, 10))
    plt.subplot(2, 1, 1)
    bars = plt.barh(performance_df["Modèle"], performance_df["RMSE"], color="skyblue")
    plt.xlabel("RMSE (€)")
    plt.title("Comparaison des modèles baseline - RMSE")
    plt.grid(axis="x", linestyle="--", alpha=0.7)

    # Ajouter les valeurs sur les barres
    for i, bar in enumerate(bars):
        plt.text(bar.get_width() + 5000, bar.get_y() + bar.get_height()/2,
                f'{performance_df["RMSE"].iloc[i]:.0f}€',
                va='center', fontsize=9)

    # Visualisation des R²
    plt.subplot(2, 1, 2)
    bars = plt.barh(performance_df["Modèle"], performance_df["R²"], color="lightgreen")
    plt.xlabel("R²")
    plt.title("Comparaison des modèles baseline - R²")
    plt.grid(axis="x", linestyle="--", alpha=0.7)

    # Ajouter les valeurs sur les barres
    for i, bar in enumerate(bars):
        plt.text(bar.get_width() + 0.02, bar.get_y() + bar.get_height()/2,
                f'{performance_df["R²"].iloc[i]:.4f}',
                va='center', fontsize=9)

    plt.tight_layout()
    plt.savefig("baseline_models_comparison.png")
    plt.show()

    return performance_df

In [None]:
def analyze_linear_coefficients(results, feature_names=None):
    """
    Analyse les coefficients des modèles de régression linéaire

    Paramètres:
    -----------
    results : dict
        Dictionnaire des performances avec les coefficients
    feature_names : list, optionnel
        Noms des features (pour la régression linéaire)
    """
    # Modèles de régression à analyser
    regression_models = {}

    # Extraire les modèles avec des coefficients
    for model_name, metrics in results.items():
        if "coefficients" in metrics and "intercept" in metrics:
            regression_models[model_name] = {
                "coefficients": metrics["coefficients"],
                "intercept": metrics["intercept"]
            }

    if not regression_models:
        print("Aucun modèle de régression avec coefficients trouvé.")
        return

    print("\nAnalyse des coefficients des modèles de régression:")

    for model_name, model_info in regression_models.items():
        print(f"\nModèle: {model_name}")
        print(f"Intercept: {model_info['intercept']:.2f}")

        coefs = model_info["coefficients"]

        if feature_names and len(coefs) == len(feature_names):
            for i, (coef, name) in enumerate(zip(coefs, feature_names)):
                print(f"  {name}: {coef:.2f}")

            # Visualisation des coefficients
            plt.figure(figsize=(10, 6))
            bars = plt.barh(feature_names, coefs, color="lightblue")
            plt.axvline(0, color='gray', linestyle='--')
            plt.title(f"Coefficients du modèle {model_name}")
            plt.grid(axis="x", linestyle="--", alpha=0.7)

            # Ajouter les valeurs sur les barres
            for i, bar in enumerate(bars):
                plt.text(bar.get_width() + (0.01 if bar.get_width() >= 0 else -0.01),
                        bar.get_y() + bar.get_height()/2,
                        f'{coefs[i]:.2f}',
                        va='center', ha='left' if bar.get_width() >= 0 else 'right',
                        fontsize=9)

            plt.tight_layout()
            plt.savefig(f"{model_name}_coefficients.png")
            plt.show()
        else:
            print(f"  Coefficients: {coefs}")

In [None]:
# 1. Créer et évaluer les baselines à partir des données brutes
baseline_results = create_baselines_from_raw_df(
    df_train,              # DataFrame original avec toutes les colonnes
    df_validation,
    df_test,
    target_column="valeur_fonciere"
)

# 2. Visualiser les résultats
performance_df = visualize_baseline_results(baseline_results)

# 3. Analyser les coefficients des modèles linéaires
numeric_cols = [col for col, dtype in df_train.dtypes
              if dtype in ['int', 'double', 'float']
              and col != "valeur_fonciere"]
analyze_linear_coefficients(baseline_results, feature_names=numeric_cols)

### Observations clés

La localisation est cruciale:
- Le modèle "Prix au m² par code postal" est le plus performant, expliquant 33.7% de la variance des prix. Cela confirme l'importance de la localisation dans la valorisation immobilière.
- La surface est plus explicative que le nombre de pièces: La régression basée uniquement sur la surface (R² = 0.1850) est bien meilleure que celle basée sur le nombre de pièces (R² = 0.0941).
- Interaction surface/pièces peu significative: La combinaison de surface et pièces (R² = 0.1858) n'améliore que très légèrement la performance par rapport à la surface seule, et on note même un coefficient négatif pour les pièces (-3,572€) lorsque la surface est déjà prise en compte.
- Transformations log peu efficaces: La régression logarithmique ne semble pas améliorer les performances, ce qui suggère que malgré une probable asymétrie, la relation prix/variables reste assez linéaire.
- Effet des autres variables numériques: La régression linéaire utilisant toutes les features numériques (R² = 0.2534) est nettement meilleure que celle utilisant uniquement la surface, ce qui montre l'importance des autres facteurs comme l'année, le terrain, et les coordonnées géographiques.

### Analyse des coefficients de la régression linéaire complète

- code_type_local: +150,170€ par unité, suggérant une forte influence du type de bien
- latitude: -55,164€ par degré, indiquant que les prix diminuent en allant vers le nord dans votre dataset
- annee_mutation: +17,872€ par année, ce qui montre une inflation immobilière significative
- longitude: +6,219€ par degré, montrant un gradient est-ouest des prix
- nombre_pieces_principales: +3,050€ par pièce (après contrôle des autres variables)
- surface_reelle_bati: +1,950€ par m² (en contrôlant pour les autres variables)
- ratio_terrain_bati: +194€ par unité, impact positif mais modéré
- surface_terrain: +0.98€ par m², impact très faible par rapport à la surface bâtie

# Entraînement du modèle GBTRegressor

In [None]:
import numpy as np
from pyspark.ml.regression import GBTRegressor
import pyspark.sql.functions as F
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

def train_gradient_boosting(train_df, val_df, target_column='valeur_fonciere'):
    """
    Entraîne un modèle GBT en optimisant l'utilisation des ressources disponibles

    Args:
        train_df (pyspark.sql.DataFrame): DataFrame d'entraînement
        val_df (pyspark.sql.DataFrame): DataFrame de validation
        target_column (str): Nom de la colonne cible

    Returns :
        tuple : Modèle entraîné et métriques de performance
    """

    # Mettre en cache les DataFrames
    train_df.cache()
    val_df.cache()

    # Forcer une action pour matérialiser les caches
    train_count = train_df.count()
    val_count = val_df.count()
    print(f"Train set: {train_count} rows, Validation set: {val_count} rows")

    # Définir une grille de paramètres à tester
    gbt = GBTRegressor(
        featuresCol="features",
        labelCol=target_column,
        predictionCol="predicted_" + target_column
    )

    param_grid = ParamGridBuilder() \
        .addGrid(gbt.maxDepth, [8]) \
        .addGrid(gbt.maxIter, [30, 40]) \
        .addGrid(gbt.stepSize, [0.07, 0.08]) \
        .addGrid(gbt.subsamplingRate, [0.8, 0.9]) \
        .build()

    evaluator = RegressionEvaluator(
        labelCol=target_column,
        predictionCol="predicted_" + target_column,
        metricName="rmse"
    )

    cv = CrossValidator(
        estimator=gbt,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=2,
        parallelism=14,
    )

    print(f"Début de la recherche d'hyperparamètres...")
    cv_model = cv.fit(train_df)
    best_model = cv_model.bestModel
    print("Recherche terminée")

    # Extraire les meilleurs hyperparamètres
    best_params = {}
    if hasattr(cv_model, 'getEstimatorParamMaps'):
        # Récupérer l'index du meilleur modèle
        best_index = np.argmin(cv_model.avgMetrics)
        # Récupérer les paramètres correspondants
        best_params = {str(k): v for k, v in cv_model.getEstimatorParamMaps()[best_index].items()}
    else:
        # Essayer de récupérer directement les paramètres du meilleur modèle
        try:
            best_params = {
                "maxDepth": cv_model.bestModel.getMaxDepth(),
                "maxIter": cv_model.bestModel.getMaxIter(),
                "stepSize": cv_model.bestModel.getStepSize(),
                "subsamplingRate": cv_model.bestModel.getSubsamplingRate()
            }
        except:
            print("Impossible de récupérer les paramètres du meilleur modèle")

    # Générer des prédictions
    print("Génération des prédictions...")
    predictions = best_model.transform(val_df).select(
        target_column,
        "predicted_" + target_column
    )

    # Évaluation du modèle
    print("Évaluation du modèle...")
    evaluator = RegressionEvaluator(
        labelCol=target_column,
        predictionCol="predicted_" + target_column
    )

    # Calculer les métriques
    metrics = {}
    for metric_name in ["rmse", "mae", "r2"]:
        evaluator.setMetricName(metric_name)
        metrics[metric_name] = evaluator.evaluate(predictions)

    # Calculer le pourcentage d'erreur
    predictions_with_error = predictions.withColumn(
        "percent_error",
        F.abs(F.col("predicted_" + target_column) - F.col(target_column)) / F.col(target_column) * 100
    )

    error_stats = predictions_with_error.select(
        F.avg("percent_error").alias("avg_percent_error"),
        F.expr("percentile(percent_error, 0.5)").alias("median_percent_error")
    ).collect()[0]

    metrics["avg_percent_error"] = error_stats["avg_percent_error"]
    metrics["median_percent_error"] = error_stats["median_percent_error"]

    # Ajouter les informations du modèle
    try:
        # Pour les modèles GBT de PySpark
        metrics["num_trees"] = best_model.trees
    except:
        try:
            # Autre méthode alternative
            metrics["num_trees"] = best_model.getNumTrees()
        except:
            metrics["num_trees"] = None

    # Tentative de récupération des importances des features
    try:
        metrics["feature_importances"] = best_model.featureImportances.toArray().tolist()
    except:
        metrics["feature_importances"] = None

    metrics["best_params"] = best_params

    # Libérer les caches
    train_df.unpersist()
    val_df.unpersist()

    # Afficher un résumé
    print(f"Performances du modèle: RMSE={metrics['rmse']:.2f}, MAE={metrics['mae']:.2f}, R²={metrics['r2']:.4f}")
    print(f"Erreur moyenne: {metrics['avg_percent_error']:.2f}%, Erreur médiane: {metrics['median_percent_error']:.2f}%")

    return best_model, metrics

In [None]:
# Entraîner le modèle Gradient Boosting
model, metrics = train_gradient_boosting(    df_train_features,
    df_validation_features,
    target_column='valeur_fonciere')

In [None]:
# Appliquer le modèle sur les données de test
predictions_test = model.transform(df_test_features)

# Créer un évaluateur
evaluator = RegressionEvaluator(
    labelCol="valeur_fonciere",
    predictionCol="predicted_valeur_fonciere"
)

# Calculer les métriques sur le jeu de test
test_metrics = {}
for metric_name in ["rmse", "mae", "r2"]:
    evaluator.setMetricName(metric_name)
    test_metrics[metric_name] = evaluator.evaluate(predictions_test)

# Afficher les résultats
print("\nPerformances sur le jeu de test:")
print(f"RMSE: {test_metrics['rmse']:.2f}")
print(f"MAE: {test_metrics['mae']:.2f}")
print(f"R²: {test_metrics['r2']:.4f}")

# Pour comparer avec les performances sur le jeu de validation
print("\nComparaison validation vs test:")
print(f"RMSE - Validation: {metrics['rmse']:.2f}, Test: {test_metrics['rmse']:.2f}")
print(f"R² - Validation: {metrics['r2']:.4f}, Test: {test_metrics['r2']:.4f}")

price_comparison = predictions_test.select(
    "valeur_fonciere",
    "predicted_valeur_fonciere",
    (F.col("predicted_valeur_fonciere") - F.col("valeur_fonciere")).alias("error"),
    F.abs(F.col("predicted_valeur_fonciere") - F.col("valeur_fonciere")).alias("absolute_error"),
    (F.abs(F.col("predicted_valeur_fonciere") - F.col("valeur_fonciere")) / F.col("valeur_fonciere") * 100).alias("percent_error")
)

# Afficher quelques exemples
print("\nExemples de prédictions:")
price_comparison.orderBy(F.rand()).limit(10).show(truncate=False)

# Calcul des statistiques d'erreur avec erreur absolue
print("\nStatistiques d'erreur:")
price_comparison.select(
    F.avg("absolute_error").alias("Erreur absolue moyenne"),
    F.avg("percent_error").alias("% d'erreur moyen"),
    F.expr("percentile(percent_error, 0.5)").alias("% d'erreur médian")
).show()

In [None]:
import os
import tempfile
from minio import Minio

def save_model_to_minio(model, model_name):
    """
    Sauvegarde simplement un modèle PySpark dans le bucket ml_models de MinIO

    Args:
        model: Le modèle PySpark à sauvegarder
        model_name (str): Nom du modèle (sera utilisé comme dossier dans le bucket)

    Returns:
        bool: True si la sauvegarde a réussi, False sinon
    """
    # Configuration du client MinIO
    minio_client = Minio(
        endpoint="localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )

    bucket_name = "models"

    try:
        # Vérifier si le bucket existe, sinon le créer
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' créé")

        # Créer un dossier temporaire pour sauvegarder le modèle
        with tempfile.TemporaryDirectory() as temp_dir:
            local_model_path = os.path.join(temp_dir, "model")

            # Sauvegarder le modèle PySpark localement
            model.save(local_model_path)

            # Télécharger tous les fichiers du modèle vers MinIO
            for root, dirs, files in os.walk(local_model_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Construire le chemin dans MinIO
                    object_name = f"{model_name}/{os.path.relpath(file_path, local_model_path)}"

                    # Uploader le fichier
                    minio_client.fput_object(bucket_name, object_name, file_path)

            print(f"Modèle sauvegardé avec succès dans {bucket_name}/{model_name}")
            return True

    except Exception as e:
        print(f"Erreur lors de la sauvegarde du modèle: {e}")
        return False

In [None]:
save_model_to_minio(model, "immobilier_value_predictor_model")