# Traçage de réseaux de chaleur urbain

***Auteur : Charlotte Gressel, étudiante ingénieur à l'école des Mines, 2025***

Ce notebook permet, pour un quartier ou une ville sélectionnée, de déterminée les opportunités d'installation de réseau de chaleur urbain, et le tracer. Nous nous appuyerons sur les travaux de Nicolas Matte ayant fourni un algorythme de traçage du réseau optimal pour un lieu donné.

Nous proposerons, dans un second temps, une technique d'optimisation du réseau par réduction de sa longueur.

In [4]:
from shapely.geometry import shape, LineString, MultiLineString, Point, Polygon
from shapely.ops import nearest_points
import fiona
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
import geopandas as gpd

## Extraction et filtrage des données géographiques et énergétiques

Dans cette partie, deux principaux fichiers sont à l'étude : 
- Un fichier des bâtiments pour un département choisi, téléchargé sur le site BD TOPO® de Géoservices (base BDNB) au format Geopackage
- Un fichier des routes pour un département choisi, téléchargé sur le site BD CARTO® de Géoservices au format Geopackage

Elles ont ensuite été exploités avec le logiciel GQIS afin d'extraire et exporter uniquement les attributs à étudier, à savoir `Bâtiments groupes > Classe DPE (DPE réels)` pour les bâtiments et `BDT_3-~1 — troncon_de_route` pour les routes. 

Il est important de souligner que l'étude est faite à partir de données **départementales** et non nationales, notamment pour des raisons de format des données téléchargeables et du temps de calcul. Le département choisi dans l'étude qui suit est l'Oise.

In [7]:
# Chemins vers les fichiers GeoPackage
buildings_file = "C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/test60/bat_oise.gpkg"
roads_file = "C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/test60/routes_oise.gpkg"

#### 1<sup>er</sup> filtrage: sélection des bâtiments et des routes de la commune choisie

On commence par sélectionner les bâtiments de la commune choisie. Les bâtiments devront également répondre à un critère de hauteurs supérieur à 3 mètres. 

Cette opération est la plus chronophage en raison du nombre important de bâtiments à parcourir (390 581 ici).

In [10]:
import fiona
from tqdm import tqdm

# Extraction des bâtiments
buildings_fiona = fiona.open(buildings_file)

buildings_by_commune = {}  # Dictionnaire pour regrouper les bâtiments par commune
codes_communes = set()  # Utilisation d'un set pour éviter les doublons

for building in tqdm(buildings_fiona, desc="Extraction des bâtiments", unit="bâtiment"):
    properties = building["properties"]
    
    # Vérification des conditions sur les propriétés
    if (properties["code_commune_insee"] is not None and
        properties["s_geom_groupe"] is not None and
        properties["ffo_bat_annee_construction"] is not None and
        properties["bdtopo_bat_hauteur_mean"] is not None and
        properties["bdtopo_bat_hauteur_mean"] >= 3 and
        properties["dpe_mix_arrete_classe_conso_energie_arrete_2012"] is not None and
        properties["dpe_mix_arrete_classe_conso_energie_arrete_2012"] != "N"):
        
        code_commune_insee = properties["code_commune_insee"]
        
        # Ajouter le bâtiment au dictionnaire de la commune
        if code_commune_insee not in buildings_by_commune:
            buildings_by_commune[code_commune_insee] = []
        buildings_by_commune[code_commune_insee].append(building)

        # Ajouter le code commune au set
        codes_communes.add(code_commune_insee)

buildings_fiona.close()

# Conversion en liste triée des codes communes
codes_communes = sorted(list(codes_communes))

print(f"{len(buildings_by_commune)} communes avec des bâtiments sélectionnés")
print(f"Liste des codes communes INSEE : {codes_communes}")

# Extraction des routes
roads_fiona = fiona.open(roads_file)

roads_by_commune = {}  # Dictionnaire pour regrouper les routes par commune

print("Analyse des routes en cours...")
for road in tqdm(roads_fiona, desc="Progression", unit="route"):
    properties = road["properties"]
    
    # Identifier les communes associées à la route
    insee_commune_gauche = properties.get("insee_commune_gauche")
    insee_commune_droite = properties.get("insee_commune_droite")

    # Ajouter la route aux communes correspondantes
    for code_commune in {insee_commune_gauche, insee_commune_droite}:
        if code_commune in codes_communes:  # Vérification pour garder uniquement les communes du département
            if code_commune not in roads_by_commune:
                roads_by_commune[code_commune] = []
            roads_by_commune[code_commune].append(road)

roads_fiona.close()

print(f"{len(roads_by_commune)} communes avec des routes sélectionnées")

# Regrouper les données par commune
commune_data = {}

for code_commune in codes_communes:
    commune_data[code_commune] = {
        "buildings": buildings_by_commune.get(code_commune, []),
        "roads": roads_by_commune.get(code_commune, [])
    }

# Résumé
total_buildings = sum(len(data["buildings"]) for data in commune_data.values())
total_roads = sum(len(data["roads"]) for data in commune_data.values())

print(f"{total_buildings} bâtiments et {total_roads} routes répartis sur {len(commune_data)} communes")


Extraction des bâtiments: 100%|████████████████████████████████████████| 390581/390581 [06:15<00:00, 1040.53bâtiment/s]


