In [1]:
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RealEstatePriceAnalysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", 0.8) \
    .getOrCreate()

In [2]:
from pyspark.sql.functions import col

data = spark.read.option("header", "true").csv('23.csv', header=True, inferSchema=True, sep=',')
# Filtrage des données
print("Nombre de lignes avant filtrage : ", data.count())
data = data.filter(
    (col("nature_mutation").isin("Vente", "Vente en l’état futur d’achèvement")) &
    # ((col("nombre_lots") == 1) | (col("nombre_lots") == 0)) &
    # (col("code_nature_culture").isNotNull() & (col("code_nature_culture") == "S")) &
    # (col("surface_reelle_bati").isNotNull() & (col("surface_reelle_bati") != 0)) &
    (col("valeur_fonciere").isNotNull() & (col("valeur_fonciere") != 0)) &
    (col("longitude").isNotNull()) & (col("latitude").isNotNull())
)
print("Nombre de lignes après filtrage : ", data.count())

# Filtrage des mutations qui possèdent des vignes (code_nature_culture contenant "VI")
vignes_mutations = data.filter(col("code_nature_culture") == "VI").select("id_mutation").distinct()
# Filtrage des mutation qui possèdent des locals
local_mutations = data.filter(col("code_type_local") == "4").select("id_mutation").distinct()
removed_mutation = vignes_mutations.join(local_mutations, on="id_mutation", how="outer")

# Exclusion des mutations possédant des vignes et des locals
df_final = data.join(removed_mutation, on="id_mutation", how="left_anti")

Nombre de lignes avant filtrage :  1599284
Nombre de lignes après filtrage :  1472174


In [3]:
df_final.where((col("id_mutation") == "2023-1264625") | (col("id_mutation") == "2023-1264630")).show(20)

+-----------+-------------+------------------+---------------+---------------+--------------+---------------+----------------+-----------------+-----------+------------+-----------+----------------+-------------------+------------------+-----------+------------------+-------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+---------------+----------+-------------------+-------------------------+-------------------+--------------+----------------------------+-----------------------+---------------+---------+--------+
|id_mutation|date_mutation|numero_disposition|nature_mutation|valeur_fonciere|adresse_numero|adresse_suffixe|adresse_nom_voie|adresse_code_voie|code_postal|code_commune|nom_commune|code_departement|ancien_code_commune|ancien_nom_commune|id_parcelle|ancien_id_parcelle|numero_volume|lot1_numero|lot1_surface_carrez|lot2_numero|lot2_surface_carrez|lot

In [4]:
from pyspark.sql.functions import collect_list, struct, map_from_entries, to_json, expr

# Sélection des colonnes nécessaires
cols = data.columns
id_cadastre_col = "id_parcelle"  # ou la colonne qui identifie le cadastre
id_mutation_col = "id_mutation"
id_mutation_col = "adresse_numero"

# Collecter les données en utilisant collect_list et struct
mutation_structs = data.select(id_cadastre_col, id_mutation_col, *[col for col in cols if col != id_cadastre_col and col != id_mutation_col]) \
    .groupBy(id_cadastre_col, id_mutation_col) \
    .agg(collect_list(struct(*[col for col in cols if col != id_cadastre_col and col != id_mutation_col])).alias("properties"))

mutation_structs.where(col("id_parcelle") == "95256000AD0777").show(20, truncate=False)

+-----------+--------------+----------+
|id_parcelle|adresse_numero|properties|
+-----------+--------------+----------+
+-----------+--------------+----------+



In [5]:
from pyspark.sql.functions import approx_count_distinct, collect_set
# Sélection des parcelles qui apparaissent dans plusieurs mutations à une même date
df_grouped = df_final.groupBy("id_parcelle", "valeur_fonciere", "date_mutation").agg(
    approx_count_distinct("id_mutation").alias("unique_mutation_count"),
    collect_set("id_mutation").alias("mutation_ids")
)

# Filtrage des parcelles apparaissant dans plusieurs id_mutation à une même date
df_multiple_mutations = df_grouped.filter(col("unique_mutation_count") > 1)

df_multiple_mutations.show(truncate=False)

