In [0]:
from pyspark.sql.functions import *

### import all table from azure container to databricks deltalake

In [0]:
display(dbutils.fs.ls("/mnt/monblob"))


In [0]:
# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/flux_nouveaux_emprunts.csv").write.mode("overwrite").saveAsTable("flux_nouveaux_emprunts")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/foyers_fiscaux.csv").write.mode("overwrite").saveAsTable("foyers_fiscaux")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/loyers.csv").write.mode("overwrite").saveAsTable("loyers")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/parc_immobilier.csv").write.mode("overwrite").saveAsTable("parc_immobilier")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/taux_endettement.csv").write.mode("overwrite").saveAsTable("taux_endettement")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/transactions.csv").write.mode("overwrite").saveAsTable("transactions")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/indice_reference_loyers.csv").write.mode("overwrite").saveAsTable("indice_reference_loyers")

# spark.read.format("csv").option("header", "true").load("dbfs:/mnt/monblob/taux_interet.csv").write.mode("overwrite").saveAsTable("taux_interet")


In [0]:
transactions = spark.sql("select * from hive_metastore.default.transactions")
foyers_fiscaux = spark.sql("select * from hive_metastore.default.foyers_fiscaux")
indice_loyers = spark.sql("select * from hive_metastore.default.indice_reference_loyers")
loyers = spark.sql("select * from hive_metastore.default.loyers")
parc_immobilier = spark.sql("select * from hive_metastore.default.parc_immobilier")
taux_endettement = spark.sql("select * from hive_metastore.default.taux_endettement")

In [0]:
transactions.display()

In [0]:
def convert_dept(col):
    return when((col == '2A') | (col == '2B'), '20').otherwise(col)

transactions = transactions.withColumn('departement', convert_dept(col('departement'))).withColumn('prix', col('prix').cast('float'))
loyers = loyers.withColumn('departement', convert_dept(col('departement')))
parc_immobilier = parc_immobilier.withColumn('departement', convert_dept(col('departement')))



In [0]:
# Préparation des données
indice_loyers = (indice_loyers
    .withColumn('quarter', to_date('quarter'))
    .withColumn('year', year('quarter'))
    .groupBy('year')
    .agg(avg('IRL').alias('IRL'))
)


In [0]:
indice_loyers.display()

In [0]:
# Jointures des données
merged_data = (transactions
    .join(loyers, ['departement', 'id_ville'], 'left')
    .join(foyers_fiscaux, ['departement', 'id_ville'], 'left')
    .withColumn('annee', year('date_transaction'))  # On crée d'abord la colonne année
    .join(indice_loyers, col('annee') == col('year'), 'left')  # Correction ici: on compare deux colonnes
)

display(merged_data)

In [0]:
print(merged_data.columns)

In [0]:
# Préparation des moyennes du parc immobilier
moyennes = (parc_immobilier
    .groupBy('departement', 'id_ville')
    .agg(
        avg('n_logements').alias('n_logements_mean'),
        avg('n_logements_vacants').alias('n_logements_vacants_mean')
    )
)

moyennes.display()

In [0]:
parc_immobilier.display()

In [0]:
# 1. Première jointure avec parc_immobilier sur date, departement, id_ville
merged_data = (merged_data
    .join(
        parc_immobilier.select('date', 'departement', 'id_ville', 'n_logements', 'n_logements_vacants'),
        ['date', 'departement', 'id_ville'],
        'left'
    )
)

In [0]:
# 3. Remplacement des valeurs nulles par les moyennes
merged_data = (merged_data
    .join(moyennes, ['departement', 'id_ville'], 'left')
    .withColumn('n_logements', 
        coalesce(col('n_logements'), col('n_logements_mean')))
    .withColumn('n_logements_vacants', 
        coalesce(col('n_logements_vacants'), col('n_logements_vacants_mean')))
    .drop('n_logements_mean', 'n_logements_vacants_mean')
)

display(merged_data)

In [0]:
merged_data.columns

**Colonnes principales pour l'analyse immobilière :**

- `prix` - La variable cible à prédire (prix du bien)
- `surface_habitable` - La surface du bien immobilier
- `type_batiment` - Type de propriété (maison, appartement, etc.)
- `n_pieces` - Nombre de pièces dans le bien
- `latitude`/`longitude` - Coordonnées géographiques pour la localisation précise
- `departement` - Code départemental pour l'analyse régionale
- `ville` - Nom de la commune
- `loyer_m2_appartement`/`loyer_m2_maison` - Prix du marché locatif au m²
- `revenu_fiscal_moyen` - Indicateur socio-économique de la zone
- `IRL` - Indice de référence des loyers
- `n_logements`/`n_logements_vacants` - État du marché immobilier local  
- `annee` - Pour l'analyse temporelle des prix

In [0]:
# Sélection des colonnes pertinentes avec la source spécifiée
selected_data = (merged_data.select(
    'annee',
    'prix',
    'departement',
    transactions['ville'].alias('ville'),  # On spécifie qu'on veut la ville de transactions
    'surface_habitable',
    'type_batiment',
    'n_pieces',
    'latitude',
    'longitude',
    'loyer_m2_appartement',
    'loyer_m2_maison',
    'revenu_fiscal_moyen',
    'IRL',
    'n_logements',
    'n_logements_vacants'
).distinct()  # Supprime les doublons
)

# Créer une vue temporaire
selected_data.createOrReplaceTempView("immobilier_analysis")

# Sauvegarder comme table Delta
selected_data.write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable("default.immobilier_analysis")

# Afficher le résultat
display(selected_data)

In [0]:
selected_data.count()

In [0]:
%sql
SELECT * FROM default.immobilier_analysis