677 communes avec des bâtiments sélectionnés
Liste des codes communes INSEE : ['60001', '60002', '60003', '60004', '60005', '60006', '60007', '60008', '60009', '60010', '60011', '60012', '60013', '60014', '60015', '60016', '60017', '60019', '60020', '60021', '60022', '60023', '60024', '60025', '60026', '60027', '60028', '60029', '60030', '60031', '60032', '60033', '60034', '60035', '60036', '60037', '60039', '60040', '60041', '60042', '60043', '60044', '60045', '60046', '60047', '60048', '60049', '60050', '60051', '60052', '60053', '60054', '60055', '60056', '60057', '60058', '60059', '60060', '60061', '60062', '60063', '60064', '60065', '60066', '60067', '60068', '60069', '60070', '60071', '60072', '60073', '60074', '60075', '60076', '60077', '60078', '60079', '60081', '60082', '60083', '60084', '60085', '60086', '60087', '60088', '60089', '60090', '60091', '60092', '60093', '60094', '60095', '60097', '60098', '60099', '60100', '60101', '60102', '60103', '60104', '60105', '60106', '60

Progression: 100%|█████████████████████████████████████████████████████████| 239444/239444 [04:00<00:00, 996.92route/s]


677 communes avec des routes sélectionnées
31698 bâtiments et 169923 routes répartis sur 677 communes


On sélectionne ensuite les routes de la commune choisie grâce à la commune qui se trouve à droite et à gauche de la route. On ne conserve que les routes "intérieures", à savoir celles dont la commune à droite et à gauche est celle qui porte le code commune Insee sélectionné.

In [12]:
# NE PAS REEXECUTER CETTE LIGNE
original_buildings = buildings_by_commune
original_roads = roads_by_commune

#### 2<sup>ème</sup> filtrage : Sélection des bâtiments connectables


##### Calcul de la consommation des bâtiments et sélection du plus grand consommateur

Nous nous basons sur l'attribut `dpe_class_to_consumption` pour évaluer la consommation des bâtiments. La lettre attribuée à chaque bâtiment est associée à sa consommation énergétique moyenne en kWh/m²/an.

Nous estimons alors la consommation énergétique annuelle moyenne de chaque bâtiment à partir de leurs DPE moyen, surface et hauteur, par la formule suivante :

$$
Q \approx DPE \times S \times E\left(\frac{h}{h_0}\right)
$$

Où $h_0 = 3m$ et où $E$ est la fonction partie entière, afin d'introduire une estimation du nombre d'étages pour chaque bâtiment.

In [16]:
%%time

import numpy as np

# Fonction de mapping de la classe DPE à la consommation énergétique (kWh/m²/an)
def dpe_class_to_consumption(dpe_class):
    dpe_map = {
        "A": 50,  # Consommation max pour A est 50
        "B": 90,  # Consommation max pour B est 90
        "C": 150, # Consommation max pour C est 150
        "D": 230, # Consommation max pour D est 230
        "E": 330, # Consommation max pour E est 330
        "F": 450, # Consommation max pour F est 450
        "G": 500  # On considère 500 comme une valeur représentative pour G (au-delà de 450)
    }
    return dpe_map.get(dpe_class, 0)  # Renvoie 0 si la classe n'est pas reconnue

# Calcul des demandes en chaleur pour tous les bâtiments retenus
h_0 = 3  # mètres, taille d'un étage

# Initialisation des variables pour stocker les résultats
heat_demands_by_commune = {}
highest_demand = {}  # Stocke le bâtiment avec la plus grande consommation par commune

for code_commune in codes_communes:
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue
    
    # Initialisation d'un tableau pour stocker les demandes en chaleur
    heat_demands = np.zeros(nb_buildings)
    
    for k, building in enumerate(buildings_by_commune[code_commune]):
        properties = building["properties"]
        
        # Conversion de la classe DPE en consommation énergétique
        dpe_class = properties["dpe_mix_arrete_classe_conso_energie_arrete_2012"]
        dpe_consumption = dpe_class_to_consumption(dpe_class)
        
        # Calcul de la demande en chaleur en kWh/an
        if properties["s_geom_groupe"] is not None and properties["bdtopo_bat_hauteur_mean"] is not None:
            surface = properties["s_geom_groupe"]
            hauteur_moyenne = properties["bdtopo_bat_hauteur_mean"]
            Q = dpe_consumption * surface * int(hauteur_moyenne / h_0)
            heat_demands[k] = Q
    
    # Tri des consommations décroissantes
    sorted_indices = np.argsort(heat_demands)[::-1]
    heat_demands = heat_demands[sorted_indices]
    sorted_buildings = [buildings_by_commune[code_commune][i] for i in sorted_indices]
    
    # Mise à jour des bâtiments triés par consommation dans la commune
    buildings_by_commune[code_commune] = sorted_buildings
    
    # Stocker les demandes en chaleur
    heat_demands_by_commune[code_commune] = heat_demands.tolist()
    
    # Identifier le bâtiment avec la plus grande demande
    highest_demand[code_commune] = sorted_buildings[0] if sorted_buildings else None

# Résumé
total_heat_demand = sum(sum(demands) for demands in heat_demands_by_commune.values())
print(f"Demande totale de chaleur : {total_heat_demand:.2f} kWh/an")

Demande totale de chaleur : 1879579430.00 kWh/an
CPU times: total: 7.28 s
Wall time: 26.7 s


Nous basons notre étude sur plusieurs principes : 
* Il y a une seule centrale de production par zone étudiée.
* On ne prend pas en compte la localisation de cette centrale, car on considère que la connecter au réseau ne nécessite qu'une centaine de mètres d'ajout de cannalisations, négligeable devant la longueur totale du réseau.
* La capacité de production de chaleur est supposée sans limite.

Dans cette étude, **nous ferons le choix arbitraire de démarrer notre algorythme de traçage du bâtiment le plus consommateur en énergie**. 

##### Détermination des bâtiments connectables

Choisir les bâtiments qui seront raccordés au réseau de chaleur n'est pas un problème à solution absolue ni unique. Il faut prendre en compte une multitude de paramètres :
* Budget de la mairie
* Offre en chaleur disponible
* Consommation des bâtiments : est-il rentable de les raccorder ?
* Carbonation de la chaleur actuellement distribuée aux bâtiments
* Objectifs de la municipalité, pression d'autres acteurs...

L'agent municipal peut ainsi éliminer à la main certains groupes de bâtiments, mais il peut également se fier à certains critères de priorité pour le raccordement, notamment la demande en chaleur. En effet, pour avoir un réel impact et augmenter sa rentabilité, le réseau de chaleur doit pouvoir subvenir à la plus grande demande en chaleur possible avec une taille minimale.

Ainsi, la méthode que nous proposons en tant qu'agent municipal est la suivante :
1. Eliminer ou inclure d'office certains groupes de bâtiments à la main;
2. Déterminer un critère de consommation minimale pour définir le reste des bâtiments connectables;

##### 1. Eliminer ou inclure d'office certains groupes de bâtiments à la main

L'agent municipal doit parfois suivre certaines directives stratégiques, par exemple raccorder certains groupes de bâtiments prioritaires ou bien le tertiaire qui participe au financement du projet. Il peut également avoir conscience de certaines choses non prises en compte par l'ordinateur : certains bâtiments seront bientôt détruits, d'autres bientôt construits, certains sont déjà connectés à d'autres infrastructures performantes...  

On pourrait modéliser les décisions de l'agent municipal par deux liste d'indices `eliminated_index` et `imposed_index` qui correspondent respectivement aux bâtiments éliminés et raccordés d'office par l'agent municipal. Dans ce notebook, dans le but d'illustrer les résultats de l'algorithme, nous supposerons ces listes vides.

##### 2. Déterminer un critère de consommation minimale pour définir le reste des bâtiments connectables

Un tel critère est défini sur le site **France Chaleur** : au-delà de $30kW$, un bâtiment doit dans la plupart des cas être considéré pour le tracé du réseau de chaleur.

La puissance thermique maximale demandée par les habitations est estimée grâce à une technique définie dans la section suivante. On réalisera alors le filtrage des bâtiments selon ce critère.

Ce procédé permet de trouver le compromis entre une demande raccordée maximale en premier plan et un nombre de bâtiments raccordé minimal afin de limiter les coûts.

##### Sélection des bâtiments les plus demandeurs

Un critère pour labelliser un réseau de chaleur urbain est de raccorder systématiquement les bâtiments ayant une demande en puissance maximale supérieure à 30kW. Pour cela, la première étape est d'estimer leurs consommations thermiques annuelles. Le procédé est le même que dans le notebook précédent.

##### Estimation de la consommation énergétique annuelle moyenne

Afin de réaliser la simulation d'un réseau de chaleur avec le logiciel Dymola, il faut estimer leurs demandes instantanées tout au long de l'année. Pour cela, nous utilisons la méthode décrite dans les sections suivantes.

Nous utilisons des profils typiques de demande en chaleur. Nous disposons de trois fichiers correspondant aux courbes typiques des bâtiments construits durant les périodes suivantes :
* Avant 1989 (exclu) : `RT_1974_Treated.csv` (il s'agit en réalité des normes des bâtiments construits entre 1974 et 1989);
* Entre 1989 (inclus) et 2005 (exclu) : `RT_1989_Trated.csv` (il s'agit des normes des bâtiments construits entre 1989 et 2005);
* Après 2005 (inclus) : `RT_2005_Treated.csv`.

Il s'agit de timeseries que nous allons manipuler avec `pandas`.

In [29]:
heat_profile_1974 = pd.read_csv("C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/code/data/RT_1974_Treated.csv", index_col=0)
heat_profile_1989 = pd.read_csv("C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/code/data/RT_1989_Treated.csv", index_col=0)
heat_profile_2005 = pd.read_csv("C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/code/data/RT_2005_Treated.csv", index_col=0)

On calcule ensuite la puissance thermique demandée pour chaque bâtiment à partir de la formule suivante:
$$
\Phi_{th} = D_m \cdot c_p \cdot (T_{départ} - T_{retour})
$$

Puis on calcule le coefficient de demande en chaleur associé à chaque bâtiment:
$$
\alpha_k = \frac{Q_{total}}{\left( \sum_{1 \, \text{an}} \Phi_{th,ref} \right) \cdot \Delta t}
$$


In [31]:
%%time

heat_profile_coeffs_by_commune = {}

# Constantes
cp_water = 4180 # J/K/kg
delta_t = 1 # h (l'énergie est en kWh)

# Initialisation
sum_power = np.zeros(3)

# Fonction pour catégoriser les bâtiments suivant leur année de construction
def categorize(building):
    construction_date = building["properties"]["ffo_bat_annee_construction"]
    # 0 : avant 1989 exclu
    if construction_date < 1989:
        return 0
    # 1 : entre 1989 inclus et 2005 exclu
    elif 1989 <= construction_date < 2005:
        return 1
    # 2 : après 2005 exclu
    else:
        return 2
    
df_tab = [heat_profile_1974, heat_profile_1989, heat_profile_2005]

# Calcul de la colonne des puissances dans les dataframes de référence
def add_power_column(df):
    df.Puissance = cp_water * df.Debit_eau * (df.Tdepart - df.Tretour)
    
for k, df in enumerate([heat_profile_1974, heat_profile_1989, heat_profile_2005]):
    # Ajout de la colonne puissance
    add_power_column(df)
    # Calcul de la somme des puissances
    sum_power[k] = df.Puissance.sum()

for code_commune in codes_communes:
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue
    
    # Initialisation d'un tableau pour stocker les demandes en chaleur
    heat_profile_coeffs = np.zeros(nb_buildings)
    
    # Calcul des coefficients pour chaque bâtiment hors usine à chaleur
    for k, building in enumerate(buildings_by_commune[code_commune]):
        category = categorize(building)
        heat_profile_coeffs[k] = heat_demands_by_commune[code_commune][k] / (delta_t * sum_power[category] * 1e-3) # attention unités : kWh / kWh

    heat_profile_coeffs_by_commune[code_commune] = heat_profile_coeffs.tolist()



CPU times: total: 2.45 s
Wall time: 2.93 s


On ne conserve que les bâtiments dont la puissance maximale demandée excède $30kW$.

In [33]:
%%time

max_power_by_commune = {}

for code_commune in codes_communes:
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue
  
    max_power = np.zeros(nb_buildings)
    
    for k, building in enumerate(buildings_by_commune[code_commune]):
        max_power[k] = heat_profile_coeffs_by_commune[code_commune][k] * np.max(df_tab[categorize(building)].Puissance)

    max_power_by_commune[code_commune] = max_power.tolist()

CPU times: total: 4.53 s
Wall time: 5.15 s


In [34]:
%%time

for code_commune in codes_communes:
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue

    mask = np.array([power > 30e3 for power in max_power_by_commune[code_commune]])
    buildings_by_commune[code_commune] = [buildings_by_commune[code_commune][k] for k in range(nb_buildings) if mask[k]]
    
    nb_buildings = len(buildings_by_commune[code_commune])
    
    print(f"{code_commune} - Nombre de bâtiments après le 2ème filtrage :", nb_buildings)

60001 - Nombre de bâtiments après le 2ème filtrage : 15
60002 - Nombre de bâtiments après le 2ème filtrage : 10
60003 - Nombre de bâtiments après le 2ème filtrage : 6
60004 - Nombre de bâtiments après le 2ème filtrage : 5
60005 - Nombre de bâtiments après le 2ème filtrage : 8
60006 - Nombre de bâtiments après le 2ème filtrage : 11
60007 - Nombre de bâtiments après le 2ème filtrage : 9
60008 - Nombre de bâtiments après le 2ème filtrage : 3
60009 - Nombre de bâtiments après le 2ème filtrage : 15
60010 - Nombre de bâtiments après le 2ème filtrage : 19
60011 - Nombre de bâtiments après le 2ème filtrage : 3
60012 - Nombre de bâtiments après le 2ème filtrage : 17
60013 - Nombre de bâtiments après le 2ème filtrage : 15
60014 - Nombre de bâtiments après le 2ème filtrage : 0
60015 - Nombre de bâtiments après le 2ème filtrage : 12
60016 - Nombre de bâtiments après le 2ème filtrage : 1
60017 - Nombre de bâtiments après le 2ème filtrage : 5
60019 - Nombre de bâtiments après le 2ème filtrage : 0
60

In [35]:
%%time

filtered_communes = []

for code_commune in codes_communes:
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue

    mask = np.array([power > 30e3 for power in max_power_by_commune[code_commune]])
    buildings_by_commune[code_commune] = [buildings_by_commune[code_commune][k] for k in range(nb_buildings) if mask[k]]
    
    nb_buildings = len(buildings_by_commune[code_commune])
    
    print(f"{code_commune} - Nombre de bâtiments après le 2ème filtrage :", nb_buildings)

    # Ajouter le code commune si le nombre de bâtiments est supérieur à 25
    if nb_buildings > 40:
        filtered_communes.append(code_commune)

print(f"{len(filtered_communes)} communes avec plus de 40 bâtiments après filtrage :", filtered_communes)

60001 - Nombre de bâtiments après le 2ème filtrage : 15
60002 - Nombre de bâtiments après le 2ème filtrage : 10
60003 - Nombre de bâtiments après le 2ème filtrage : 6
60004 - Nombre de bâtiments après le 2ème filtrage : 5
60005 - Nombre de bâtiments après le 2ème filtrage : 7
60006 - Nombre de bâtiments après le 2ème filtrage : 11
60007 - Nombre de bâtiments après le 2ème filtrage : 8
60008 - Nombre de bâtiments après le 2ème filtrage : 3
60009 - Nombre de bâtiments après le 2ème filtrage : 14
60010 - Nombre de bâtiments après le 2ème filtrage : 19
60011 - Nombre de bâtiments après le 2ème filtrage : 3
60012 - Nombre de bâtiments après le 2ème filtrage : 17
60013 - Nombre de bâtiments après le 2ème filtrage : 15
60015 - Nombre de bâtiments après le 2ème filtrage : 11
60016 - Nombre de bâtiments après le 2ème filtrage : 1
60017 - Nombre de bâtiments après le 2ème filtrage : 5
60020 - Nombre de bâtiments après le 2ème filtrage : 2
60021 - Nombre de bâtiments après le 2ème filtrage : 1
60

**Sélection des routes proches des bâtiments**

### Préparation des routes pour l'algorithme Dijkstra

L'algorithme d'origine parcourt les routes dans leur intégralité, en suivant la longueur et l'ordre des tronçons. Chaque tronçon de route est modélisé par un objet `LineString` de la bibliothèque `Shapely`, lui-même composée d'une suite de points entre lesquels une droite est tracée. Or, dans les données brutes téléchargées, ces tronçons sont ordonnées et découpées de façon aléatoires. Par exemple, une route faisant 1km et ne diposant que d'un seul bâtiment au début de cette route, dison à 10m, sera ajoutée au réseau dans son intégralité si ce bâtiment est sélectionné dans le réseau. 

D'autre part, dans le logiciel de simulation Dymola, chaque tronçon est modélisé par un tuyau, et chaque tuyau est traversé par un débit qui dépend des ba^timents en amont et en aval. En conséquence le modèle devient vite compliqué vue le nombre des tronçons à modéliser dans le modèle actuel. Pour que la simulation soit effective, il est nécessaire qu'un tronçon de route soit unique entre deux bâtiments à connecter au réseau.


Il s'agit donc dans le code ci-dessous de réunir et redécouper ces routes de sorte à ce que leurs extrémités ne soient plus posées de façon aléatoires mais existent si et seulment si elles rencontrent un bâtiment ou une intersection.

**1<sup>ère</sup> étape : Projection des bâtiments sur les routes**

On commence par projeter la localisation des bâtiments sur les routes.

In [39]:
%%time

projected_buildings_by_commune = {}

for code_commune in filtered_communes:
    projected_buildings_by_commune[code_commune] = []
    
    for building in buildings_by_commune[code_commune]:
        polygon_building = shape(building["geometry"])
        # Coordonnées du centre du bâtiment
        centroid = polygon_building.centroid
        building_point = Point(centroid.x, centroid.y)
    
        # Trouver la route la plus proche du bâtiment
        closest_route = None
        min_distance = float("inf")
        for route in roads_by_commune[code_commune]:
            line = LineString(route["geometry"]["coordinates"])
            distance = line.distance(building_point)
            if distance < min_distance:
                min_distance = distance
                closest_route = line
    
        # Projeter le point du bâtiment sur la route la plus proche
        projected_building = closest_route.interpolate(closest_route.project(building_point))
        projected_building = Point(projected_building.x, projected_building.y)
        projected_buildings_by_commune[code_commune].append(projected_building)

CPU times: total: 6min 54s
Wall time: 7min 54s


**2<sup>ème</sup> étape : Réunion et découpage des routes**

Pour le moment, les routes sont découpées de façon aléatoires. Pour visualiser ceci, nous allons affecter de façon aléatoire une parmi quatre couleurs à chaque tronçon.

*1) Réunion des routes jusque chaque croisement*

In [41]:
%%time

from shapely.ops import linemerge

merged_roads_by_commune = {}

for code_commune in filtered_communes:
    
    roads_to_linestring =[]
    merged_roads_by_commune[code_commune]=[]
    
    for route in roads_by_commune[code_commune]:
        line = LineString(route["geometry"]["coordinates"])
        roads_to_linestring.append(line)
    
    # On réunit toutes les routes ensemble en un seul objet MultiLineString
    multi_line = MultiLineString(roads_to_linestring)
    
    # La commande linemerge permet de réunir les routes qui se joignent
    merged_roads_by_commune[code_commune] = linemerge(multi_line)
    
    print(f"{code_commune} - Nombre de routes après réunion: {len(merged_roads_by_commune[code_commune].geoms)}")

60057 - Nombre de routes après réunion: 5363
60088 - Nombre de routes après réunion: 770
60104 - Nombre de routes après réunion: 603
60139 - Nombre de routes après réunion: 1312
60141 - Nombre de routes après réunion: 1806
60157 - Nombre de routes après réunion: 826
60159 - Nombre de routes après réunion: 4782
60175 - Nombre de routes après réunion: 1974
60176 - Nombre de routes après réunion: 1485
60282 - Nombre de routes après réunion: 1706
60286 - Nombre de routes après réunion: 330
60346 - Nombre de routes après réunion: 1070
60360 - Nombre de routes après réunion: 527
60382 - Nombre de routes après réunion: 649
60395 - Nombre de routes après réunion: 1673
60414 - Nombre de routes après réunion: 1128
60439 - Nombre de routes après réunion: 592
60463 - Nombre de routes après réunion: 1089
60471 - Nombre de routes après réunion: 1768
60509 - Nombre de routes après réunion: 1314
60581 - Nombre de routes après réunion: 591
60584 - Nombre de routes après réunion: 766
60612 - Nombre de r

*2) Découpage des routes au niveau des projections des bâtiments*

In [43]:
def split_routes(merged_connected_roads, projected_buildings2, safety_distance=10):
    split_connected_roads = []

    for route in merged_connected_roads.geoms:
        if len(route.coords[0]) == 3:  # Cas où la coordonnée contient (x, y, z)
            route = LineString([(x, y) for x, y, z in route.coords])  # Ignorer le z
        elif len(route.coords[0]) == 2:  # Cas où la coordonnée contient (x, y)
            route = LineString([(x, y) for x, y in route.coords])  # Garde seulement (x, y)
        else:
            # Si les coordonnées n'ont ni 2 ni 3 éléments, on peut ignorer la route
            print(f"Route avec coordonnées inattendues: {route.coords}")
            continue

        # Vérifier si des points sont proches de la route, mais à une distance suffisante des extrémités
        no_points_nearby = True
        for point in projected_buildings2:
            distance_to_route = point.distance(route)
            distance_to_start = point.distance(Point(list(route.coords)[0]))
            distance_to_end = point.distance(Point(list(route.coords)[-1]))

            # Si le point est proche de la route mais loin des extrémités, nous allons découper la route
            if distance_to_route < 10 and (distance_to_start > safety_distance and distance_to_end > safety_distance):
                no_points_nearby = False
                break  # Sortir de la boucle dès qu'un point valide est trouvé

        # Si aucun point n'est proche de la route, ajouter la route sans modification
        if no_points_nearby:
            split_connected_roads.append(route)
            continue

        # Récupérer les points proches de la route
        points_in_route = [point for point in projected_buildings2 if point.distance(route) < 10]
        points_in_route.insert(0, Point(list(route.coords)[0]))  # Ajouter le début de la route
        points_in_route.append(Point(list(route.coords)[-1]))  # Ajouter la fin de la route

        if len(points_in_route) > 2:
            # Trier les points par distance croissante par rapport au début de la route
            points_in_route = sorted(points_in_route, key=lambda p: route.project(p))

            # Diviser la route en segments successifs
            previous_position = 0
            last_segment_coords = []  # Liste pour stocker le dernier segment

            for i, point in enumerate(points_in_route):
                current_position = route.project(point)

                # Extraire les coordonnées du segment de la position précédente à la position actuelle
                segment_coords = [
                    coord for coord in route.coords
                    if route.project(Point(coord)) >= previous_position
                    and route.project(Point(coord)) <= current_position
                ]

                # Si ce n'est pas le premier segment, ajoutez le dernier point du segment précédent
                if i != 0:
                    segment_coords.insert(0, last_segment_coords[-1])  # Ajouter le dernier point du segment précédent

                # Ajouter le point de division pour éviter les trous
                segment_coords.append((point.x, point.y))

                # Vérifier que le segment contient plus de 1 point avant de créer le LineString
                if len(segment_coords) > 1:
                    segment = LineString(segment_coords)
                    split_connected_roads.append(segment)

                # Mettre à jour la position précédente et mémoriser le dernier segment
                previous_position = current_position
                last_segment_coords = segment_coords

            # Ajouter le dernier segment entre le dernier point et la fin de la route
            final_segment_coords = [
                coord for coord in route.coords
                if route.project(Point(coord)) >= previous_position
            ]

            # Ajouter la dernière coordonnée pour s'assurer que le dernier point est inclus
            if route.coords[-1] not in final_segment_coords:
                final_segment_coords.append(route.coords[-1])

            # Vérification si final_segment_coords contient plus de 1 point avant de créer le segment
            if len(final_segment_coords) > 1:
                final_segment = LineString(final_segment_coords)
                split_connected_roads.append(final_segment)

        for road in split_connected_roads:
            if road.length == 0 : #On retire les routes de longueur quasi nulle
                split_connected_roads.remove(road)

    return split_connected_roads

In [44]:
%%time

split_roads_by_commune = {}

for code_commune in filtered_communes:
    split_roads_by_commune[code_commune] = split_routes(merged_roads_by_commune[code_commune], projected_buildings_by_commune[code_commune], safety_distance=10)
    nb_split_roads = len(split_roads_by_commune[code_commune])
    print(f"{code_commune} - Nombre total de routes après découpage : {nb_split_roads}")

60057 - Nombre total de routes après découpage : 5855
60088 - Nombre total de routes après découpage : 812
60104 - Nombre total de routes après découpage : 660
60139 - Nombre total de routes après découpage : 1384
60141 - Nombre total de routes après découpage : 1937
60157 - Nombre total de routes après découpage : 916
60159 - Nombre total de routes après découpage : 5048
60175 - Nombre total de routes après découpage : 2092
60176 - Nombre total de routes après découpage : 1606
60282 - Nombre total de routes après découpage : 1768
60286 - Nombre total de routes après découpage : 372
60346 - Nombre total de routes après découpage : 1192
60360 - Nombre total de routes après découpage : 582
60382 - Nombre total de routes après découpage : 711
60395 - Nombre total de routes après découpage : 1730
60414 - Nombre total de routes après découpage : 1207
60439 - Nombre total de routes après découpage : 633
60463 - Nombre total de routes après découpage : 1189
60471 - Nombre total de routes aprè

*3) Conversion des routes en objet GeoJSON*