+--------------+---------------+-------------+---------------------+---------------------------------------+
|id_parcelle   |valeur_fonciere|date_mutation|unique_mutation_count|mutation_ids                           |
+--------------+---------------+-------------+---------------------+---------------------------------------+
|02810000AL0394|14000.0        |2023-05-02   |2                    |[2023-16643, 2023-16645]               |
|07010000AB0478|351000.0       |2023-07-31   |2                    |[2023-84053, 2023-84049]               |
|070360000B1427|60000.0        |2023-04-13   |2                    |[2023-80473, 2023-80212]               |
|11116000CW0027|34447.0        |2023-09-18   |2                    |[2023-113785, 2023-113834]             |
|11116000CW0027|78785.0        |2023-09-18   |2                    |[2023-113749, 2023-113832]             |
|12202000AM0459|11000.0        |2023-06-09   |2                    |[2023-121062, 2023-121059]             |
|132058210E0026|200

In [6]:
df_final.where(col("id_parcelle") == "040880000G1483").select("id_mutation", "date_mutation", "adresse_numero", "adresse_nom_voie", "code_postal", "nom_commune", "valeur_fonciere", "longitude", "latitude").show(20, truncate=False)

+-----------+-------------+--------------+---------------------+-----------+-----------+---------------+---------+---------+
|id_mutation|date_mutation|adresse_numero|adresse_nom_voie     |code_postal|nom_commune|valeur_fonciere|longitude|latitude |
+-----------+-------------+--------------+---------------------+-----------+-----------+---------------+---------+---------+
|2023-32500 |2023-02-28   |155           |RUE DOC CASIMIR CAIRE|4300       |Forcalquier|120000.0       |5.776414 |43.961318|
|2023-32500 |2023-02-28   |155           |RUE DOC CASIMIR CAIRE|4300       |Forcalquier|120000.0       |5.776414 |43.961318|
|2023-32996 |2023-02-28   |155           |RUE DOC CASIMIR CAIRE|4300       |Forcalquier|100000.0       |5.776414 |43.961318|
|2023-32996 |2023-02-28   |155           |RUE DOC CASIMIR CAIRE|4300       |Forcalquier|100000.0       |5.776414 |43.961318|
|2023-35370 |2023-09-20   |155           |RUE DOC CASIMIR CAIRE|4300       |Forcalquier|100000.0       |5.776414 |43.961318|


In [7]:
df_multiple_mutations.count()

669

In [8]:
from pyspark.sql.functions import col, sum as spark_sum

# Calcul de la surface totale par mutation
df_surface_totale = df_final.groupBy("id_mutation").agg(
    spark_sum("surface_reelle_bati").alias("surface_totale")
)

# Jointure pour obtenir la surface totale dans le DataFrame principal
df_joined = df_final.join(df_surface_totale, "id_mutation")

# Calcul de la valeur foncière proportionnelle pour chaque bien
df_final = df_joined.withColumn(
    "valeur_fonciere_proportionnelle",
    (col("surface_reelle_bati") / col("surface_totale")) * col("valeur_fonciere")
)
df_final = df_final.withColumn("prix_m2", col("valeur_fonciere_proportionnelle") / col("surface_reelle_bati"))

# Affichage du résultat
df_final.select(
    "id_mutation", "adresse_numero", "adresse_nom_voie", "code_postal", "nom_commune", "valeur_fonciere", "type_local", "valeur_fonciere_proportionnelle", "prix_m2", "surface_reelle_bati", "surface_totale"
).show(truncate=False)

+-----------+--------------+----------------+-----------+--------------------+---------------+-----------+-------------------------------+------------------+-------------------+--------------+
|id_mutation|adresse_numero|adresse_nom_voie|code_postal|nom_commune         |valeur_fonciere|type_local |valeur_fonciere_proportionnelle|prix_m2           |surface_reelle_bati|surface_totale|
+-----------+--------------+----------------+-----------+--------------------+---------------+-----------+-------------------------------+------------------+-------------------+--------------+
|2023-100028|NULL          |LE VILLAGE      |10130      |Villeneuve-au-Chemin|50000.0        |NULL       |NULL                           |NULL              |NULL               |83            |
|2023-100028|27            |RTE NATIONALE 77|10130      |Villeneuve-au-Chemin|50000.0        |Maison     |50000.0                        |602.4096385542168 |83                 |83            |
|2023-100028|27            |RTE NAT

In [9]:
df_final.select("code_nature_culture", "nature_culture").distinct().show()

