In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, lower, regexp_replace

spark = SparkSession.builder \
.appName('dpe_existants_rawdata_dl_job') \
.master('spark://spark-master:7077') \
.config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
.config('spark.ui.port', '4041') \
.getOrCreate()

In [None]:
enedis = spark.read.csv('hdfs:///hadoop/dfs/data/enedis/raw_data/consommation-annuelle-residentielle_2021.csv',sep=';',header=True)
dpe = spark.read.parquet('hdfs:///hadoop/dfs/data/DPE/raw_data/dpe_logements_existants')
ban = spark.read.csv('hdfs:///hadoop/dfs/data/base-adresse-national/adresses-92.csv',sep=';',header=True)

dpe_filtrer = dpe.filter(col('Code_postal_(BAN)').startswith('92'))
enedis_filtrer = enedis.filter(col('code_commune').startswith('92'))


In [None]:
# Normalisation des adresses
df = enedis_filtrer.withColumn("adresse_brute", concat_ws(" ",
    col("adresse"),
    col("code_commune"),
    col("nom_commune")
))
enedis_normaliser = df.withColumn("adresse_brute", lower(regexp_replace(col("adresse_brute"), r"\s+", " ")))
distinct_values = enedis_normaliser.select("adresse_brute").distinct()
distinct_values.show(truncate=False)

df2 = dpe_filtrer.withColumn("adresse_brute", concat_ws(" ",
    col('N°_voie_(BAN)'),
    col("Nom__rue_(BAN)"),
    col("Code_postal_(BAN)"),
    col("Nom__commune_(BAN)")
))
dpe_normaliser = df2.withColumn("adresse_brute", lower(regexp_replace(col("adresse_brute"), r"\s+", " ")))
distinct_values = dpe_normaliser.select("adresse_brute").distinct()
distinct_values.show(truncate=False)

# Création d'une colonne adresse normalisée dans le DataFrame de la BAN
ban_df = ban.withColumn("adresse_normalisee", concat_ws(" ",
    col("numero"),
    col("rep"),
    col("nom_voie"),
    col("code_postal"),
    col("nom_commune")
))
ban_normaliser = ban_df.withColumn("adresse_normalisee", lower(regexp_replace(col("adresse_normalisee"), r"\s+", " ")))
distinct_values = ban_normaliser.select("adresse_normalisee").distinct()
distinct_values.show(truncate=False)

# Création d'une colonne adresse normalisée dans le DataFrame de la BAN
ban_df = ban.withColumn("adresse_normalisee_insee", concat_ws(" ",
    col("numero"),
    col("rep"),
    col("nom_voie"),
    col("code_insee"),
    col("nom_commune")
))
ban_normaliser = ban_df.withColumn("adresse_normalisee_insee", lower(regexp_replace(col("adresse_normalisee_insee"), r"\s+", " ")))
distinct_values = ban_normaliser.select("adresse_normalisee_insee").distinct()
distinct_values.show(truncate=False)

In [64]:
#join dpe normaliser avec la base ban 
dpe_join_ban = dpe_normaliser.join(ban_normaliser.select("adresse_normalisee", "id"), dpe_normaliser.adresse_brute == ban_normaliser.adresse_normalisee, how="left")
distinct_values = dpe_join_ban.select("id").distinct()
distinct_values.show(truncate=False)

+----------------+
|id              |
+----------------+
|92012_7205_00026|
|92046_7150_00056|
|92035_4115_00005|
|92020_3100_00005|
|92064_0620_00009|
|92007_0070_00250|
|92050_7170_00082|
|92004_5175_00008|
|92073_4785_00003|
|92048_1327_00015|
|92049_0045_00083|
|92050_0015_00020|
|92051_4170_00018|
|92062_9635_00024|
|92012_1415_00026|
|92062_9620_00110|
|92025_0025_00091|
|92050_0015_00016|
|92002_6800_00006|
|92063_1555_00009|
+----------------+
only showing top 20 rows



In [112]:
#join enedis normaliser avec la base ban 

enedis_join_ban = enedis_normaliser.join(ban_normaliser.select("adresse_normalisee_insee", "id"), enedis_normaliser.adresse_brute == ban_normaliser.adresse_normalisee_insee                         , how="left")
distinct_values = enedis_join_ban.select("adresse_brute").distinct()
distinct_values.show(truncate=False)




+----------------------------------------------------+
|adresse_brute                                       |
+----------------------------------------------------+
|14 rue francoise barre sinoussi 92023 clamart       |
|197 avenue achille peretti 92051 neuilly-sur-seine  |
|3 rue paul deroulede 92051 neuilly-sur-seine        |
|43 avenue pablo picasso 92050 nanterre              |
|18 t rue paul bert 92040 issy-les-moulineaux        |
|5 rue des paradis 92032 fontenay-aux-roses          |
|85 rue des hautes bievres 92002 antony              |
|1 avenue d argenteuil 92004 asnières-sur-seine      |
|14 place des victoires 92004 asnières-sur-seine     |
|2 rue voltaire 92062 puteaux                        |
|53 rue du trosy 92023 clamart                       |
|202 b avenue jean jaures 92023 clamart              |
|17 residence les taratres 92063 rueil-malmaison     |
|32 rue du capitaine ferber 92040 issy-les-moulineaux|
|8 villa haussmann 92040 issy-les-moulineaux         |
|48 rue ve

                                                                                

In [None]:
#supprimer des colonnes (!! rajouter tt les colonnes qui ne sont pas necessaire)
dpe_join_ban_cleaned = dpe_join_ban.drop('adresse_brute')
enedis_join_ban_cleaned = enedis_join_ban.drop('adresse_brute')

#jointure enedis et dpe sur l'id ban
enedis_dpe_joind= enedis_join_ban_cleaned.join(dpe_join_ban_cleaned, dpe_join_ban_cleaned.id == enedis_join_ban_cleaned.id, how="inner")

#faire le select apres la jointure pour garder un seul id ban
final_df = enedis_dpe_joind_cleaned.select(
    enedis_join_ban.id,
    *[col for col in dpe_join_ban_cleaned.columns if col != "id"],
    *[col for col in enedis_join_ban_cleaned.columns if col != "id"]
)
final_df.show(1,truncate=False,vertical=True)

In [117]:
# df = dpe_join_ban.filter(col('id').startswith('92002_8115'))
# final_df.count()
distinct_values = final_df.select("id").distinct()
distinct_values.show(truncate=False)



+----------------+
|id              |
+----------------+
|92071_3561_00004|
|92024_0050_00005|
|92048_2560_00001|
|92040_4735_00247|
|92004_5175_00008|
|92049_0045_00083|
|92078_4410_00001|
|92002_2520_00112|
|92012_4140_00025|
|92050_8568_00002|
|92024_6140_00144|
|92071_5141_00014|
|92073_7160_00058|
|92026_0077_00002|
|92078_4410_00047|
|92035_9615_00003|
|92073_4125_00144|
|92012_1435_00032|
|92050_7170_00082|
|92050_0015_00020|
+----------------+
only showing top 20 rows



                                                                                

In [120]:
df = enedis_join_ban_cleaned.filter(col('id').startswith('92071_3561_00004'))
final_df.count()

                                                                                

1

In [None]:
## netoyer le code et charger plus de données (voir comment faire pour télécharger la base des adresse total + le ficher parquet de enedis ) 

In [121]:
final_df.count()

                                                                                

91