In [46]:
%%time

from shapely.geometry import mapping

final_roads_by_commune = {}

for code_commune in filtered_communes:
    final_roads_by_commune[code_commune] = []
    for line in split_roads_by_commune [code_commune]:
        geojson_geometry = mapping(line)
        
        # Étape 3 : Créer l'objet Feature sans propriétés
        feature = fiona.model.Feature(
            geometry=geojson_geometry,
            properties={}  # Dictionnaire vide pour les propriétés
        )
        final_roads_by_commune[code_commune].append(feature)

    roads_by_commune[code_commune]=final_roads_by_commune[code_commune]

CPU times: total: 1.19 s
Wall time: 1.31 s


## Modélisation du quartier par un graphe

### Définition des noeuds

On définit deux types de noeuds qui possédent de nombreuses propriétés de trois types : topologiques, énergétiques et relatives au réseau de chaleur. On compte comme type de noeud :
* Les bâtiments
* Les routes
* Le plus grand consommateur

Une telle représentation permet de grandement simplifier l'écriture du code.

In [50]:
lambda_ = 1.5e3

class Node_building:
    # Un bâtiment sera représenté par un indice, la commune et l'objet bâtiment qui y est associé
    def __init__(self, index, code_commune):
        self.index = index
        self.commune = code_commune
        self.building = buildings_by_commune[code_commune][index]
        self.coordinates = self.building["geometry"]["coordinates"][0][0]
        x, y = zip(*self.coordinates)
        self.center = (np.mean(x), np.mean(y))
        self.type = "batiment"
        self.predecessor = None
        properties = self.building["properties"]
        self.heat_demand = heat_demands_by_commune[code_commune][index]
        self.radius = self.heat_demand / lambda_
        self.construction_date = properties["ffo_bat_annee_construction"]
        self.heat_profile_coeff = heat_profile_coeffs_by_commune[code_commune][index]