+-------------------+-------------------+
|code_nature_culture|     nature_culture|
+-------------------+-------------------+
|                  E|               eaux|
|                 BM|     futaies mixtes|
|                 LB|     landes boisées|
|                  T|             terres|
|                 BS|taillis sous futaie|
|                 PA|            pâtures|
|                 BO|           oseraies|
|                 BF|  futaies feuillues|
|                  B|               bois|
|                 AG|terrains d'agrément|
|                 PC|            pacages|
|                 CA|          carrières|
|                 BT|    taillis simples|
|                  J|            jardins|
|                 PP|       prés plantes|
|                 BP|        peupleraies|
|                 TP|    terres plantées|
|                  P|               prés|
|                 CH|      chemin de fer|
|                 BR| futaies résineuses|
+-------------------+-------------

In [10]:
print("Nombre de lignes avant filtrage : ", df_final.count())
df_final = df_final.filter(
    (col("code_nature_culture").isNotNull() & (col("code_nature_culture") == "S")) 
)
print("Nombre de lignes après filtrage : ", df_final.count())

Nombre de lignes avant filtrage :  1298759
Nombre de lignes après filtrage :  428633


In [11]:
from pyspark.sql.functions import count

# Filtrage des lignes qui ont des id_mutation et valeur_fonciere identiques
duplicate_rows = df_final.groupBy("id_mutation", "valeur_fonciere") \
                   .agg(count("*").alias("count")) \
                   .filter(col("count") > 1)

# Joindre les lignes dupliquées avec le DataFrame original pour récupérer toutes les colonnes
duplicate_details = df_final.join(duplicate_rows, ["id_mutation", "valeur_fonciere"], "inner")

# Affichage des lignes dupliquées
duplicate_details.show(truncate=False)

+-----------+---------------+-------------+------------------+---------------+--------------+---------------+----------------------------+-----------------+-----------+------------+------------------+----------------+-------------------+------------------+--------------+------------------+-------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+---------------+----------+-------------------+-------------------------+-------------------+--------------+----------------------------+-----------------------+---------------+---------+---------+--------------+-------------------------------+------------------+-----+
|id_mutation|valeur_fonciere|date_mutation|numero_disposition|nature_mutation|adresse_numero|adresse_suffixe|adresse_nom_voie            |adresse_code_voie|code_postal|code_commune|nom_commune       |code_departement|ancien_code_commune|ancien_nom_commun

In [12]:
from pyspark.sql.functions import radians

# Appliquer la conversion aux colonnes longitude et latitude
df_final = df_final.withColumn('longitude_r', radians(col('longitude')))
df_final = df_final.withColumn('latitude_r', radians(col('latitude')))

# Sélection des colonnes requises
df_final.select(
    "id_mutation", "date_mutation", "valeur_fonciere", "prix_m2", "code_postal", "type_local", "surface_reelle_bati", "valeur_fonciere_proportionnelle", "surface_reelle_bati", "surface_totale", "longitude_r", "latitude_r"
).show(20)

# Optionnel : enregistrer le dataset filtré dans un nouveau fichier CSV
# data.write.option("header", "true").csv("path_to_save_filtered_dataset.csv")

+-----------+-------------+---------------+------------------+-----------+----------+-------------------+-------------------------------+-------------------+--------------+-------------------+------------------+
|id_mutation|date_mutation|valeur_fonciere|           prix_m2|code_postal|type_local|surface_reelle_bati|valeur_fonciere_proportionnelle|surface_reelle_bati|surface_totale|        longitude_r|        latitude_r|
+-----------+-------------+---------------+------------------+-----------+----------+-------------------+-------------------------------+-------------------+--------------+-------------------+------------------+
|2023-100028|   2023-05-26|        50000.0| 602.4096385542168|      10130|    Maison|                 83|                        50000.0|                 83|            83|0.06716293997049735| 0.839310371700587|
|2023-100028|   2023-05-26|        50000.0|              NULL|      10130|Dépendance|               NULL|                           NULL|               

In [13]:
df_final.where(col("valeur_fonciere") > 20000000).show(29)

+-----------+-------------+------------------+---------------+---------------+--------------+---------------+-----------------+-----------------+-----------+------------+--------------------+----------------+-------------------+------------------+--------------+------------------+-------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+-------------------+-----------+---------------+----------+-------------------+-------------------------+-------------------+--------------+----------------------------+-----------------------+---------------+---------+---------+--------------+-------------------------------+------------------+-------------------+------------------+
|id_mutation|date_mutation|numero_disposition|nature_mutation|valeur_fonciere|adresse_numero|adresse_suffixe| adresse_nom_voie|adresse_code_voie|code_postal|code_commune|         nom_commune|code_departement|ancien_code_commune|an

In [14]:
df_final.write.mode("overwrite").option("header", "true").option("inferSchema", "true").csv("dataset.csv")

In [15]:
spark.stop()