class Node_road:
    def __init__(self, index, code_commune):
        self.index = index
        self.commune = code_commune
        self.road = roads_by_commune[code_commune][index]

        # Récupérer les coordonnées brutes
        raw_coordinates = self.road["geometry"]["coordinates"]

        # Vérifier si c'est une structure 2D ou 3D, ou un type inattendu
        if isinstance(raw_coordinates[0], (list, tuple)):
            self.coordinates = [
                (coord[0], coord[1]) if len(coord) >= 2 else (coord[0], 0)
                for coord in raw_coordinates
                if isinstance(coord, (list, tuple))  # Vérifie que coord est une liste ou un tuple
            ]
        else:
            # Si la structure est une seule ligne
            self.coordinates = [
                (raw_coordinates[0], raw_coordinates[1]) if len(raw_coordinates) >= 2 else (raw_coordinates[0], 0)
            ]

        # Vérifier si des coordonnées valides ont été trouvées
        if not self.coordinates:
            raise ValueError(f"Aucune coordonnée valide trouvée pour la route à l'index {index}. "
                             f"Données initiales : {self.road['geometry']['coordinates']}")

        # Calcul du centre
        x, y = zip(*self.coordinates)
        self.center = (np.mean(x), np.mean(y))
        self.type = "route"

        self.predecessor = None
        self.length = shape(self.road["geometry"]).length

class Node_plant:
    # L'usine à chaleur sera représenté par sa commune et l'objet bâtiment qui y est associée
    # Différence avec les noeuds bâtiments : ni demande en chaleur ni rayon caractéristique, index = nb_buildings pour le distinguer et ce sera le dernier dans les listes
    def __init__(self, code_commune):
        self.index = len(buildings_by_commune.get(code_commune, []))
        self.building = highest_demand[code_commune]
        self.coordinates = self.building["geometry"]["coordinates"][0][0]
        x, y = zip(*self.coordinates)
        self.center = (np.mean(x), np.mean(y))
        self.type = "Plus grand consommateur"
        self.predecessor = None

### Modélisation des arêtes

La présence d'une arête entre deux noeuds signifie qu'un tuyau peut joindre directement ces derniers en suivant la route et sans croiser de noeud intermédiaire. Il y a ainsi deux types de noeuds :
* Entre deux tronçons de routes successifs;
* Entre un bâtiment et une route qui lui est proche.

Nous avons fait le choix de ne raccorder que depuis la route les bâtiments, on exclut donc la possibilité de réaliser des tuyaux entre les habitations. Il y aura donc éventuellement une légère surestimation du coût du réseau de chaleur qu'il faudra prendre en compte lors de la présentation du projet.

Les arêtes seront modélisées par des tables d'adjacence représentées par des dictionnaires Python. Il s'agit certes de structures redondantes, mais nous les avons choisi pour leur grande efficacité en terme de temps de calcul. L'encombrement mémoire ne représentera par ailleurs pas un problème étant donnée la faible taille du graphe étudié (de l'odre du millier de noeuds).

### Construction du graphe

Les règles sont les suivantes pour calculer les arêtes :
1. Deux routes sont connectées si et seulement si la distance minimale entre ces dernières est inférieure à 5m;
2. Un bâtiment est connecté à sa route la plus proche.

In [None]:
# Construction du graphe

list_nodes_buildings_by_commune ={}
list_nodes_roads_by_commune ={}
list_edges_buildings_by_commune ={}
list_edges_roads_by_commune ={}
list_edges_roads_buildings_by_commune ={}

for code_commune in tqdm(filtered_communes, desc="Chargement par commune", unit="commune"):
    
    # Nombre de bâtiments dans la commune
    nb_buildings = len(buildings_by_commune.get(code_commune, []))
    nb_roads = len(roads_by_commune.get(code_commune, []))
    # Vérification s'il y a des bâtiments dans cette commune
    if nb_buildings == 0:
        continue
    
    # Liste des noeuds bâtiments
    list_nodes_buildings = [Node_building(index, code_commune) for index in range(0, nb_buildings)] + [Node_plant(code_commune)]
    # Liste des noeuds routes
    list_nodes_roads = [Node_road(index, code_commune) for index in range(nb_roads)]
    # Liste des arêtes entre les routes initialisée vide
    list_edges_roads = {road.index : [] for road in list_nodes_roads}
    # Liste des arêtes entre routes et bâtiments initialisée vide
    list_edges_buildings = {building.index : [] for building in list_nodes_buildings}
    # Liste des arêtes entre routes et bâtiments, mais dans l'autre sens
    list_edges_roads_buildings = {road.index : [] for road in list_nodes_roads}

        # Relier les routes entre elles à 5m près
    for road1 in list_nodes_roads:
        for road2 in list_nodes_roads[road1.index+1:]:
            if road2 != road1:
                line1 = LineString(road1.coordinates)
                line2 = LineString(road2.coordinates)
        
                # On relie si cette distance est inférieure à 5m
                if line1.distance(line2) <= 5:
                    list_edges_roads[road1.index].append(road2)
                    list_edges_roads[road2.index].append(road1)
                
    # Relier les bâtiments et l'usine à chaleur à la route la moins éloignée
    for building in list_nodes_buildings:
        is_connected = False
        closest_road = list_nodes_roads[0]
        closest_road_distance = 10000
        for road in list_nodes_roads:
    
            point1 = Point(building.center)
            line2 = LineString(road.coordinates)

            # Calculer la distance minimale entre le bâtiment et la route
            minimal_distance = point1.distance(line2)
    
            if minimal_distance < closest_road_distance:
                closest_road_distance = minimal_distance
                closest_road = road
            
        # On connecte à la route la plus proche
        list_edges_buildings[building.index].append((closest_road, closest_road_distance))
        list_edges_roads_buildings[closest_road.index].append((building, closest_road_distance))

    list_nodes_buildings_by_commune[code_commune] = list_nodes_buildings
    list_nodes_roads_by_commune[code_commune] = list_nodes_roads
    list_edges_buildings_by_commune[code_commune] = list_edges_buildings
    list_edges_roads_by_commune[code_commune] = list_edges_roads
    list_edges_roads_buildings_by_commune[code_commune] = list_edges_roads_buildings


  return lib.distance(a, b, **kwargs)
Chargement par commune:  58%|██████████████████████████████▉                      | 14/24 [57:10<14:48, 88.83s/commune]

## Implémentation de l'algorithme de tracé

Tout est codé dans la classe `Graph` implémentée ci-dessous. Quelques points pour décrire les outils implémentés :
* Le réseau de chaleur est initialisé au seul noeud de production
* Le critère de raccordabilité est toujours vérifié et mis à jour
* La connexion entre deux bâtiments est effectuée en suivant le plus court chemin dans un espace métrique où l'on suit les routes et où les routes déjà traversées par le réseau sont de longueurs nulles (les distances sont donc actualisées dynamiquement).

Modification de l'algo pour marquer comme parcouru et non connecté au réseau tout bâtiment "isolé", c'est à dire non connecté aux bâtiments de la commune étudiée par des routes - comme le cas de la commune '60178'.

In [None]:
from numpy import inf


class Graph:

    def __init__(self, list_nodes_buildings, list_nodes_roads, list_edges_buildings, list_edges_roads, list_edges_roads_buildings):
        self.list_nodes_buildings = list_nodes_buildings
        self.list_nodes_roads = list_nodes_roads
        self.list_edges_buildings = list_edges_buildings
        self.list_edges_roads = list_edges_roads
        self.list_edges_roads_buildings = list_edges_roads_buildings

        # Calcul du nombre de bâtiments et routes
        self.nb_buildings = len(self.list_nodes_buildings) - 1  # Exclut l'usine
        self.nb_roads = len(self.list_nodes_roads)

        # Initialisation des tableaux
        self.dhn_roads = np.full(self.nb_roads, False)  # Réseau de chaleur pour les routes
        self.dhn_buildings = np.full(self.nb_buildings, False)  # Réseau de chaleur pour les bâtiments

        # Initialisation de self.radii avec une taille adaptée
        max_index = max(building.index for building in self.list_nodes_buildings[:-1])
        self.radii = np.zeros(max_index + 1)

        self.network_radius = 0
        self.loop_test = False

    def initialise_radii(self):
        """
        Réinitialise la liste interne au graphe aux valeurs des rayons des bâtiments
        """
        for building in self.list_nodes_buildings[:-1]:
            self.radii[building.index] = building.radius

    def connect_network(self, building, verbose=False):
        """
        Connecte building depuis l'usine à chaleur en remontant le plus court chemin
        grâce aux prédecesseurs (calculés en amont de cette fonction avec shortest_way !)
        """
        node = building.predecessor
        # Indiquer le bâtiment comme connecté
        if node == None:
            self.dhn_buildings[building.index] = False
        else:
            self.dhn_buildings[building.index] = True
            # Trouver la rue connectée à building située sur le plus court chemin entre building et l'usine à chaleur
            while node.type != "Plus grand consommateur":
                if node.type == 'route':
                    if verbose and not self.dhn_roads[node.index]:
                    # On remonte le plus court chemin jusqu'à revenir à building
                        self.dhn_roads[node.index] = True
                node = node.predecessor

    def reinitialise_predecessors(self):
        """
        Réinitialise les attributs 'predecessor' de tous les noeuds
        """
        for building in self.list_nodes_buildings:
            building.predecessor = None
        

    def shortest_way(self, initial_builing, list_buildings, verbose=False):
        """
        Calcule les plus petits chemins entre initial_building et les éléments
        de liste_building
        """
        verbose = False
        # Algorithme de Djikstra, termine lorsque les batiments de liste_batiments ont été parcourus
        # Réinitialisation des prédecesseurs
        self.reinitialise_predecessors()
        # Initialisation
        list_index = [building.index for building in list_buildings]
        distances_buildings = np.full(self.nb_buildings + 1, np.inf)
        distances_roads = np.full(self.nb_roads, np.inf)
        buildings_to_cross = np.full(self.nb_buildings + 1, True)
        roads_to_cross = np.full(self.nb_roads, True)

        distances_buildings[initial_builing.index] = 0
        
        # Itérations - tant que les batiments de list_buildings n'ont pas tous été parcourus
        while np.any(buildings_to_cross[list_index]):
            # Nous allons calculer le noeud de plus petite distance en comparant celui de type Node_building et celui de type Node_road (tous deux de plus petite distance par rapport à leurs semblables)

            # Il reste nécessairement des noeuds de bâtiments à parcourir
            # On choisit le noeud non parcouru de plus petite distance de type bâtiment
            mask_buildings = buildings_to_cross & (distances_buildings == np.min(distances_buildings[buildings_to_cross])) # tableau de booléens dont les composantes valent True ssi le bâtiment n'a pas été parcouru et est de plus petite distance
            index_building = np.argmax(mask_buildings) # Calcule l'indice de la première occurence de True dans le tableau précédent
            # On stocke la distance correspondante
            minimal_distance_buildings = distances_buildings[index_building] 

            # S'il reste des noeuds de routes à parcourir
            if np.any(roads_to_cross):
                # On choisit le noeud non parcouru de plus petite distance de type route
                mask_roads = roads_to_cross & (distances_roads == np.min(distances_roads[roads_to_cross]))
                index_road = np.argmax(mask_roads)
                minimal_distance_roads = distances_roads[index_road]
                # test = "Le noeud non parcouru de plus petite distance est-il un bâtiment ?"
                test = minimal_distance_buildings <= minimal_distance_roads
            # S'il n'y a plus de routes à parcourir, alors on dit que test est vrai pour parcourir le noeud bâtiment de plus petite distance
            else :
                test = True
            
            # CAS 1 : le noeud de plus petite distance est un bâtiment
            if test:
                # current_building est le bâtiment correspondant à cette distance
                current_building = self.list_nodes_buildings[index_building]
                # On l'indique comme parcouru
                buildings_to_cross[index_building] = False
                # On parcourt les noeuds voisins non parcourus (routes)
                for road, length in self.list_edges_buildings[index_building]:
                    # On teste si le noeud n'a pas été parcouru
                    if roads_to_cross[road.index]:
                        # Minimisation du chemin
                        new_distance = distances_buildings[index_building] + length
                        if distances_roads[road.index] > new_distance:
                            distances_roads[road.index] = new_distance
                            road.predecessor = current_building
                
            # CAS 2 : le noeud de plus petite distance est une route
            else:
                # current_road est la route correspondant à cette distance
                current_road = self.list_nodes_roads[index_road]
                # On l'indique comme parcourue
                roads_to_cross[index_road] = False
                # On détermine la longueur de tuyaux à ajouter sur la route actuelle
                if self.dhn_roads[current_road.index]:
                    length_road = 0
                else:
                    length_road = current_road.length
                # On parcourt les routes voisines non parcourues
                for road in self.list_edges_roads[index_road]:
                    if roads_to_cross[road.index]:
                        # Minimisation du chemin
                        new_distance = distances_roads[index_road] + length_road
                        if distances_roads[road.index] > new_distance:
                            distances_roads[road.index] = new_distance
                            road.predecessor = current_road
                # On parcourt les batiments voisins non parcourus
                for building, length in self.list_edges_roads_buildings[index_road]:
                    if buildings_to_cross[building.index]:
                        # Minimisation du chemin
                        new_distance = distances_roads[index_road] + length + length_road
                        if distances_buildings[building.index] > new_distance:
                            distances_buildings[building.index] = new_distance
                            building.predecessor = current_road
        return { list_index[k] : distances_buildings[building.index] for k in range(len(list_index)) }
      
            
    def compute_dhn(self, verbose=False):
        """
        Calcule le tracé du réseau de chaleur urbain
        """
        # Initialisation de la file de parcours des bâtiments, dans l'ordre de la conso énergétique décroissante
        self.queue = list(np.arange(self.nb_buildings))
        # Initialisation d'une liste pour savoir quels bâtiments ont déjà été parcourus
        self.were_crossed_buildings = np.full(self.nb_buildings, 0)
        # Réinitialisation des rayons
        self.initialise_radii()
        self.network_radius = 0
        # Réinitialisation du réseau de chaleur
        self.dhn_buildings = np.full(self.nb_buildings + 1, False)
        self.dhn_roads = np.full(self.nb_roads, False)
        # Le réseau est initialisé au noeud du plus grand consommateur
        self.dhn_buildings[-1] = True
        # On parcourt tous les bâtiments (hors usine à chaleur)
        while len(self.queue) > 0:
            # On prend le premier bâtiment dans la file d'attente
            index = self.queue[0]
            current_building = self.list_nodes_buildings[index]
            # Calcule sa distance au réseau de chaleur
            distance_network = self.shortest_way(self.list_nodes_buildings[-1], [current_building], verbose=verbose)[index]
            # Si le raccord est économiquement viable, on l'effectue
            if self.radii[index] + self.network_radius >= distance_network:
                # Raccord
                self.connect_network(current_building, verbose=verbose)
                # Actualisation du rayon du réseau de chaleur
                self.network_radius += self.radii[index] - distance_network
                # On indique que le dernier bâtiment a avoir été raccordé l'a été après x tentatives
                self.loop_test = self.were_crossed_buildings[index]
            # Sinon, on remet le bâtiment au fond de la file à condition qu'un bâtiment a été connecté lors de la dernière boucle
            elif self.loop_test >= self.were_crossed_buildings[index] - 1:
                # Ajout à la file
                self.queue.append(index)
                # On indique que le bâtiment a été parcouru une fois de plus
                self.were_crossed_buildings[index] += 1
            # Dernier cas, le bâtiment ne sera pas raccordé au réseau
            else:
                pass
            # On retire current_building de la file
            self.queue.pop(0)

## Application de l'algorithme - tracé du réseau de chaleur

In [None]:
G_by_commune = {}
execution_times_algorithm = {}

for code_commune in tqdm(filtered_communes, desc="Chargement par commune", unit="commune"):
    
    start_time = time.time()
    
    G_by_commune[code_commune] = Graph(list_nodes_buildings_by_commune[code_commune], list_nodes_roads_by_commune[code_commune], list_edges_buildings_by_commune[code_commune], list_edges_roads_by_commune[code_commune], list_edges_roads_buildings_by_commune[code_commune])
    G_by_commune[code_commune].compute_dhn(verbose=True)

    end_time = time.time()  # Fin de la mesure du temps
    execution_times_algorithm[code_commune] = end_time - start_time  # Stocker la durée

# Afficher un résumé global si nécessaire
print("\nRésumé des temps d'exécution par commune :")
for commune, duration in execution_times_algorithm.items():
    print(f"Commune {commune}: {duration:.4f} secondes")

## Analyse du RCU créé

On extrait le réseau de chaleur tracé pour pouvoir analyser sa longueur et la demande en chaleur qu'il couvre.

**Extraction des routes :**

In [None]:
dhn_roads_by_commune= {}

for code_commune in tqdm(filtered_communes, desc="Chargement par commune", unit="commune"):
    G = G_by_commune[code_commune]
    list_roads = list(G.dhn_roads)
    
    dhn_roads_by_commune[code_commune] = []
    
    for index, road in enumerate(G.list_nodes_roads):
        if list(G.dhn_roads)[index]: # Si la route est connectée
            x_coords, y_coords = zip(*road.coordinates)
            dhn_roads_by_commune[code_commune].append(LineString(zip(x_coords, y_coords)))
            len(dhn_roads_by_commune[code_commune])

**Extraction des bâtiments :**

In [None]:
dhn_buildings_by_commune = {}

for code_commune in tqdm(filtered_communes, desc="Chargement par commune", unit="commune"):
    dhn_buildings_by_commune[code_commune] = []
    
    for index in range(G_by_commune[code_commune].nb_buildings):  # Parcourt tous les bâtiments sauf l'usine
        if G_by_commune[code_commune].dhn_buildings[index]:  # Si le bâtiment est connecté
            building = list_nodes_buildings_by_commune[code_commune][index]  
            building_info = {
                "index": building.index,              # Index du bâtiment
                "geometry": building.building["geometry"],  # Géométrie du bâtiment
                "center": building.center,            # Coordonnées du centre
                "heat_demand": building.heat_demand   # Demande de chaleur du bâtiment
            }
            dhn_buildings_by_commune[code_commune].append(building_info)  


**Affichage du réseau tracé :**

In [None]:
# Demande du code postal de la ville/lieu à étudier

def demander_code_commune():
    while True:
        code_commune = input(f"Code commune Insee retenu parmi {filtered_communes} : ")
        
        # Validation simple : vérifier que le code postal contient exactement 5 chiffres
        if code_commune.isdigit() and len(code_commune) == 5:
            return code_commune
        else:
            print("Le code commune Insee doit contenir exactement 5 chiffres. Veuillez réessayer.")

def find_libelle_commune(code_commune):
    buildings_fiona = fiona.open(buildings_file)

    libelle_commune = None
    for building in buildings_fiona:
        building_properties = building["properties"]
        if building_properties["code_commune_insee"] == code_commune:
            libelle_commune = building_properties["libelle_commune_insee"]
            break  # On arrête dès qu'on trouve un match
    
    buildings_fiona.close()

    return libelle_commune

code_commune = demander_code_commune()
libelle_commune = find_libelle_commune(code_commune)
print(f"La commune retenue est : {libelle_commune}")

In [None]:
for route in dhn_roads_by_commune[code_commune]:
    x, y = zip(*route.coords)  # Séparation des coordonnées x et y
    plt.plot(x, y, color="green", linewidth=1, label="Routes connectées")  # Tracé en vert pour les routes connectées

# Tracé des nœuds créés (en bleu)
for building in dhn_buildings_by_commune[code_commune]:
    x, y = building["center"]  # Coordonnées extraites du champ 'center'
    plt.scatter(x, y, color="red", s=20, label="Bâtiments connectés")

plt.title(f"RCU créé - Commune de {libelle_commune}")
plt.show()

On calcule alors la longueur totale du réseau tracé, en kilomètres.

In [None]:
def get_total_dhn_length(connected_routes):
    total_length = 0

    for route in connected_routes:
        # Créer une ligne à partir des coordonnées du tronçon
        line = LineString(route)
        
        # Ajouter la longueur du tronçon à la longueur totale
        total_length += line.length

    return total_length

In [None]:
dhn_length_by_commune = {}

for code_commune in filtered_communes:
    dhn_length_by_commune[code_commune] = []
    length = get_total_dhn_length(dhn_roads_by_commune[code_commune])
    dhn_length_by_commune[code_commune] = length/1e3

dhn_length_by_commune

<u>Remarque</u> : Nous négligeons, dans la valeur ci-dessus, la longueur des pipes pour connecter l'usine à chaleur au réseau - estimée à une centaine de mètres maximum et donc négligeable.

Calcul de la demande en chaleur, en GWh.

In [None]:
def get_dhn_heat_demand(connected_buildings):
    total_demand = 0  # Initialisation de la consommation totale
    
    for building in connected_buildings:
        total_demand += building["heat_demand"]  # Ajoute la demande de chaleur de chaque bâtiment

    return total_demand

In [None]:
dhn_heat_demand_by_commune = {}

for code_commune in filtered_communes:
    dhn_heat_demand_by_commune[code_commune] = []
    demand = get_dhn_heat_demand(dhn_buildings_by_commune[code_commune])
    dhn_heat_demand_by_commune[code_commune] = demand/1e6

dhn_heat_demand_by_commune

In [None]:
for code_commune in filtered_communes:
    if dhn_heat_demand_by_commune[code_commune] == 0:
        filtered_communes.remove(code_commune)

filtered_communes

In [None]:
dhn_linear_demand_by_commune = {}

for code_commune in filtered_communes:
    dhn_linear_demand_by_commune[code_commune] = []
    if dhn_length_by_commune[code_commune] > 0:
        linear_demand = dhn_heat_demand_by_commune[code_commune]/dhn_length_by_commune[code_commune]
        dhn_linear_demand_by_commune[code_commune] = linear_demand
    else:
        dhn_linear_demand_by_commune[code_commune] = "n.a."

dhn_linear_demand_by_commune

### Couverture du réseau et évaluation du potentiel de la zone étudiée

In [None]:
total_heat_demand_by_commune = []
total_dhn_heat_demand_by_commune = []

for code_commune in filtered_communes:
    total_heat_demand_by_commune.append(sum(heat_demands_by_commune[code_commune]))
    total_dhn_heat_demand_by_commune.append(get_dhn_heat_demand(dhn_buildings_by_commune[code_commune]))
total_heat_demand = sum(total_heat_demand_by_commune)/1e6
total_dhn_heat_demand = sum(total_dhn_heat_demand_by_commune)/1e6

print(f"Demande en chaleur totale couverte par le RCU créé : {total_dhn_heat_demand:.2f} GWh")
print(f"Demande en chaleur totale de la zone étudiée : {total_heat_demand:.2f} GWh")
print(f"Couverture totale du RCU créé (pour un seuil de puissance à 30kW) : {(total_dhn_heat_demand/total_heat_demand)*100:.2f}%") 

Nous aboutissons donc à une couverture totale de plus de la moitié de la demande en chaleur totale de la zone étudiée, ce qui représentente un potentiel de couverture très important.

## Optimisation des RCU créés

Nous pouvons remarquer, en affichant précédemment le RCU créé, que celui-ci présente une principale contradiction : des longueurs de canalisations sont parfois inutiles, en particulier aux extrémités du réseau. Cela est dû au fait que l'algorithme sélectionne des routes entières et non des tronçons de route - donc ne sarrête pas au dernier bâtiment de la route mais à la fin de celle-ci. La suite de l'algorithme ci-dessous tâchera donc de retirer les longueurs "inutiles" de notre réseau.  

### Réunion et redécoupage

**1<sup>ère</sup> étape : Réunion des routes du RCU créé**

In [None]:
%%time

merged_connected_roads_by_commune = {}

for code_commune in filtered_communes:
    merged_connected_roads_by_commune[code_commune]= []
    merged_connected_roads_by_commune[code_commune] = linemerge(MultiLineString(dhn_roads_by_commune[code_commune]))
    if isinstance(merged_connected_roads_by_commune[code_commune], MultiLineString):
        print(f"{code_commune} - Nombre de routes du RCU après réunion : {len(merged_connected_roads_by_commune[code_commune].geoms)}")
    else:
        print(f"{code_commune} - Nombre de routes du RCU après réunion : 1")

**2<sup>ème</sup> étape : Projection  orthogonale des bâtiments sur les routes**

In [None]:
%%time

projected_dhn_buildings_by_commune = {}

for code_commune in filtered_communes:

    projected_dhn_buildings_by_commune[code_commune] = []
    
    # Créer un MultiLineString à partir des routes
    multiline = MultiLineString([LineString(route) for route in dhn_roads_by_commune[code_commune]])
    
    for building in dhn_buildings_by_commune[code_commune]:
        building_point = Point(building["center"])
    
        # Trouver la route la plus proche du bâtiment
        closest_route = None
        min_distance = float("inf")
        for route in dhn_roads_by_commune[code_commune]:
            line = LineString(route)
            distance = line.distance(building_point)
            if distance < min_distance:
                min_distance = distance
                closest_route = line
    
        # Projeter le point du bâtiment sur la route la plus proche
        projected_point = closest_route.interpolate(closest_route.project(building_point))
        projected_dhn_buildings_by_commune[code_commune].append(projected_point)

**3<sup>ème</sup> étape : Découpage des routes en chaque point de projection des bâtiments**

In [None]:
%%time

split_dhn_roads_by_commune = {}

for code_commune in filtered_communes:
    split_dhn_roads_by_commune[code_commune] = []
    if isinstance(merged_connected_roads_by_commune[code_commune], LineString):
        split_dhn_roads_by_commune[code_commune].append(merged_connected_roads_by_commune[code_commune])
    else:
        split_dhn_roads_by_commune[code_commune] = split_routes(merged_connected_roads_by_commune[code_commune], projected_dhn_buildings_by_commune[code_commune], safety_distance=10)

In [None]:
%%time

# Conversion en objet GeoJSON
final_dhn_roads_by_commune = {}

for code_commune in filtered_communes:
    final_dhn_roads_by_commune[code_commune] = []
    for line in split_dhn_roads_by_commune[code_commune]:
        geojson_geometry = mapping(line)
        
        feature = fiona.model.Feature(
            geometry=geojson_geometry,
            properties={}  # Dictionnaire vide pour les propriétés
        )
        final_dhn_roads_by_commune[code_commune].append(feature)

In [None]:
%%time

# On affecte le nouveau découpage aux routes du RCU créé, et on le convertit en Linestring pour l'optimisation
dhn_routes_by_commune = {}

for code_commune in filtered_communes:
    dhn_routes_by_commune[code_commune] = []
    for route in final_dhn_roads_by_commune[code_commune]:
        line = LineString(route["geometry"]["coordinates"])
        dhn_routes_by_commune[code_commune].append(line)

## Optimisation

**1<sup>ère</sup> étape : Création des noeuds extrémaux**

*1) Création des noeuds pour chacune des deux extrémités des routes :*

In [None]:
class Node:
    def __init__(self, index, coordinates, node_type):
        self.index = index  # ID unique
        self.coordinates = coordinates  # Coordonnées (x, y)
        self.type = node_type  # Type de nœud, ex: "route", "bâtiment"
        self.predecessor = None  # Attribut pour stocker le prédécesseur, utile pour des algos comme Dijkstra

def create_nodes_from_routes(routes):
    nodes = []
    node_map = {}  # Dictionnaire pour éviter les doublons (clé : tuple(coordonnées), valeur : node_id)
    node_id_counter = 0  # Compteur d'ID des nœuds

    for route in routes:
        # Vérifie que la route est un LineString
        if isinstance(route, LineString):
            # Obtenir les extrémités
            start_point = Point(route.coords[0])  # Extrémité de début
            end_point = Point(route.coords[-1])  # Extrémité de fin

            # Ajouter les points aux nœuds, en évitant les doublons
            for point in [start_point, end_point]:
                coord = (point.x, point.y)
                if coord not in node_map:
                    # Si le point est unique, créer un nouveau nœud
                    node = Node(node_id_counter, coord, "route")
                    nodes.append(node)
                    node_map[coord] = node_id_counter
                    node_id_counter += 1

    return nodes

In [None]:
%%time

nodes_by_commune = {}

for code_commune in filtered_communes:
    nodes_by_commune[code_commune]=create_nodes_from_routes(dhn_routes_by_commune[code_commune])

*2) Sélection des noeuds internes et extrémaux du RCU créé*

In [None]:
%%time

internal_nodes_by_commune = {}
extremal_nodes_by_commune = {}

for code_commune in filtered_communes:

    internal_nodes_by_commune[code_commune] = []
    
    def is_near_multiple_routes(point, routes, tolerance):
        count = 0
        previous_route = None  # Pour mémoriser la route précédente proche du point
    
        for route in routes:
            # Vérifier qu'il s'agit d'une route et pas juste d'un point
            if route.length == 0:
                continue
            # Vérifier si la route est proche du point
            if point.distance(route) < tolerance:
                # Si une route précédente existe, vérifier la superposition de deux coordonnées ou plus
                if previous_route:
                    # Comparer les coordonnées des deux routes
                    common_coords = set(route.coords).intersection(set(previous_route.coords))
                    if len(common_coords) >= 2:  # Si deux coordonnées ou plus sont partagées
                        continue  # Ignorer cette route (ne pas l'inclure dans le comptage)
                
                # Si aucune route précédente ou si aucune superposition, on ajoute à `count`
                count += 1
                previous_route = route  # Mettre à jour la route précédente
    
            # Si au moins deux routes sont proches du point, on arrête et on retourne True
            if count >= 2:
                return True
    
        return False 

    for node in nodes_by_commune[code_commune]:
        node_point = Point(node.coordinates)
        if is_near_multiple_routes(node_point, dhn_routes_by_commune[code_commune], 5):
            internal_nodes_by_commune[code_commune].append(node)

    extremal_nodes_by_commune[code_commune] = [node for node in nodes_by_commune[code_commune] if node not in internal_nodes_by_commune[code_commune] ]
    extremal_nodes_by_commune[code_commune] = [node for node in extremal_nodes_by_commune[code_commune] if all(Point(node.coordinates).distance(point)>10 for point in projected_dhn_buildings_by_commune[code_commune])]

**2<sup>ème</sup> étape : Sélection des parties à "raccourcir"**

*1) Sélection des routes extrémales*

In [None]:
%%time

extremal_routes_by_commune = {}

for code_commune in filtered_communes:
    extremal_routes_by_commune[code_commune] =[]
    for node in extremal_nodes_by_commune[code_commune]:
        node_point = Point(node.coordinates)  # Convertir le nœud en Point
    
        # Filtrer les routes pour celles qui contiennent ou touchent le nœud
        filtered_routes = [
            route for route in dhn_routes_by_commune[code_commune]
            if isinstance(route, LineString) and route.intersects(node_point) and route.length > 0  # Vérifie si le Point est sur la LineString
        ]
        
        # Sélectionner la première route correspondante
        matching_route = filtered_routes[0] if filtered_routes else None
        extremal_routes_by_commune[code_commune].append(matching_route)

*2) Suppression des routes extrémales*

In [None]:
%%time

dhn_roads_optimized_by_commune = {}

for code_commune in filtered_communes:
    dhn_roads_optimized_by_commune[code_commune] = []
    if len(dhn_routes_by_commune[code_commune]) > 1:
        for route in dhn_routes_by_commune[code_commune]:
            if route not in extremal_routes_by_commune[code_commune]: # On ne conserve que les routes non extrémales
                dhn_roads_optimized_by_commune[code_commune].append(route)
    else:
        dhn_roads_optimized_by_commune[code_commune] = dhn_roads_by_commune[code_commune]

### Evaluation du gain après optimisation

**Calcul de la longueur du réseau optimisé :**

In [None]:
dhn_length_optimized_by_commune = {}
reduction_percentage = []

print("Différence APRES vs. AVANT optimisation, Réduction du réseau :")
for code_commune in filtered_communes:
    dhn_length_optimized_by_commune[code_commune] = []
    dhn_length_optimized_by_commune[code_commune].append(get_total_dhn_length(dhn_roads_optimized_by_commune[code_commune])/1e3)
    reduction = ((dhn_length_optimized_by_commune[code_commune][0]/dhn_length_by_commune[code_commune])-1)*100
    reduction_percentage.append(reduction)
    print(f"{dhn_length_optimized_by_commune[code_commune][0]:.2f} km vs. {dhn_length_by_commune[code_commune]:.2f} km, {reduction :.2f} %")

mean_reduction = sum(reduction_percentage)/len(reduction_percentage)
print(f"\033[1mRéduction moyenne: {mean_reduction:.2f}%\033[0m")

In [None]:
for code_commune in filtered_communes:
    print(f"{code_commune} - Nouvelle densité linéique: {dhn_heat_demand_by_commune[code_commune]/dhn_length_optimized_by_commune[code_commune][0]:.2f} GWh/km")

In [None]:
import matplotlib.pyplot as plt
from shapely.geometry import shape
import random

# Créer une figure avec 5 lignes et 2 colonnes
if len(filtered_communes) % 2 == 0:
    rows = int(len(filtered_communes)/2)
else:
    rows = int(len(filtered_communes)/2) + 1
fig, axes = plt.subplots(rows, 2, figsize=(10,50))  # Taille ajustée pour une bonne visibilité
fig.suptitle("RCU créés", fontsize=16)

# Parcourir chaque commune et afficher sur un sous-graphe
for idx, (code_commune, ax) in enumerate(zip(filtered_communes, axes.flatten())):
    # Obtenir les routes et bâtiments pour la commune
    routes = list_nodes_roads_by_commune[code_commune]  # Liste d'objets LineString
    buildings = list_nodes_buildings_by_commune[code_commune]  # Objets Fiona (dictionnaires)

    # Tracer les routes (objets LineString)
    for road in dhn_roads_optimized_by_commune[code_commune]:
        if hasattr(road, 'coordinates'):  # S'assurer que l'objet a des coordonnées
            coords = road.coordinates
        else:  # Sinon utiliser la méthode de `shapely`
            coords = list(road.coords)
        x, y = zip(*coords)
        ax.plot(x, y, color="black", linewidth=1, label="Routes" if idx == 0 else "")

    # Tracer les bâtiments (objets fiona)
    for building in dhn_buildings_by_commune[code_commune]:
        geom = shape(building["geometry"])  # Transformer en objet shapely
        x, y = geom.centroid.x, geom.centroid.y  # Coordonnées du centre du bâtiment
        ax.scatter(x, y, color="blue", s=10, label="Bâtiments" if idx == 0 else "")

    # Ajouter le titre de la sous-figure
    ax.set_title(find_libelle_commune(code_commune))
    ax.grid(True)
    ax.set_xticks([])
    ax.set_yticks([])

fig.savefig('dhn_oise.png')

## Comparaison au réseau de chaleur existant

Grâce au site France Chaleur, on a extrait les données Geopackage des réseaux de chaleur en France, ici sous le nom de `dhn_france_file`. Nous souhaitons comparer la longueur et le livraison de chaleur du réseau réel dans la zone sélectionnée.

Nous commençons par visualiser le fichier gpkg. et ses attributs.

In [None]:
%%time

# Ouverture du fichier France chaleur des réseaux de chaleurs en France
dhn_france_file = "C:/Users/charl/OneDrive/Documents/Documents/4. Mines/2. EFFINERSYS/Réseaux de chaleur et transition énergétique/test60/reseaux_chaleur_France.gpkg"
dhn_fiona = fiona.open(dhn_france_file)
gdf = gpd.read_file(dhn_france_file, layer='reseaux_de_chaleur')

# Lister tous les attributs (colonnes) disponibles
print("Attributs disponibles dans la couche :")
print(gdf.columns)

**1<sup>er</sup> filtrage sur les réseaux de chaleur français**

Nous pouvons alors procéder au filtrage pour ne sélectionner que les réseaux dont les attributs `longueur_reseau` et `livraisons_totale_MWh`.

In [None]:
# Liste pour stocker les réseaux de chaleur filtrés
dhn_filtered = []

for dhn in dhn_fiona:
    properties = dhn["properties"]
    
    # Filtrage selon les critères : 'longueur_reseau', et 'livraisons_totale_MWh' non nuls
    if (properties["longueur_reseau"] is not None) and \
       (properties["livraisons_totale_MWh"] is not None):
        dhn_filtered.append(dhn)

# Fermer le fichier source après le traitement
dhn_fiona.close()

# Affichage du nombre de réseaux filtrés
print(len(dhn_filtered), "réseaux de chaleur ont été retenus.")

On sélectionne ensuite parmi tous les réseaux preé-filtrés le réseau de la zone sélectionnée à partir du `code_commune_insee`. 

<span style="color:red">Si la zone sélecitonnée n'admet pas de réseau de chaleur, l'étude comparative s'arrête ici.</span>

In [None]:
# Rechercher le libellé de la commune correspondant
buildings_fiona = fiona.open(buildings_file)
real_dhn_by_commune = {}

for code_commune in filtered_communes:
    libelle_commune = None
    for building in buildings_fiona:
        building_properties = building["properties"]
        if building_properties["code_commune_insee"] == code_commune:
            libelle_commune = building_properties["libelle_commune_insee"]
            break  # On arrête dès qu'on trouve un match
    
    if libelle_commune is None:
        print("Aucune commune trouvée pour ce code INSEE.")
    else:
        print(f"Le libellé de la commune est : {libelle_commune}")
    
    # Rechercher dans les réseaux de chaleur filtrés
    real_dhn_by_commune[code_commune] = []  # Liste pour stocker les réseaux correspondants
    
    for dhn in dhn_filtered:
        dhn_properties = dhn["properties"]
        # Vérifier si la commune associée au réseau correspond
        if dhn_properties["communes"] == libelle_commune:
            real_dhn_by_commune[code_commune].append(dhn)
    
    # Afficher les résultats
    if real_dhn_by_commune[code_commune]:
        print(f"{len(real_dhn_by_commune[code_commune])} réseau(x) de chaleur trouvé(s) pour la commune '{libelle_commune}':")
        for dhn in real_dhn_by_commune[code_commune]:
            print(f"- Nom du réseau : {dhn['properties'].get('nom_reseau', 'Nom inconnu')}")
    else:
        print(f"Aucun réseau de chaleur trouvé pour la commune '{libelle_commune}'.")

buildings_fiona.close()

In [None]:
communes_with_dhn = []

for code_commune in filtered_communes:
    if len(real_dhn_by_commune[code_commune]) > 0:
        communes_with_dhn.append(code_commune)

print(f"{len(communes_with_dhn)} communes possèdent un RCU réel.")

**Longueur et capacité d'approvisionnement du RCU réel**

In [None]:
real_dhn_length_by_commune = {}
real_dhn_heat_supply_by_commune = {}

for code_commune in communes_with_dhn:
    for dhn in real_dhn_by_commune[code_commune]:
        real_dhn_length_by_commune[code_commune] = dhn['properties'].get('longueur_reseau')
        real_dhn_heat_supply_by_commune[code_commune] = dhn['properties'].get('livraisons_totale_MWh')/1000
    print(f"Longueur totale du RCU réel : {real_dhn_length_by_commune[code_commune]:.2f} km")
    print(f"Consommation couverte par le RCU réel : {real_dhn_heat_supply_by_commune[code_commune]:.2f} GW")

**Evaluation de la performance de notre modèle :**

Visuellement, nous pouvons comparer les deux traçés des réseaux :

In [None]:
import matplotlib.pyplot as plt


for code_commune in communes_with_dhn:
    libelle_commune = find_libelle_commune(code_commune)
    # Création d'une figure avec 1 ligne et 2 colonnes pour les deux sous-graphes
    fig, axes = plt.subplots(1, 2, figsize=(14, 7))  # 1 ligne, 2 colonnes
    
    # Première sous-figure : RCU créé
    ax1 = axes[0]
    
    # Tracé des routes et positionnement des noeuds de route
    G = G_by_commune[code_commune]
    for road in G.list_nodes_roads:
        x, y = zip(*road.coordinates)
        ax1.plot(x, y, color="black", linewidth=0.8)
    
    # Tracé des routes du RCU créé
    for route in dhn_roads_optimized_by_commune[code_commune]:
        x, y = zip(*route.coords)  # Séparation des coordonnées x et y
        ax1.plot(x, y, color="green", linewidth=2)  # Tracé en vert pour les routes connectées
    
    # Tracé des bâtiments connectés
    for building in dhn_buildings_by_commune[code_commune]:
        x, y = building["center"]  # Coordonnées du centre du bâtiment
        heat_demand = building["heat_demand"]
        ax1.scatter(x, y, color="purple", s=20, label=f"Bâtiment {building['index']}")
        
    ax1.set_title(f"RCU créé - {libelle_commune}")
    ax1.axis("equal")  # Échelle égale pour les axes
    ax1.set_xticks([])
    ax1.set_yticks([])
    
    
    # Deuxième sous-figure : RCU réel
    ax2 = axes[1]
        
    # Tracé des routes et positionnement des noeuds de route
    for road in G.list_nodes_roads:
        x, y = zip(*road.coordinates)
        ax2.plot(x, y, color="black", linewidth=0.8)
    
    # Tracé des routes du RCU réel
    for road in real_dhn_by_commune[code_commune]:
        coordinates = road["geometry"]["coordinates"]  # Récupère les coordonnées
        for segment in coordinates:  
            x, y = zip(*segment)
            ax2.plot(x, y, color="red", linewidth=1)
    
    ax2.set_title(f"RCU réel - {libelle_commune}")
    ax2.axis("equal")
    ax2.set_xticks([])
    ax2.set_yticks([])

    fig.savefig(f"{code_commune}_real_vs_created_dhn_oise.png")
    # Affichage des graphesplt.tight_layout()  # Améliorer l'espacement entre les sous-graphes
    plt.tight_layout()  # Améliorer l'espacement entre les sous-graphes
    plt.show()

In [None]:
print("\033[1mCouverture RCU créé vs. RCU réél\033[0m")

coverage_percentage = []

for code_commune in communes_with_dhn:
    libelle_commune = find_libelle_commune(code_commune)
    percentage = (dhn_heat_demand_by_commune[code_commune]/real_dhn_heat_supply_by_commune[code_commune] - 1)*100
    coverage_percentage.append(percentage)
    print(f"{libelle_commune} - {dhn_heat_demand_by_commune[code_commune]:.2f} GWh vs {real_dhn_heat_supply_by_commune[code_commune]:.2f} GWh ---- Potentiel de couverture supplémentaire : {percentage:.2f}%")

mean_coverage = sum(coverage_percentage)/len(coverage_percentage)
print(f"\033[1mEn moyenne: {mean_coverage:.2f}%\033[0m")

In [None]:
print(f"\033[1mCouverture du réseau créé vs. réseau réel\033[0m")

for code_commune in filtered_communes:
    libelle_commune = find_libelle_commune(code_commune)
    if code_commune in communes_with_dhn:
        created_coverage = (dhn_heat_demand_by_commune[code_commune]/(sum(heat_demands_by_commune[code_commune])*1e-6))*100
        real_coverage = (real_dhn_heat_supply_by_commune[code_commune]/(sum(heat_demands_by_commune[code_commune])*1e-6))*100
        difference = created_coverage - real_coverage
        print(f"{libelle_commune} - RCU créé : {created_coverage:.2f}% vs RCU réél {real_coverage:.2f}%")
        print(f"==> Potentiel : {difference:.2f}%")
    else:
        created_coverage = (dhn_heat_demand_by_commune[code_commune]/(sum(heat_demands_by_commune[code_commune])*1e-6))*100
        print(f"{libelle_commune} - RCU créé : {created_coverage:.2f}%")
        print(f"==> Potentiel : {created_coverage:.2f}%")

In [None]:
print("\033[1mRapport 'demande en chaleur couverte/longueur du réseau' - RCU créé vs. RCU réel\033[0m")

for code_commune in communes_with_dhn:
    libelle_commune = find_libelle_commune(code_commune)
    real_ratio = real_dhn_heat_supply_by_commune[code_commune]/real_dhn_length_by_commune[code_commune]
    created_ratio = dhn_heat_demand_by_commune[code_commune]/dhn_length_optimized_by_commune[code_commune][0]
    print(f"{libelle_commune} - {created_ratio:.2f} GWh/km vs. {real_ratio:.2f} GWh/km")

FIN