In [None]:
!pip install osmnx networkx folium

Collecting osmnx
  Downloading osmnx-2.0.1-py3-none-any.whl.metadata (4.9 kB)
Downloading osmnx-2.0.1-py3-none-any.whl (99 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.6/99.6 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: osmnx
Successfully installed osmnx-2.0.1


In [None]:
import osmnx as ox
import networkx as nx
import folium

## On définit les points de départ et d'arrivée

In [None]:
# Définir les lieux de départ et d’arrivée
start_location = "Tour Eiffel, Paris, France"
end_location = "Louvre, Paris, France"

# Convertir les adresses en coordonnées GPS
start_lat, start_lon = ox.geocode(start_location)
end_lat, end_lon = ox.geocode(end_location)

# Afficher les coordonnées obtenues
print(f"Départ : {start_location} → {start_lat}, {start_lon}")
print(f"Arrivée : {end_location} → {end_lat}, {end_lon}")

Départ : Tour Eiffel, Paris, France → 48.8582599, 2.2945006358633115
Arrivée : Louvre, Paris, France → 48.8611473, 2.33802768704666


## On télécharge la carte (de Paris dans notre test)

In [None]:
# Télécharger le graphe routier de Paris
city_graph = ox.graph_from_place("Paris, France", network_type="drive")

# Vérifier la structure du graphe
print(f"Nombre de nœuds : {len(city_graph.nodes)}")
print(f"Nombre d’arêtes : {len(city_graph.edges)}")

Nombre de nœuds : 9503
Nombre d’arêtes : 18304


## On convertit nos localisation en noeuds et en coordonnées pour pouvoir calculer le plus court chemin.

In [None]:
# Trouver les nœuds les plus proches dans le graphe
orig_node = ox.nearest_nodes(city_graph, start_lon, start_lat)
dest_node = ox.nearest_nodes(city_graph, end_lon, end_lat)

# Vérifier les nœuds trouvés
print(f"Nœud de départ : {orig_node}")
print(f"Nœud d’arrivée : {dest_node}")

Nœud de départ : 6900473051
Nœud d’arrivée : 693583565


## A l'aide de l'algorithme de Djikstra, on détermine le plus court chemin entre nos deux noeuds

In [None]:
# Calculer le chemin le plus court en distance
route = nx.shortest_path(city_graph, orig_node, dest_node, weight="length")

# Vérifier la longueur du trajet en mètres
route_length = nx.shortest_path_length(city_graph, orig_node, dest_node, weight="length")
print(f"Longueur du trajet : {route_length:.2f} mètres")

Longueur du trajet : 4438.03 mètres


## Enfin, on visualise notre itinéraire sur la carte

In [None]:
# Extraire les coordonnées de l’itinéraire
route_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in route]

# Créer une carte centrée sur le départ
m = folium.Map(location=[start_lat, start_lon], zoom_start=14)

# Tracer l’itinéraire en bleu
folium.PolyLine(route_coords, color="blue", weight=5, opacity=0.7).add_to(m)

# Ajouter des marqueurs pour les points de départ et d’arrivée
folium.Marker([start_lat, start_lon], popup="Départ", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([end_lat, end_lon], popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

# Afficher la carte
m

## A présent, nous allons essayer d'ajouter des contraintes et des critères d'optimisation.

In [None]:
!pip install requests



In [None]:
import requests

## Au lieu d'optimiser par la distance, nous allons essayer d'optimiser selon la vitesse moyenne des routes.

In [None]:
# Fonction pour récupérer la vitesse moyenne d'une route
def get_speed(u, v, graph):
    speed = graph[u][v][0].get("maxspeed", None)  # Récupérer la vitesse max

    # Vérifier si la vitesse est une liste
    if isinstance(speed, list):
        speed = speed[0]  # Prendre la première valeur

    # Vérifier si la vitesse est un nombre
    if isinstance(speed, (int, float)):
        return speed  # Retourner directement si c'est un nombre

    # Vérifier si la vitesse est une chaîne de caractères
    if isinstance(speed, str):
        if speed.isdigit():  # Vérifier si la chaîne est un nombre
            return int(speed)
        elif "walk" in speed.lower():  # Cas des routes piétonnes
            return 5  # Vitesse piétonne en km/h

    return 50  # Vitesse par défaut (50 km/h)

# Ajouter le poids 'travel_time' aux routes du graphe
for u, v, data in city_graph.edges(data=True):
    length = data["length"]  # Longueur de la route en mètres
    speed = get_speed(u, v, city_graph) / 3.6  # Conversion km/h → m/s
    data["travel_time"] = length / speed  # Temps de trajet en secondes

# Calcul du chemin le plus rapide
route_time = nx.shortest_path(city_graph, orig_node, dest_node, weight="travel_time")
time_length = nx.shortest_path_length(city_graph, orig_node, dest_node, weight="travel_time")

print(f"Temps de trajet estimé : {time_length:.2f} secondes")

Temps de trajet estimé : 532.56 secondes


## Nous allons maintenant récupérer les données météos grâce à l'API de OpenWeather

In [None]:
# Remplace par ta clé API OpenWeatherMap
API_KEY = "d1899c1183faa2cae0f15319a5f35d75"

# URL de l'API météo avec latitude et longitude
weather_url = f"http://api.openweathermap.org/data/2.5/weather?lat={start_lat}&lon={start_lon}&appid={API_KEY}&units=metric"

# Envoyer la requête et récupérer la réponse
response = requests.get(weather_url)

# Vérifier si la requête a réussi (code 200)
if response.status_code == 200:
    weather_data = response.json()  # Convertir la réponse en JSON

    # Vérifier la structure du JSON reçu
    print("Réponse API OpenWeatherMap :")
    print(weather_data)

    # Vérifier si 'weather' et 'wind' existent
    if "weather" in weather_data and "wind" in weather_data:
        weather = weather_data["weather"][0]["description"]
        wind_speed = weather_data["wind"].get("speed", 0)  # 0 si non disponible

        print(f"Météo actuelle : {weather}")
        print(f"Vitesse du vent : {wind_speed} m/s")
    else:
        print("❌ Erreur : Clés 'weather' ou 'wind' manquantes dans la réponse de l'API.")
else:
    print(f"❌ Erreur API : Code {response.status_code} - {response.text}")

Réponse API OpenWeatherMap :
{'coord': {'lon': 2.2945, 'lat': 48.8583}, 'weather': [{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds', 'icon': '04d'}], 'base': 'stations', 'main': {'temp': 5.09, 'feels_like': 0.94, 'temp_min': 3.86, 'temp_max': 5.67, 'pressure': 1017, 'humidity': 80, 'sea_level': 1017, 'grnd_level': 1006}, 'visibility': 10000, 'wind': {'speed': 6.17, 'deg': 80}, 'clouds': {'all': 100}, 'dt': 1738936793, 'sys': {'type': 2, 'id': 2012208, 'country': 'FR', 'sunrise': 1738912335, 'sunset': 1738947465}, 'timezone': 3600, 'id': 6545270, 'name': 'Palais-Royal', 'cod': 200}
Météo actuelle : overcast clouds
Vitesse du vent : 6.17 m/s


## Nous allons maintenant ajuster le trajet en fonction de a météo.

In [None]:
# Modifier le temps de trajet en fonction de la météo
weather_penalty = 1.0
if "rain" in weather or "snow" in weather:
    weather_penalty = 1.2  # Temps augmenté de 20% en cas de pluie/neige
if wind_speed > 10:
    weather_penalty = 1.3  # Temps augmenté de 30% si vent fort

# Recalculer le poids des routes avec la pénalité météo
for u, v, data in city_graph.edges(data=True):
    data["adjusted_time"] = data["travel_time"] * weather_penalty

# Calculer le nouvel itinéraire
route_adjusted = nx.shortest_path(city_graph, orig_node, dest_node, weight="adjusted_time")
adjusted_time_length = nx.shortest_path_length(city_graph, orig_node, dest_node, weight="adjusted_time")

print(f"Nouveau temps de trajet ajusté : {adjusted_time_length:.2f} secondes")


Nouveau temps de trajet ajusté : 532.56 secondes


## Puis nous l'affichons

In [None]:
# Extraire les coordonnées des itinéraires
route_time_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in route_time]
route_adjusted_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in route_adjusted]

# Créer une carte centrée sur le départ
m = folium.Map(location=[start_lat, start_lon], zoom_start=14)

# Tracer l’itinéraire sans ajustement météo (en bleu)
folium.PolyLine(route_time_coords, color="blue", weight=5, opacity=0.7, tooltip="Trajet sans météo").add_to(m)

# Tracer l’itinéraire ajusté (en rouge)
folium.PolyLine(route_adjusted_coords, color="red", weight=5, opacity=0.7, tooltip="Trajet ajusté météo").add_to(m)

# Ajouter des marqueurs pour les points de départ et d’arrivée
folium.Marker([start_lat, start_lon], popup="Départ", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([end_lat, end_lon], popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

# Afficher la carte
m

## Maintenant nous allons créer une classe véhicule, qui permettra de modéliser chaque voiture. Cela passera par sa baterie, ses itinéraries &c.

In [None]:
import random

class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2):
        """
        Initialise un véhicule autonome.

        - start_location : Adresse de départ
        - end_location : Adresse d’arrivée
        - battery_capacity : Capacité totale de la batterie (en kWh)
        - consumption_rate : Consommation d’énergie par kilomètre
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.route = []  # Itinéraire calculé
        self.total_distance = 0  # Distance totale à parcourir
        self.needs_recharge = False

    def calculate_energy_needed(self, distance_km):
        """Calcule l'énergie requise pour parcourir une distance donnée"""
        return distance_km * self.consumption_rate

    def check_battery(self):
        """Vérifie si le véhicule a assez d’énergie pour arriver à destination"""
        required_energy = self.calculate_energy_needed(self.total_distance)
        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Le véhicule n'a pas assez d’énergie ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie OK. Énergie disponible : {self.current_battery:.2f} kWh")

    def recharge(self, station_energy=50):
        """Recharge le véhicule à une station"""
        self.current_battery = min(self.current_battery + station_energy, self.battery_capacity)
        self.needs_recharge = False
        print(f"🔋 Recharge terminée ! Batterie actuelle : {self.current_battery:.2f} kWh")



## Nous allons maintenant trouver les stations de recherges qui seront des arrêts obligatoires dans le cas où la batterie est insuffisante pour le trajet.

In [None]:
def find_nearest_charging_station(graph, vehicle, max_distance=5000):
    """
    Recherche une station de recharge à proximité de l'itinéraire.

    - graph : Graphe routier OSMnx
    - vehicle : Objet Vehicle
    - max_distance : Distance maximale pour chercher une station

    Retourne : Le nœud le plus proche d’une station de recharge.
    """
    # Générer des coordonnées GPS aléatoires pour simuler des stations (dans un rayon donné)
    charging_stations = [
        (vehicle.start_location[0] + random.uniform(-0.02, 0.02), vehicle.start_location[1] + random.uniform(-0.02, 0.02)),
        (vehicle.end_location[0] + random.uniform(-0.02, 0.02), vehicle.end_location[1] + random.uniform(-0.02, 0.02))
    ]

    # Convertir en nœuds du graphe
    station_nodes = [ox.nearest_nodes(graph, lon, lat) for lat, lon in charging_stations]

    # Sélectionner la station la plus proche du véhicule
    return station_nodes[0]


In [None]:
def plan_route_with_energy(graph, vehicle):
    """
    Calcule l'itinéraire en tenant compte de l'autonomie du véhicule.

    - graph : Graphe routier OSMnx
    - vehicle : Objet Vehicle
    """
    orig_node = ox.nearest_nodes(graph, vehicle.start_location[1], vehicle.start_location[0])
    dest_node = ox.nearest_nodes(graph, vehicle.end_location[1], vehicle.end_location[0])

    # Trouver la distance du plus court chemin
    vehicle.route = nx.shortest_path(graph, orig_node, dest_node, weight="length")
    vehicle.total_distance = nx.shortest_path_length(graph, orig_node, dest_node, weight="length") / 1000  # Convertir en km

    # Vérifier si une recharge est nécessaire
    vehicle.check_battery()

    if vehicle.needs_recharge:
        print("🔍 Recherche d'une station de recharge...")
        station_node = find_nearest_charging_station(graph, vehicle)

        # Ajouter l’étape intermédiaire (station de recharge)
        route_to_station = nx.shortest_path(graph, orig_node, station_node, weight="length")
        route_to_destination = nx.shortest_path(graph, station_node, dest_node, weight="length")

        # Fusionner les itinéraires
        vehicle.route = route_to_station + route_to_destination[1:]
        print("📌 Itinéraire mis à jour avec une station de recharge.")


In [None]:
def display_route_with_recharge(graph, vehicle):
    """
    Affiche l'itinéraire avec station de recharge sur une carte Folium.

    - graph : Graphe routier OSMnx
    - vehicle : Objet Vehicle
    """
    route_coords = [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in vehicle.route]

    # Créer une carte
    m = folium.Map(location=[vehicle.start_location[0], vehicle.start_location[1]], zoom_start=14)

    # Tracer l'itinéraire
    folium.PolyLine(route_coords, color="red", weight=5, opacity=0.7, tooltip="Itinéraire avec recharge").add_to(m)

    # Ajouter des marqueurs
    folium.Marker(vehicle.start_location, popup="Départ", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(vehicle.end_location, popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

    # Afficher la carte
    return m


## Nous allons maintenant faire une simulation où le véhicule a assez de batterie

In [None]:
# Initialiser le véhicule
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=50,  # Capacité réduite pour forcer la recharge
    consumption_rate=0.3   # Consommation plus élevée
)

# Planifier l’itinéraire en tenant compte de l'autonomie
plan_route_with_energy(city_graph, vehicle)

# Afficher la carte avec l’itinéraire
display_route_with_recharge(city_graph, vehicle)

✅ Batterie OK. Énergie disponible : 50.00 kWh


## Puis maintenant, voici une simulation où le véhicule n'a pas assez de batterie

In [None]:
# Initialiser un véhicule avec faible autonomie
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=1,  # Capacité très réduite
    consumption_rate=0.5  # Consommation élevée
)

# Calcul du premier itinéraire (sans modification)
orig_node = ox.nearest_nodes(city_graph, vehicle.start_location[1], vehicle.start_location[0])
dest_node = ox.nearest_nodes(city_graph, vehicle.end_location[1], vehicle.end_location[0])

initial_route = nx.shortest_path(city_graph, orig_node, dest_node, weight="length")
vehicle.total_distance = nx.shortest_path_length(city_graph, orig_node, dest_node, weight="length") / 1000  # en km

# Vérifier si une recharge est nécessaire pour cet itinéraire
vehicle.check_battery()

⚠️ Le véhicule n'a pas assez d’énergie ! Recharge nécessaire.


In [None]:
# Si le véhicule a besoin de se recharger, ajouter une station au trajet
if vehicle.needs_recharge:
    print("🔍 Recherche d'une station de recharge...")
    station_node = find_nearest_charging_station(city_graph, vehicle)

    # Calculer les segments du trajet
    route_to_station = nx.shortest_path(city_graph, orig_node, station_node, weight="length")
    route_to_destination = nx.shortest_path(city_graph, station_node, dest_node, weight="length")

    # Fusionner les segments pour obtenir le trajet complet
    modified_route = route_to_station + route_to_destination[1:]

    # Mettre à jour les distances pour le véhicule
    distance_to_station = nx.shortest_path_length(city_graph, orig_node, station_node, weight="length") / 1000
    distance_to_destination = nx.shortest_path_length(city_graph, station_node, dest_node, weight="length") / 1000
    vehicle.total_distance = distance_to_station + distance_to_destination

    print(f"📌 Distance totale recalculée (avec recharge) : {vehicle.total_distance:.2f} km")


🔍 Recherche d'une station de recharge...
📌 Distance totale recalculée (avec recharge) : 5.80 km


In [None]:
# Extraire les coordonnées des itinéraires
initial_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in initial_route]
if vehicle.needs_recharge:
    modified_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in modified_route]

# Créer une carte centrée sur le départ
m = folium.Map(location=[vehicle.start_location[0], vehicle.start_location[1]], zoom_start=14)

# Tracer l'itinéraire initial (en bleu)
folium.PolyLine(initial_coords, color="blue", weight=5, opacity=0.7, tooltip="Itinéraire initial").add_to(m)

# Tracer l'itinéraire modifié (en rouge), s'il y a une recharge
if vehicle.needs_recharge:
    folium.PolyLine(modified_coords, color="red", weight=5, opacity=0.7, tooltip="Itinéraire modifié (avec recharge)").add_to(m)

# Ajouter des marqueurs pour le départ et l’arrivée
folium.Marker(vehicle.start_location, popup="Départ", icon=folium.Icon(color="green")).add_to(m)
folium.Marker(vehicle.end_location, popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

# Ajouter un marqueur pour la station de recharge
if vehicle.needs_recharge:
    station_coords = (city_graph.nodes[station_node]['y'], city_graph.nodes[station_node]['x'])
    folium.Marker(station_coords, popup="Station de recharge", icon=folium.Icon(color="orange")).add_to(m)

# Afficher la carte
m


## L'étape suivante consiste à faire communiquer les véhicules entre eux, en les connectant tous à un serveur central.

In [None]:
import socket
import json
import threading

# Dictionnaire pour stocker les informations sur la fréquentation des routes
traffic_data = {}

# Flag pour arrêter le serveur
stop_server = False

# Fonction pour gérer les connexions des véhicules
def handle_vehicle_connection(client_socket):
    global traffic_data
    try:
        # Recevoir les données du véhicule
        data = client_socket.recv(1024).decode()
        vehicle_info = json.loads(data)
        print(f"🚗 Véhicule connecté : {vehicle_info}")

        # Mettre à jour la fréquentation des routes
        route = vehicle_info["route"]
        for segment in route:
            traffic_data[segment] = traffic_data.get(segment, 0) + 1  # Incrémenter la fréquentation

        # Envoyer les données mises à jour au véhicule
        client_socket.send(json.dumps({"traffic_data": traffic_data}).encode())
    except Exception as e:
        print(f"⚠️ Erreur avec un véhicule : {e}")
    finally:
        client_socket.close()

# Fonction pour démarrer le serveur avec arrêt propre
def start_server(host="127.0.0.1", port=5000):
    global stop_server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Réutiliser l'adresse
    server.bind((host, port))
    server.listen(5)
    print(f"📡 Serveur lancé sur {host}:{port}")

    while not stop_server:
        try:
            server.settimeout(1)  # Timeout pour vérifier le flag d'arrêt
            client_socket, addr = server.accept()
            print(f"Nouvelle connexion depuis {addr}")
            thread = threading.Thread(target=handle_vehicle_connection, args=(client_socket,))
            thread.start()
        except socket.timeout:
            continue

    server.close()
    print("🛑 Serveur arrêté.")

# Fonction pour demander l'arrêt du serveur
def stop_server_command():
    global stop_server
    stop_server = True
    print("🔌 Arrêt du serveur demandé.")

In [None]:
import threading

# Fonction pour démarrer le serveur en arrière-plan
def start_server_in_thread(host="127.0.0.1", port=5000):
    server_thread = threading.Thread(target=start_server, args=(host, port), daemon=True)
    server_thread.start()
    print("📡 Serveur lancé en arrière-plan.")

In [None]:
# !lsof -i :5000

In [None]:
# !kill -9 7601

In [None]:
start_server_in_thread()

📡 Serveur lancé en arrière-plan.
📡 Serveur lancé sur 127.0.0.1:5000


## Essayons d'ajouter un premier véhicule

In [None]:
def communicate_with_server(vehicle, host="127.0.0.1", port=5000):
    """
    Simule la communication entre un véhicule et le serveur central.

    - vehicle : Objet Vehicle
    - host : Adresse IP du serveur
    - port : Port du serveur
    """
    # Préparer les données du véhicule
    vehicle_data = {
        "id": id(vehicle),
        "route": vehicle.route
    }

    try:
        # Se connecter au serveur
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect((host, port))
            client_socket.send(json.dumps(vehicle_data).encode())

            # Recevoir les données de trafic mises à jour
            response = client_socket.recv(1024).decode()
            traffic_info = json.loads(response)
            print(f"📊 Données reçues du serveur : {traffic_info['traffic_data']}")
    except Exception as e:
        print(f"⚠️ Erreur de communication avec le serveur : {e}")

In [None]:
communicate_with_server(vehicle)

Nouvelle connexion depuis ('127.0.0.1', 35532)
🚗 Véhicule connecté : {'id': 136605897957904, 'route': []}
📊 Données reçues du serveur : {}


## Puis plusieurs :

In [None]:
# Simuler plusieurs véhicules
vehicles = [
    Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=50, consumption_rate=0.3),
    Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=60, consumption_rate=0.4),
    Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=70, consumption_rate=0.2)
]

# Planifier leurs itinéraires
for vehicle in vehicles:
    plan_route_with_energy(city_graph, vehicle)
    communicate_with_server(vehicle)

✅ Batterie OK. Énergie disponible : 50.00 kWh
Nouvelle connexion depuis ('127.0.0.1', 35540)
🚗 Véhicule connecté : {'id': 136605775845200, 'route': [6900473051, 21378260, 204834024, 160018434, 21378257, 21378258, 24909781, 24909788, 6449290646, 6448064269, 470184, 34845781, 34845766, 1179777317, 368273, 205094615, 470185, 25203153, 25203147, 470192, 25203143, 25203140, 470166, 25199639, 470165, 470164, 470162, 25194419, 470161, 1756960219, 25192856, 25193316, 25193321, 25191346, 470157, 160034963, 227211630, 227214316, 368195, 368205, 373732187, 25273197, 11247130726, 25273380, 15643099, 25554239, 25554240, 25554265, 25554267, 15643100, 693583565]}
📊 Données reçues du serveur : {'6900473051': 1, '21378260': 1, '204834024': 1, '160018434': 1, '21378257': 1, '21378258': 1, '24909781': 1, '24909788': 1, '6449290646': 1, '6448064269': 1, '470184': 1, '34845781': 1, '34845766': 1, '1179777317': 1, '368273': 1, '205094615': 1, '470185': 1, '25203153': 1, '25203147': 1, '470192': 1, '25203143

## Ajustons les itinéraires en conséquance :

In [None]:
def adjust_route_based_on_traffic(graph, vehicle, traffic_data):
    """
    Recalcule l'itinéraire d'un véhicule en fonction des données de trafic.

    - graph : Graphe routier OSMnx
    - vehicle : Objet Vehicle
    - traffic_data : Données de trafic reçues du serveur
    """
    orig_node = ox.nearest_nodes(graph, vehicle.start_location[1], vehicle.start_location[0])
    dest_node = ox.nearest_nodes(graph, vehicle.end_location[1], vehicle.end_location[0])

    # Ajouter une pénalité pour les routes congestionnées
    for u, v, data in graph.edges(data=True):
        congestion = traffic_data.get((u, v), 0)  # Obtenir le niveau de congestion
        data["adjusted_time"] = data["travel_time"] * (1 + 0.1 * congestion)  # Pénalité

    # Recalculer l'itinéraire
    vehicle.route = nx.shortest_path(graph, orig_node, dest_node, weight="adjusted_time")
    print(f"🔄 Nouvel itinéraire pour le véhicule {id(vehicle)} calculé.")


## Puis nous allons essayer de les visualiser

In [None]:
import folium
from IPython.display import IFrame

def create_map(graph, route_coords, color, tooltip, output_file):
    """
    Crée une carte Folium pour un itinéraire donné.

    - graph : Graphe routier OSMnx
    - route_coords : Coordonnées de l'itinéraire (liste de tuples)
    - color : Couleur de la ligne représentant l'itinéraire
    - tooltip : Texte affiché au survol de la ligne
    - output_file : Nom du fichier de sortie HTML pour la carte
    """
    # Centrer la carte sur le premier point de l'itinéraire
    m = folium.Map(location=route_coords[0], zoom_start=14)

    # Tracer l'itinéraire
    folium.PolyLine(route_coords, color=color, weight=5, opacity=0.7, tooltip=tooltip).add_to(m)

    # Ajouter des marqueurs pour le départ et l'arrivée
    folium.Marker(route_coords[0], popup="Départ", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(route_coords[-1], popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

    # Enregistrer la carte dans un fichier HTML
    m.save(output_file)
    print(f"📍 Carte sauvegardée dans {output_file}")


In [None]:
# Créer une carte centrée sur le point de départ
m = folium.Map(location=[initial_coords[0][0], initial_coords[0][1]], zoom_start=14)

# Tracer l'itinéraire initial en bleu
folium.PolyLine(initial_coords, color="blue", weight=5, opacity=0.7, tooltip="Itinéraire initial").add_to(m)

# Tracer l'itinéraire modifié en rouge
folium.PolyLine(modified_coords, color="red", weight=5, opacity=0.7, tooltip="Itinéraire modifié (avec recharge)").add_to(m)

# Ajouter des marqueurs pour le départ et l'arrivée
folium.Marker(initial_coords[0], popup="Départ", icon=folium.Icon(color="green")).add_to(m)
folium.Marker(initial_coords[-1], popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

# Afficher la carte
m

## L'étape suivant consiste à implémenter les bornes de recharges pour végicules électriques.

In [None]:
import random

class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge avec :
        - lat, lon : Coordonnées GPS
        - prix au kWh variable
        - temps d’attente avant recharge
        - puissance de charge (kW)
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)  # Prix aléatoire entre 0.20 et 0.50 €/kWh
        self.wait_time = random.randint(0, 10)  # Temps d'attente aléatoire entre 0 et 10 minutes
        self.power = random.choice([50, 100, 150])  # Puissance de charge (en kW)

    def __repr__(self):
        return f"Borne ({self.lat}, {self.lon}) - {self.price_per_kWh}€/kWh, {self.wait_time} min attente, {self.power} kW"


In [None]:
def generate_charging_stations(num_stations, start_lat, start_lon, end_lat, end_lon):
    """
    Génère un certain nombre de bornes de recharge dans une zone proche de l'itinéraire.

    - num_stations : Nombre de bornes à générer
    - start_lat, start_lon : Coordonnées du départ
    - end_lat, end_lon : Coordonnées de l’arrivée
    """
    stations = []
    for _ in range(num_stations):
        lat = random.uniform(min(start_lat, end_lat) - 0.02, max(start_lat, end_lat) + 0.02)
        lon = random.uniform(min(start_lon, end_lon) - 0.02, max(start_lon, end_lon) + 0.02)
        stations.append(ChargingStation(lat, lon))
    return stations

# Exemple de génération
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Afficher les bornes générées
for station in charging_stations:
    print(station)


Borne (48.850973431296715, 2.3043580539021695) - 0.44€/kWh, 6 min attente, 150 kW
Borne (48.845533812313164, 2.280374123084059) - 0.43€/kWh, 10 min attente, 150 kW
Borne (48.8575846583561, 2.2811473021386357) - 0.31€/kWh, 0 min attente, 100 kW
Borne (48.8624638821282, 2.3435249307272263) - 0.43€/kWh, 6 min attente, 50 kW
Borne (48.86323056666207, 2.343829706366982) - 0.25€/kWh, 0 min attente, 150 kW


In [None]:
import folium

def display_charging_stations_map(stations, start_coords, end_coords):
    """
    Affiche une carte avec les bornes de recharge.
    - stations : Liste des objets ChargingStation
    - start_coords, end_coords : Coordonnées du départ et de l’arrivée
    """
    m = folium.Map(location=start_coords, zoom_start=14)

    # Ajouter le départ et l’arrivée
    folium.Marker(start_coords, popup="Départ", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(end_coords, popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

    # Ajouter les bornes de recharge
    for station in stations:
        popup_text = f"💰 {station.price_per_kWh}€/kWh | ⏳ {station.wait_time} min | ⚡ {station.power} kW"
        folium.Marker([station.lat, station.lon], popup=popup_text, icon=folium.Icon(color="blue")).add_to(m)

    return m

# Afficher la carte avec les bornes
display_charging_stations_map(charging_stations, (start_lat, start_lon), (end_lat, end_lon))


## Nous allons maintenant tenter de faire communiquer les véhicules avec les bornes de recharges que nous avons implémenté.

In [None]:
import time

class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge avec des paramètres :
        - lat, lon : Coordonnées GPS
        - prix au kWh variable
        - temps d’attente
        - puissance de charge (kW)
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)  # Prix entre 0.20 et 0.50 €/kWh
        self.wait_time = random.randint(0, 10)  # Attente entre 0 et 10 min
        self.power = random.choice([50, 100, 150])  # Puissance en kW

    def negotiate_price(self, vehicle_offer):
        """
        Simulation de négociation entre la borne et un véhicule.
        - vehicle_offer : Prix max que le véhicule est prêt à payer
        """
        start_time = time.time()  # Début de la négociation

        # La borne propose un prix de départ
        station_price = self.price_per_kWh

        # Négociation en boucle (max 2 sec)
        while time.time() - start_time < 2:
            if station_price <= vehicle_offer:
                return station_price  # Accord sur le prix proposé par la borne
            else:
                # Ajustement du prix si l'offre du véhicule est proche
                if abs(station_price - vehicle_offer) < 0.05:
                    return vehicle_offer  # La borne accepte l'offre

                # La borne réduit un peu son prix (stratégie simple)
                station_price -= 0.01

        return None  # Échec de la négociation

    def __repr__(self):
        return f"Borne ({self.lat}, {self.lon}) - {self.price_per_kWh}€/kWh, {self.wait_time} min attente, {self.power} kW"


In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées de départ et d’arrivée
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.route = []
        self.total_distance = 0
        self.needs_recharge = False

    def calculate_energy_needed(self, distance_km):
        """Calcule l’énergie nécessaire pour un trajet"""
        return distance_km * self.consumption_rate

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie"""
        required_energy = self.calculate_energy_needed(distance_km)
        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Recharge nécessaire !")
        else:
            print(f"✅ Batterie OK. Énergie disponible : {self.current_battery:.2f} kWh")

    def find_best_station(self, stations, max_price):
        """
        Cherche la meilleure borne en fonction du prix et de l’attente.
        - stations : Liste des bornes de recharge
        - max_price : Prix max que le véhicule accepte de payer
        """
        best_station = None
        best_price = float('inf')

        for station in stations:
            print(f"💬 Négociation avec la borne à {station.lat}, {station.lon}...")
            agreed_price = station.negotiate_price(max_price)

            if agreed_price is not None and agreed_price < best_price:
                best_price = agreed_price
                best_station = station

        if best_station:
            print(f"✅ Recharge acceptée à {best_station.lat}, {best_station.lon} pour {best_price}€/kWh")
            return best_station
        else:
            print("❌ Aucune borne trouvée à un prix acceptable.")
            return None


In [None]:
# Générer des bornes de recharge
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Créer un véhicule avec une batterie faible
vehicle = Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=30, consumption_rate=0.3)

# Vérifier la batterie et rechercher une station
vehicle.check_battery(50)  # Simule un trajet de 50 km

# Si recharge nécessaire, négocier avec les stations
if vehicle.needs_recharge:
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)  # Prix max accepté : 0.35€/kWh

✅ Batterie OK. Énergie disponible : 30.00 kWh


In [None]:
def display_selected_station_map(stations, best_station, start_coords, end_coords):
    """
    Affiche une carte avec toutes les bornes et la station sélectionnée en violet.
    """
    m = folium.Map(location=start_coords, zoom_start=14)

    # Ajouter le départ et l’arrivée
    folium.Marker(start_coords, popup="Départ", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(end_coords, popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)

    # Ajouter toutes les bornes
    for station in stations:
        color = "blue" if station != best_station else "purple"
        popup_text = f"💰 {station.price_per_kWh}€/kWh | ⏳ {station.wait_time} min | ⚡ {station.power} kW"
        folium.Marker([station.lat, station.lon], popup=popup_text, icon=folium.Icon(color=color)).add_to(m)

    return m

# Afficher la carte avec la borne sélectionnée
if vehicle.needs_recharge and best_station:
    display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))

## Nous pouvons réaliser un test où tout se passe bien

In [None]:
# --- Test de la négociation véhicule-borne ---

# Affiche un séparateur pour la lisibilité
print("=== Simulation de négociation pour recharge ===\n")

# 1. Générer plusieurs bornes de recharge autour du trajet
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)
print("Bornes générées :")
for station in charging_stations:
    print(station)

# 2. Créer un véhicule avec une batterie insuffisante pour un trajet simulé de 50 km
vehicle = Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=30, consumption_rate=0.3)
trip_distance = 50  # distance simulée en km
vehicle.check_battery(trip_distance)  # Affiche si recharge est nécessaire

# 3. Si recharge nécessaire, tenter de négocier avec les bornes
if vehicle.needs_recharge:
    # Le véhicule fixe un prix maximum qu'il est prêt à payer (par exemple 0.35 €/kWh)
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)

    if best_station:
        print("\nLa meilleure borne sélectionnée est :")
        print(best_station)

        # 4. Afficher la carte avec toutes les bornes et mettre en évidence la borne sélectionnée (en violet)
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affiche la carte interactif dans Colab
    else:
        print("\nAucune borne n'a proposé une offre acceptable.")
else:
    print("\nLe véhicule dispose d'une autonomie suffisante, aucune recharge n'est nécessaire.")

=== Simulation de négociation pour recharge ===

Bornes générées :
Borne (48.84694248190679, 2.289832776317515) - 0.43€/kWh, 10 min attente, 150 kW
Borne (48.862343592614444, 2.3277923920239223) - 0.44€/kWh, 6 min attente, 150 kW
Borne (48.85863432473415, 2.3429764353613085) - 0.47€/kWh, 3 min attente, 150 kW
Borne (48.84993135823379, 2.2954917725343496) - 0.49€/kWh, 4 min attente, 150 kW
Borne (48.87696612908758, 2.3171547320485284) - 0.41€/kWh, 4 min attente, 150 kW
✅ Batterie OK. Énergie disponible : 30.00 kWh

Le véhicule dispose d'une autonomie suffisante, aucune recharge n'est nécessaire.


## Puis un deuxième où notre véhicule va négocier avec les bornes car il n'a pas assez d'autonomie.

In [None]:
# --- Test de la négociation véhicule-borne ---

# Affiche un séparateur pour la lisibilité
print("=== Simulation de négociation pour recharge ===\n")

# 1. Générer plusieurs bornes de recharge autour du trajet
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)
print("Bornes générées :")
for station in charging_stations:
    print(station)

# 2. Créer un véhicule avec une batterie insuffisante pour un trajet simulé de 50 km
vehicle = Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=30, consumption_rate=0.3)
trip_distance = 500  # distance simulée en km
vehicle.check_battery(trip_distance)  # Affiche si recharge est nécessaire

# 3. Si recharge nécessaire, tenter de négocier avec les bornes
if vehicle.needs_recharge:
    # Le véhicule fixe un prix maximum qu'il est prêt à payer (par exemple 0.35 €/kWh)
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)

    if best_station:
        print("\nLa meilleure borne sélectionnée est :")
        print(best_station)

        # 4. Afficher la carte avec toutes les bornes et mettre en évidence la borne sélectionnée (en violet)
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affiche la carte interactif dans Colab
    else:
        print("\nAucune borne n'a proposé une offre acceptable.")
else:
    print("\nLe véhicule dispose d'une autonomie suffisante, aucune recharge n'est nécessaire.")

=== Simulation de négociation pour recharge ===

Bornes générées :
Borne (48.85470380711807, 2.327640629841134) - 0.23€/kWh, 2 min attente, 100 kW
Borne (48.87400373280056, 2.3046923144567284) - 0.48€/kWh, 7 min attente, 50 kW
Borne (48.87017779055223, 2.300534489701432) - 0.44€/kWh, 5 min attente, 100 kW
Borne (48.87881039664539, 2.334283642993942) - 0.25€/kWh, 10 min attente, 100 kW
Borne (48.87212804226479, 2.3519994538832276) - 0.35€/kWh, 2 min attente, 50 kW
⚠️ Recharge nécessaire !
💬 Négociation avec la borne à 48.85470380711807, 2.327640629841134...
💬 Négociation avec la borne à 48.87400373280056, 2.3046923144567284...
💬 Négociation avec la borne à 48.87017779055223, 2.300534489701432...
💬 Négociation avec la borne à 48.87881039664539, 2.334283642993942...
💬 Négociation avec la borne à 48.87212804226479, 2.3519994538832276...
✅ Recharge acceptée à 48.85470380711807, 2.327640629841134 pour 0.23€/kWh

La meilleure borne sélectionnée est :
Borne (48.85470380711807, 2.32764062984113

## Nous pouvons maintenant essaye d'améliorer les stratégies de négociation

In [None]:
class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge.
        - lat, lon : Coordonnées GPS
        - prix au kWh
        - temps d’attente
        - puissance de charge
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)  # Prix entre 0.20 et 0.50 €/kWh
        self.wait_time = random.randint(0, 10)  # Attente entre 0 et 10 min
        self.power = random.choice([50, 100, 150])  # Puissance de charge

    def negotiate_price(self, vehicle_offer, strategy):
        """
        Simulation de négociation entre la borne et un véhicule.
        - vehicle_offer : Prix max que le véhicule est prêt à payer
        - strategy : Stratégie utilisée par le véhicule ("aggressive", "fast", "balanced")
        """
        import time
        start_time = time.time()  # Début de la négociation
        station_price = self.price_per_kWh  # Prix initial de la borne

        # Stratégie "aggressive" (offre très basse)
        if strategy == "aggressive":
            offer = vehicle_offer * 0.8  # Offre initiale très basse
        # Stratégie "fast" (offre haute pour aller vite)
        elif strategy == "fast":
            offer = vehicle_offer * 1.1  # Offre initiale plus haute pour garantir un accord rapide
        # Stratégie "balanced" (compromis entre prix et rapidité)
        else:
            offer = vehicle_offer

        while time.time() - start_time < 2:  # Limite de 2 secondes pour la négociation
            if station_price <= offer:
                return station_price  # Accord atteint

            # Ajustement de la borne en fonction de la stratégie
            if strategy == "aggressive":
                station_price -= 0.02  # La borne baisse lentement son prix
            elif strategy == "fast":
                station_price -= 0.05  # La borne baisse rapidement son prix pour conclure vite
            else:
                station_price -= 0.03  # Ajustement moyen pour une approche équilibrée

            if station_price < 0.20:  # Prix plancher
                return None  # Échec de la négociation

        return None  # Négociation échouée

In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2, urgency=1.0):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées GPS
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        - urgency : Coefficient d’urgence (1 = normal, >1 = pressé)
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.urgency = urgency  # Si élevé, le véhicule choisit une stratégie plus rapide
        self.needs_recharge = False

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie pour effectuer un trajet donné."""
        required_energy = distance_km * self.consumption_rate
        print(f"🔋 Batterie actuelle : {self.current_battery:.2f} kWh")
        print(f"📏 Énergie requise pour {distance_km} km : {required_energy:.2f} kWh")

        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Batterie insuffisante ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie suffisante pour atteindre la destination.")

    def determine_strategy(self):
        """Détermine la stratégie de négociation en fonction de l’urgence et de l’énergie restante."""
        if self.urgency > 1.5:
            return "fast"  # Véhicule pressé : accepte un prix plus élevé
        elif self.current_battery < self.battery_capacity * 0.2:
            return "aggressive"  # Batterie faible : négocie fortement le prix
        else:
            return "balanced"  # Cas normal

    def find_best_station(self, stations, max_price):
        """
        Cherche la meilleure borne en fonction de sa stratégie.
        - stations : Liste des bornes de recharge
        - max_price : Prix max que le véhicule accepte de payer
        """
        strategy = self.determine_strategy()
        print(f"🚗 Stratégie de négociation sélectionnée : {strategy}")

        best_station = None
        best_price = float('inf')

        for station in stations:
            print(f"💬 Négociation avec la borne ({station.lat}, {station.lon})...")
            agreed_price = station.negotiate_price(max_price, strategy)

            if agreed_price is not None and agreed_price < best_price:
                best_price = agreed_price
                best_station = station

        if best_station:
            print(f"✅ Recharge acceptée à {best_station.lat}, {best_station.lon} pour {best_price}€/kWh")
            return best_station
        else:
            print("❌ Aucune borne trouvée à un prix acceptable.")
            return None

In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2, urgency=1.0):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées GPS
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        - urgency : Coefficient d’urgence (1 = normal, >1 = pressé)
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.urgency = urgency  # Si élevé, le véhicule choisit une stratégie plus rapide
        self.needs_recharge = False

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie pour effectuer un trajet donné."""
        required_energy = distance_km * self.consumption_rate
        print(f"🔋 Batterie actuelle : {self.current_battery:.2f} kWh")
        print(f"📏 Énergie requise pour {distance_km} km : {required_energy:.2f} kWh")

        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Batterie insuffisante ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie suffisante pour atteindre la destination.")

    def determine_strategy(self):
        """Détermine la stratégie de négociation en fonction de l’urgence et de l’énergie restante."""
        if self.urgency > 1.5:
            return "fast"  # Véhicule pressé : accepte un prix plus élevé
        elif self.current_battery < self.battery_capacity * 0.2:
            return "aggressive"  # Batterie faible : négocie fortement le prix
        else:
            return "balanced"  # Cas normal

    def find_best_station(self, stations, max_price):
        """
        Cherche la meilleure borne en fonction de sa stratégie.
        - stations : Liste des bornes de recharge
        - max_price : Prix max que le véhicule accepte de payer
        """
        strategy = self.determine_strategy()
        print(f"🚗 Stratégie de négociation sélectionnée : {strategy}")

        best_station = None
        best_price = float('inf')

        for station in stations:
            print(f"💬 Négociation avec la borne ({station.lat}, {station.lon})...")
            agreed_price = station.negotiate_price(max_price, strategy)

            if agreed_price is not None and agreed_price < best_price:
                best_price = agreed_price
                best_station = station

        if best_station:
            print(f"✅ Recharge acceptée à {best_station.lat}, {best_station.lon} pour {best_price}€/kWh")
            return best_station
        else:
            print("❌ Aucune borne trouvée à un prix acceptable.")
            return None


## Nous allons maintenant essayer différents niveaux d'urgences et donc de négociations.

In [None]:
# --- Test global de la négociation véhicule-borne ---
print("\n=== 🛠️ TEST COMPLET DU SYSTÈME ===\n")

# Étape 1 : Générer les bornes de recharge
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)
print("\n📍 Bornes de recharge générées :")
for station in charging_stations:
    print(station)

# Étape 2 : Créer un véhicule avec batterie faible et urgence aléatoire
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,   # Batterie faible pour forcer la recharge
    consumption_rate=0.3,  # Consommation élevée
    urgency=random.uniform(1.0, 2.0)  # Urgence aléatoire
)

print(f"\n🚗 Véhicule créé avec une batterie de {vehicle.battery_capacity} kWh et un facteur d'urgence de {vehicle.urgency:.2f}\n")

# Étape 3 : Vérifier la batterie
trip_distance = 50  # Distance simulée en km
vehicle.check_battery(trip_distance)

# Étape 4 : Si une recharge est nécessaire, négocier avec les bornes
if vehicle.needs_recharge:
    print("\n🔍 Début de la négociation avec les bornes...\n")
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)  # Le véhicule accepte un prix max de 0.35 €/kWh

    if best_station:
        print("\n✅ La meilleure borne sélectionnée est :")
        print(best_station)

        # Étape 5 : Afficher la carte avec la borne sélectionnée en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif de la carte
    else:
        print("\n❌ Échec de la négociation : aucune borne ne correspond aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🛠️ TEST COMPLET DU SYSTÈME ===


📍 Bornes de recharge générées :
<__main__.ChargingStation object at 0x7c3e02b34b50>
<__main__.ChargingStation object at 0x7c3e0a0671d0>
<__main__.ChargingStation object at 0x7c3e0a064810>
<__main__.ChargingStation object at 0x7c3e05d93c50>
<__main__.ChargingStation object at 0x7c3e02b363d0>

🚗 Véhicule créé avec une batterie de 30 kWh et un facteur d'urgence de 1.37

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 50 km : 15.00 kWh
✅ Batterie suffisante pour atteindre la destination.

✅ Batterie suffisante, aucune recharge nécessaire.


In [None]:
# --- Test global de la négociation véhicule-borne ---
print("\n=== 🛠️ TEST COMPLET DU SYSTÈME ===\n")

# Étape 1 : Générer les bornes de recharge
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)
print("\n📍 Bornes de recharge générées :")
for station in charging_stations:
    print(station)

# Étape 2 : Créer un véhicule avec batterie faible et urgence aléatoire
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,   # Batterie faible pour forcer la recharge
    consumption_rate=0.3,  # Consommation élevée
    urgency=1  # Urgence faible
)

print(f"\n🚗 Véhicule créé avec une batterie de {vehicle.battery_capacity} kWh et un facteur d'urgence de {vehicle.urgency:.2f}\n")

# Étape 3 : Vérifier la batterie
trip_distance = 500  # Distance simulée en km
vehicle.check_battery(trip_distance)

# Étape 4 : Si une recharge est nécessaire, négocier avec les bornes
if vehicle.needs_recharge:
    print("\n🔍 Début de la négociation avec les bornes...\n")
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)  # Le véhicule accepte un prix max de 0.35 €/kWh

    if best_station:
        print("\n✅ La meilleure borne sélectionnée est :")
        print(best_station)

        # Étape 5 : Afficher la carte avec la borne sélectionnée en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif de la carte
    else:
        print("\n❌ Échec de la négociation : aucune borne ne correspond aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🛠️ TEST COMPLET DU SYSTÈME ===


📍 Bornes de recharge générées :
<__main__.ChargingStation object at 0x7c3e09eaa610>
<__main__.ChargingStation object at 0x7c3e09ea9290>
<__main__.ChargingStation object at 0x7c3e0a3c8590>
<__main__.ChargingStation object at 0x7c3e0a3c95d0>
<__main__.ChargingStation object at 0x7c3e0a3ca990>

🚗 Véhicule créé avec une batterie de 30 kWh et un facteur d'urgence de 1.00

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 500 km : 150.00 kWh
⚠️ Batterie insuffisante ! Recharge nécessaire.

🔍 Début de la négociation avec les bornes...

🚗 Stratégie de négociation sélectionnée : balanced
💬 Négociation avec la borne (48.869894449279165, 2.355639877789863)...
💬 Négociation avec la borne (48.880007289125295, 2.342237917963059)...
💬 Négociation avec la borne (48.86009977909264, 2.3347855055206495)...
💬 Négociation avec la borne (48.87194016475565, 2.2987378009320025)...
💬 Négociation avec la borne (48.83943494355874, 2.310543074454854)...
✅ Recharge accept

In [None]:
# --- Test global de la négociation véhicule-borne ---
print("\n=== 🛠️ TEST COMPLET DU SYSTÈME ===\n")

# Étape 1 : Générer les bornes de recharge
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)
print("\n📍 Bornes de recharge générées :")
for station in charging_stations:
    print(station)

# Étape 2 : Créer un véhicule avec batterie faible et urgence aléatoire
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,   # Batterie faible pour forcer la recharge
    consumption_rate=0.3,  # Consommation élevée
    urgency=2  # Urgence élevée
)

print(f"\n🚗 Véhicule créé avec une batterie de {vehicle.battery_capacity} kWh et un facteur d'urgence de {vehicle.urgency:.2f}\n")

# Étape 3 : Vérifier la batterie
trip_distance = 500  # Distance simulée en km
vehicle.check_battery(trip_distance)

# Étape 4 : Si une recharge est nécessaire, négocier avec les bornes
if vehicle.needs_recharge:
    print("\n🔍 Début de la négociation avec les bornes...\n")
    best_station = vehicle.find_best_station(charging_stations, max_price=0.35)  # Le véhicule accepte un prix max de 0.35 €/kWh

    if best_station:
        print("\n✅ La meilleure borne sélectionnée est :")
        print(best_station)

        # Étape 5 : Afficher la carte avec la borne sélectionnée en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif de la carte
    else:
        print("\n❌ Échec de la négociation : aucune borne ne correspond aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🛠️ TEST COMPLET DU SYSTÈME ===


📍 Bornes de recharge générées :
<__main__.ChargingStation object at 0x7c3e0a3c8e10>
<__main__.ChargingStation object at 0x7c3e0a3cbf10>
<__main__.ChargingStation object at 0x7c3e09eb9110>
<__main__.ChargingStation object at 0x7c3e09eb8b50>
<__main__.ChargingStation object at 0x7c3e09eb8e50>

🚗 Véhicule créé avec une batterie de 30 kWh et un facteur d'urgence de 2.00

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 500 km : 150.00 kWh
⚠️ Batterie insuffisante ! Recharge nécessaire.

🔍 Début de la négociation avec les bornes...

🚗 Stratégie de négociation sélectionnée : fast
💬 Négociation avec la borne (48.864057180299035, 2.332004082560841)...
💬 Négociation avec la borne (48.85646244376308, 2.3279989883244214)...
💬 Négociation avec la borne (48.880011428148606, 2.3139706742665123)...
💬 Négociation avec la borne (48.87511796550716, 2.312178812212354)...
💬 Négociation avec la borne (48.86930992316845, 2.293307329292189)...
✅ Recharge acceptée à

## Essayons maintenant de négocier avec plusieurs borne sen parallèle

In [None]:
import concurrent.futures
import time

class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge.
        - lat, lon : Coordonnées GPS
        - prix au kWh
        - temps d’attente
        - puissance de charge
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)  # Prix entre 0.20 et 0.50 €/kWh
        self.wait_time = random.randint(0, 10)  # Attente entre 0 et 10 min
        self.power = random.choice([50, 100, 150])  # Puissance de charge

    def negotiate_price(self, vehicle_offer, strategy):
        """
        Simulation de négociation entre la borne et un véhicule.
        - vehicle_offer : Prix max que le véhicule est prêt à payer
        - strategy : Stratégie utilisée par le véhicule ("aggressive", "fast", "balanced")
        """
        start_time = time.time()
        station_price = self.price_per_kWh  # Prix initial de la borne

        if strategy == "aggressive":
            offer = vehicle_offer * 0.8  # Offre initiale très basse
        elif strategy == "fast":
            offer = vehicle_offer * 1.1  # Offre initiale plus haute pour garantir un accord rapide
        else:
            offer = vehicle_offer  # Stratégie équilibrée

        while time.time() - start_time < 2:  # Limite de 2 secondes pour la négociation
            if station_price <= offer:
                return station_price  # Accord atteint

            # Ajustement du prix selon la stratégie
            if strategy == "aggressive":
                station_price -= 0.02
            elif strategy == "fast":
                station_price -= 0.05
            else:
                station_price -= 0.03

            if station_price < 0.20:
                return None  # Échec de la négociation

        return None  # Négociation échouée


In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2, urgency=1.0, preferences=None):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées GPS
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        - urgency : Facteur d’urgence (1 = normal, >1 = pressé)
        - preferences : Dictionnaire de pondération des critères
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.urgency = urgency
        self.needs_recharge = False

        # Préférences utilisateur (pondérations des critères)
        self.preferences = preferences if preferences else {
            "cost": 0.5,      # Poids pour le coût (50%)
            "wait_time": 0.3, # Poids pour le temps d’attente (30%)
            "detour": 0.2     # Poids pour le détour (20%)
        }

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie pour effectuer un trajet donné."""
        required_energy = distance_km * self.consumption_rate
        print(f"🔋 Batterie actuelle : {self.current_battery:.2f} kWh")
        print(f"📏 Énergie requise pour {distance_km} km : {required_energy:.2f} kWh")

        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Batterie insuffisante ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie suffisante pour atteindre la destination.")

    def determine_strategy(self):
        """Détermine la stratégie de négociation en fonction de l’urgence et de l’énergie restante."""
        if self.urgency > 1.5:
            return "fast"
        elif self.current_battery < self.battery_capacity * 0.2:
            return "aggressive"
        else:
            return "balanced"

    def parallel_negotiation(self, stations, max_price):
        """
        Permet au véhicule de négocier simultanément avec plusieurs stations.
        - stations : Liste des bornes de recharge
        - max_price : Prix max accepté
        """
        strategy = self.determine_strategy()
        print(f"🚗 Stratégie de négociation sélectionnée : {strategy}")

        best_station = None
        best_price = float('inf')

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_station = {executor.submit(station.negotiate_price, max_price, strategy): station for station in stations}
            for future in concurrent.futures.as_completed(future_to_station):
                station = future_to_station[future]
                try:
                    agreed_price = future.result()
                    if agreed_price is not None and agreed_price < best_price:
                        best_price = agreed_price
                        best_station = station
                except Exception as e:
                    print(f"⚠️ Erreur avec la borne {station}: {e}")

        if best_station:
            print(f"\n✅ Meilleure borne sélectionnée : {best_station} avec prix négocié {best_price:.2f} €/kWh")
        else:
            print("\n❌ Aucune borne n'a proposé une offre acceptable.")

        return best_station


In [None]:
# --- Simulation complète ---
print("\n=== 🚗 Test de la négociation parallèle ===\n")

# Générer 5 bornes de recharge aléatoires
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Créer un véhicule avec des préférences personnalisées
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,
    consumption_rate=0.3,
    urgency=1,  # Urgence aléatoire
    preferences={
        "cost": 0.4,      # 40% priorité au coût
        "wait_time": 0.4, # 40% priorité au temps d’attente
        "detour": 0.2     # 20% priorité au détour
    }
)

# Vérifier la batterie
trip_distance = 50  # Distance simulée en km
vehicle.check_battery(trip_distance)

# Si recharge nécessaire, lancer une négociation parallèle
if vehicle.needs_recharge:
    print("\n🔍 Négociation simultanée avec plusieurs bornes...\n")
    best_station = vehicle.parallel_negotiation(charging_stations, max_price=0.35)

    if best_station:
        # Afficher la carte avec la borne optimale en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif
    else:
        print("\n❌ Aucune borne trouvée correspondant aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🚗 Test de la négociation parallèle ===

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 50 km : 15.00 kWh
✅ Batterie suffisante pour atteindre la destination.

✅ Batterie suffisante, aucune recharge nécessaire.


In [None]:
# --- Simulation complète ---
print("\n=== 🚗 Test de la négociation parallèle ===\n")

# Générer 5 bornes de recharge aléatoires
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Créer un véhicule avec des préférences personnalisées
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,
    consumption_rate=0.3,
    urgency=2,
    preferences={
        "cost": 0.4,      # 40% priorité au coût
        "wait_time": 0.4, # 40% priorité au temps d’attente
        "detour": 0.2     # 20% priorité au détour
    }
)

# Vérifier la batterie
trip_distance = 500  # Distance simulée en km
vehicle.check_battery(trip_distance)

# Si recharge nécessaire, lancer une négociation parallèle
if vehicle.needs_recharge:
    print("\n🔍 Négociation simultanée avec plusieurs bornes...\n")
    best_station = vehicle.parallel_negotiation(charging_stations, max_price=0.35)

    if best_station:
        # Afficher la carte avec la borne optimale en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif
    else:
        print("\n❌ Aucune borne trouvée correspondant aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🚗 Test de la négociation parallèle ===

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 500 km : 150.00 kWh
⚠️ Batterie insuffisante ! Recharge nécessaire.

🔍 Négociation simultanée avec plusieurs bornes...

🚗 Stratégie de négociation sélectionnée : fast

✅ Meilleure borne sélectionnée : <__main__.ChargingStation object at 0x7c3e0a07a750> avec prix négocié 0.34 €/kWh


## Nous allons à présent essayer de négocier des tarfis de groupe.

In [None]:
class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge.
        - lat, lon : Coordonnées GPS
        - prix au kWh
        - temps d’attente
        - puissance de charge
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)  # Prix entre 0.20 et 0.50 €/kWh
        self.wait_time = random.randint(0, 10)  # Attente entre 0 et 10 min
        self.power = random.choice([50, 100, 150])  # Puissance de charge

    def negotiate_group_price(self, vehicles, strategy):
        """
        Simule une négociation de groupe avec plusieurs véhicules.
        - vehicles : Liste des véhicules négociant ensemble
        - strategy : Stratégie commune de négociation ("aggressive", "fast", "balanced")
        """
        import time
        start_time = time.time()
        station_price = self.price_per_kWh  # Prix initial

        # Calcul du rabais potentiel selon le nombre de véhicules
        num_vehicles = len(vehicles)
        discount_factor = min(0.1 * num_vehicles, 0.3)  # Réduction max de 30%

        # Stratégies de négociation
        if strategy == "aggressive":
            offer = station_price * (1 - discount_factor) * 0.8
        elif strategy == "fast":
            offer = station_price * (1 - discount_factor) * 1.1
        else:
            offer = station_price * (1 - discount_factor)

        while time.time() - start_time < 2:  # Limite de 2 secondes pour la négociation
            if station_price <= offer:
                return station_price  # Accord atteint

            # Ajustement du prix selon la stratégie
            if strategy == "aggressive":
                station_price -= 0.02
            elif strategy == "fast":
                station_price -= 0.05
            else:
                station_price -= 0.03

            if station_price < 0.20:
                return None  # Échec de la négociation

        return None  # Négociation échouée


In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2, urgency=1.0, preferences=None):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées GPS
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        - urgency : Facteur d’urgence (1 = normal, >1 = pressé)
        - preferences : Dictionnaire de pondération des critères
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.urgency = urgency
        self.needs_recharge = False

        # Préférences utilisateur (pondérations des critères)
        self.preferences = preferences if preferences else {
            "cost": 0.5,
            "wait_time": 0.3,
            "detour": 0.2
        }

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie pour effectuer un trajet donné."""
        required_energy = distance_km * self.consumption_rate
        print(f"🔋 Batterie actuelle : {self.current_battery:.2f} kWh")
        print(f"📏 Énergie requise pour {distance_km} km : {required_energy:.2f} kWh")

        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Batterie insuffisante ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie suffisante pour atteindre la destination.")

    def find_nearby_vehicles(self, vehicles, max_distance=2.0):
        """
        Recherche les véhicules proches (moins de max_distance km) pour une négociation de groupe.
        - vehicles : Liste des autres véhicules
        - max_distance : Distance max en km pour former un groupe
        """
        nearby_vehicles = []
        for v in vehicles:
            if v != self:
                distance = random.uniform(0, 5)  # Simule une distance en km
                if distance <= max_distance:
                    nearby_vehicles.append(v)

        return nearby_vehicles

    def group_negotiation(self, vehicles, stations, max_price):
        """
        Lancement de la négociation de groupe avec plusieurs véhicules sur plusieurs stations.
        - vehicles : Liste des véhicules candidats
        - stations : Liste des bornes
        - max_price : Prix max que les véhicules acceptent
        """
        group = self.find_nearby_vehicles(vehicles)
        if not group:
            print("❌ Aucun autre véhicule à proximité pour négocier en groupe.")
            return None

        strategy = self.determine_strategy()
        best_station = None
        best_price = float('inf')

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_station = {
                executor.submit(station.negotiate_group_price, group, strategy): station
                for station in stations
            }
            for future in concurrent.futures.as_completed(future_to_station):
                station = future_to_station[future]
                try:
                    agreed_price = future.result()
                    if agreed_price is not None and agreed_price < best_price:
                        best_price = agreed_price
                        best_station = station
                except Exception as e:
                    print(f"⚠️ Erreur avec la borne {station}: {e}")

        if best_station:
            print(f"\n✅ Meilleure borne pour la négociation de groupe : {best_station} avec prix négocié {best_price:.2f} €/kWh")
        else:
            print("\n❌ Aucune borne n'a proposé une offre acceptable pour le groupe.")

        return best_station


In [None]:
# --- Simulation complète ---
print("\n=== 🚗 Test de la négociation de groupe ===\n")

# Générer 5 bornes de recharge aléatoires
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Créer plusieurs véhicules
vehicles = [
    Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=30, consumption_rate=0.3, urgency=random.uniform(1.0, 2.0))
    for _ in range(3)  # 3 véhicules pour la simulation
]

# Sélection du premier véhicule comme leader du groupe
leader_vehicle = vehicles[0]

# Vérifier la batterie
trip_distance = 50  # Distance simulée en km
leader_vehicle.check_battery(trip_distance)

# Si recharge nécessaire, tenter une négociation de groupe
if leader_vehicle.needs_recharge:
    print("\n🔍 Négociation de groupe avec plusieurs véhicules...\n")
    best_station = leader_vehicle.group_negotiation(vehicles, charging_stations, max_price=0.35)

    if best_station:
        # Afficher la carte avec la borne optimale en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif
    else:
        print("\n❌ Aucune borne trouvée correspondant aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")



=== 🚗 Test de la négociation de groupe ===

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 50 km : 15.00 kWh
✅ Batterie suffisante pour atteindre la destination.

✅ Batterie suffisante, aucune recharge nécessaire.


In [None]:
# --- Simulation complète ---
print("\n=== 🚗 Test de la négociation de groupe ===\n")

# Générer 5 bornes de recharge aléatoires
charging_stations = generate_charging_stations(5, start_lat, start_lon, end_lat, end_lon)

# Créer plusieurs véhicules
vehicles = [
    Vehicle((start_lat, start_lon), (end_lat, end_lon), battery_capacity=30, consumption_rate=0.3, urgency=random.uniform(1.0, 2.0))
    for _ in range(3)  # 3 véhicules pour la simulation
]

# Sélection du premier véhicule comme leader du groupe
leader_vehicle = vehicles[0]

# Vérifier la batterie
trip_distance = 500  # Distance simulée en km
leader_vehicle.check_battery(trip_distance)

# Si recharge nécessaire, tenter une négociation de groupe
if leader_vehicle.needs_recharge:
    print("\n🔍 Négociation de groupe avec plusieurs véhicules...\n")
    best_station = leader_vehicle.group_negotiation(vehicles, charging_stations, max_price=0.35)

    if best_station:
        # Afficher la carte avec la borne optimale en violet
        m = display_selected_station_map(charging_stations, best_station, (start_lat, start_lon), (end_lat, end_lon))
        m  # Affichage interactif
    else:
        print("\n❌ Aucune borne trouvée correspondant aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")


=== 🚗 Test de la négociation de groupe ===

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 500 km : 150.00 kWh
⚠️ Batterie insuffisante ! Recharge nécessaire.

🔍 Négociation de groupe avec plusieurs véhicules...

❌ Aucun autre véhicule à proximité pour négocier en groupe.

❌ Aucune borne trouvée correspondant aux critères.


## Nous allons maintenan tenter de modéliser le tafic

In [None]:
class Traffic:
    def __init__(self):
        """Simule un modèle de trafic avec des congestions aléatoires."""
        self.traffic_data = {}  # Dictionnaire {route_id: temps supplémentaire en minutes}

    def update_traffic(self, city_graph):
        """Génère un trafic aléatoire sur certaines routes."""
        for u, v, data in city_graph.edges(data=True):
            if random.random() < 0.3:  # 30% de chances d'avoir un embouteillage
                delay = random.randint(5, 20)  # Délai supplémentaire en minutes
                self.traffic_data[(u, v)] = delay

    def get_traffic_delay(self, u, v):
        """Retourne le retard sur un segment routier donné."""
        return self.traffic_data.get((u, v), 0)  # Retourne 0 si pas de retard


In [None]:
class ChargingStation:
    def __init__(self, lat, lon):
        """
        Initialise une borne de recharge.
        - lat, lon : Coordonnées GPS
        - prix au kWh
        - temps d’attente
        - puissance de charge
        - état de fonctionnement
        """
        self.lat = lat
        self.lon = lon
        self.price_per_kWh = round(random.uniform(0.20, 0.50), 2)
        self.wait_time = random.randint(0, 10)
        self.power = random.choice([50, 100, 150])
        self.operational = random.random() > 0.15  # 15% de probabilité de panne

    def is_operational(self):
        """Retourne True si la borne est fonctionnelle, False sinon."""
        return self.operational


In [None]:
class Vehicle:
    def __init__(self, start_location, end_location, battery_capacity=100, consumption_rate=0.2, urgency=1.0, preferences=None):
        """
        Initialise un véhicule autonome.
        - start_location, end_location : Coordonnées GPS
        - battery_capacity : Capacité batterie en kWh
        - consumption_rate : Consommation en kWh/km
        - urgency : Facteur d’urgence (1 = normal, >1 = pressé)
        - preferences : Dictionnaire de pondération des critères
        """
        self.start_location = start_location
        self.end_location = end_location
        self.battery_capacity = battery_capacity
        self.current_battery = battery_capacity
        self.consumption_rate = consumption_rate
        self.urgency = urgency
        self.needs_recharge = False

        # Préférences utilisateur (pondérations des critères)
        self.preferences = preferences if preferences else {
            "cost": 0.5,
            "wait_time": 0.3,
            "detour": 0.2
        }

    def check_battery(self, distance_km):
        """Vérifie si le véhicule a assez d’énergie pour effectuer un trajet donné."""
        required_energy = distance_km * self.consumption_rate
        print(f"🔋 Batterie actuelle : {self.current_battery:.2f} kWh")
        print(f"📏 Énergie requise pour {distance_km} km : {required_energy:.2f} kWh")

        if self.current_battery < required_energy:
            self.needs_recharge = True
            print(f"⚠️ Batterie insuffisante ! Recharge nécessaire.")
        else:
            print(f"✅ Batterie suffisante pour atteindre la destination.")

    def find_nearby_vehicles(self, vehicles, max_distance=2.0):
        """
        Recherche les véhicules proches (moins de max_distance km) pour une négociation de groupe.
        - vehicles : Liste des autres véhicules
        - max_distance : Distance max en km pour former un groupe
        """
        nearby_vehicles = []
        for v in vehicles:
            if v != self:
                distance = random.uniform(0, 5)  # Simule une distance en km
                if distance <= max_distance:
                    nearby_vehicles.append(v)

        return nearby_vehicles

    def group_negotiation(self, vehicles, stations, max_price):
        """
        Lancement de la négociation de groupe avec plusieurs véhicules sur plusieurs stations.
        - vehicles : Liste des véhicules candidats
        - stations : Liste des bornes
        - max_price : Prix max que les véhicules acceptent
        """
        group = self.find_nearby_vehicles(vehicles)
        if not group:
            print("❌ Aucun autre véhicule à proximité pour négocier en groupe.")
            return None

        strategy = self.determine_strategy()
        best_station = None
        best_price = float('inf')

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_station = {
                executor.submit(station.negotiate_group_price, group, strategy): station
                for station in stations
            }
            for future in concurrent.futures.as_completed(future_to_station):
                station = future_to_station[future]
                try:
                    agreed_price = future.result()
                    if agreed_price is not None and agreed_price < best_price:
                        best_price = agreed_price
                        best_station = station
                except Exception as e:
                    print(f"⚠️ Erreur avec la borne {station}: {e}")

        if best_station:
            print(f"\n✅ Meilleure borne pour la négociation de groupe : {best_station} avec prix négocié {best_price:.2f} €/kWh")
        else:
            print("\n❌ Aucune borne n'a proposé une offre acceptable pour le groupe.")

        return best_station


    def find_best_station_with_traffic(self, stations, traffic_model, city_graph):
        """
        Sélectionne la meilleure borne en fonction du trafic et des pannes.
        - stations : Liste des bornes de recharge
        - traffic_model : Modèle de trafic simulé
        - city_graph : Graphe routier
        """
        best_station = None
        best_utility = -1

        for station in stations:
            if not station.is_operational():
                print(f"⚠️ Borne {station.lat}, {station.lon} en panne. Ignorée.")
                continue  # Ignorer les bornes en panne

            # Simule un détour (aléatoire entre 0 et 5 km)
            detour_km = random.uniform(0, 5)

            # Ajoute le trafic au temps estimé
            total_delay = sum(
                traffic_model.get_traffic_delay(u, v)
                for u, v, _ in city_graph.edges(data=True)  # 🔥 Correction ici
            )

            # Calcul de l'utilité avec le trafic pris en compte
            utility = self.compute_utility(station, detour_km) - (total_delay / 60) * 0.1

            print(f"📊 Borne ({station.lat}, {station.lon}) | Utilité : {utility:.2f} | Détour : {detour_km:.2f} km | Retard : {total_delay} min")

            if utility > best_utility:
                best_utility = utility
                best_station = station

        if best_station:
            print(f"\n✅ Meilleure borne sélectionnée avec trafic : {best_station} avec utilité {best_utility:.2f}")
        else:
            print("\n❌ Aucune borne satisfaisante.")

        return best_station

    def compute_utility(self, station, detour_km):
        """
        Calcule l’utilité d’une borne en fonction des critères pondérés.
        - station : Objet ChargingStation
        - detour_km : Distance en km ajoutée par cette borne
        """
        # Normalisation des critères (on met tout sur une échelle de 0 à 1)
        max_price = 0.50  # Prix max de référence
        max_wait = 10     # Temps d'attente max de référence (10 min)
        max_detour = 5    # Détour max raisonnable (5 km)

        normalized_price = station.price_per_kWh / max_price
        normalized_wait = station.wait_time / max_wait
        normalized_detour = detour_km / max_detour

        # Calcul de l'utilité avec les pondérations définies
        utility = (
            self.preferences["cost"] * (1 - normalized_price) +  # Moins cher = mieux
            self.preferences["wait_time"] * (1 - normalized_wait) +  # Moins d'attente = mieux
            self.preferences["detour"] * (1 - normalized_detour)  # Moins de détour = mieux
        )

        return utility


In [None]:
# --- Simulation complète ---
print("\n=== 🚗 Test de l’optimisation avec gestion du trafic et des pannes ===\n")

# Générer le graphe routier
city_graph = ox.graph_from_place("Paris, France", network_type="drive")

# Générer un modèle de trafic
traffic_model = Traffic()
traffic_model.update_traffic(city_graph)

# Générer 40 bornes de recharge aléatoires
charging_stations = generate_charging_stations(40, start_lat, start_lon, end_lat, end_lon)

# Créer un véhicule avec des préférences personnalisées
vehicle = Vehicle(
    start_location=(start_lat, start_lon),
    end_location=(end_lat, end_lon),
    battery_capacity=30,
    consumption_rate=0.3,
    urgency=1,
    preferences={
        "cost": 0.4,
        "wait_time": 0.4,
        "detour": 0.2
    }
)

# Vérifier la batterie
trip_distance = 200
vehicle.check_battery(trip_distance)

# Si recharge nécessaire, choisir la borne optimale avec trafic et pannes
if vehicle.needs_recharge:
    print("\n🔍 Sélection de la meilleure borne avec gestion du trafic et des pannes...\n")
    best_station = vehicle.find_best_station_with_traffic(charging_stations, traffic_model, city_graph)

    if best_station:
        # Afficher la carte avec la borne optimale en violet
        m = display_complete_map(vehicle, charging_stations, best_station, traffic_model, city_graph, (start_lat, start_lon), (end_lat, end_lon))
        m
    else:
        print("\n❌ Aucune borne trouvée correspondant aux critères.")
else:
    print("\n✅ Batterie suffisante, aucune recharge nécessaire.")


=== 🚗 Test de l’optimisation avec gestion du trafic et des pannes ===

🔋 Batterie actuelle : 30.00 kWh
📏 Énergie requise pour 200 km : 60.00 kWh
⚠️ Batterie insuffisante ! Recharge nécessaire.

🔍 Sélection de la meilleure borne avec gestion du trafic et des pannes...

📊 Borne (48.85823318720628, 2.3509430992469165) | Utilité : -114.22 | Détour : 1.02 km | Retard : 68841 min
📊 Borne (48.86457999532343, 2.340075466678695) | Utilité : -114.55 | Détour : 3.34 km | Retard : 68841 min
📊 Borne (48.85857606719068, 2.306216977582822) | Utilité : -114.13 | Détour : 1.14 km | Retard : 68841 min
📊 Borne (48.84718904081056, 2.282096993146802) | Utilité : -114.55 | Détour : 2.64 km | Retard : 68841 min
📊 Borne (48.86818344970398, 2.324356178357448) | Utilité : -114.16 | Détour : 4.18 km | Retard : 68841 min
📊 Borne (48.85479396113941, 2.3553406150996046) | Utilité : -114.44 | Détour : 4.92 km | Retard : 68841 min
📊 Borne (48.85061570734877, 2.277710095107982) | Utilité : -114.41 | Détour : 0.25 km 

In [None]:
import folium

def display_complete_map(vehicle, stations, best_station, traffic_model, city_graph, start_coords, end_coords):
    """
    Affiche une carte interactive avec :
    - Le départ et l’arrivée du véhicule
    - Les bornes de recharge (opérationnelles en bleu, en panne en rouge)
    - La meilleure borne sélectionnée en violet
    - Les routes congestionnées (orange/rouge selon l’intensité du trafic)
    """
    m = folium.Map(location=start_coords, zoom_start=14)

    # Ajouter le départ et l’arrivée du véhicule
    folium.Marker(start_coords, popup="🚗 Départ", icon=folium.Icon(color="green")).add_to(m)
    folium.Marker(end_coords, popup="🏁 Arrivée", icon=folium.Icon(color="red")).add_to(m)

    # Ajouter les bornes de recharge
    for station in stations:
        color = "blue" if station.is_operational() else "red"  # Bleu = opérationnelle, Rouge = en panne
        popup_text = f"💰 {station.price_per_kWh}€/kWh | ⏳ {station.wait_time} min | ⚡ {station.power} kW"
        folium.Marker([station.lat, station.lon], popup=popup_text, icon=folium.Icon(color=color)).add_to(m)

    # Mettre en évidence la borne sélectionnée
    if best_station:
        folium.Marker(
            [best_station.lat, best_station.lon],
            popup=f"⭐ Borne sélectionnée : {best_station.price_per_kWh}€/kWh",
            icon=folium.Icon(color="purple")
        ).add_to(m)

    # Ajouter les routes congestionnées (Orange = 5-10 min de retard, Rouge = >10 min)
    for (u, v, data) in city_graph.edges(data=True):
        delay = traffic_model.get_traffic_delay(u, v)
        if delay > 0:
            color = "orange" if delay <= 10 else "red"
            coords = [(city_graph.nodes[u]['y'], city_graph.nodes[u]['x']), (city_graph.nodes[v]['y'], city_graph.nodes[v]['x'])]
            folium.PolyLine(coords, color=color, weight=5, opacity=0.7, tooltip=f"🚦 Retard : {delay} min").add_to(m)

    return m  # Afficher la carte

# Affichage de la carte finale avec toutes les informations
if best_station:
    final_map = display_complete_map(vehicle, charging_stations, best_station, traffic_model, city_graph, (start_lat, start_lon), (end_lat, end_lon))
    final_map  # Affichage interactif
else:
    print("\n❌ Aucune borne trouvée correspondant aux critères.")



❌ Aucune borne trouvée correspondant aux critères.


## Maitenant, essayons de faire une petite interface graphique

In [None]:
!pip install streamlit pyngrok streamlit_folium

Collecting streamlit
  Downloading streamlit-1.42.0-py2.py3-none-any.whl.metadata (8.9 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.3-py3-none-any.whl.metadata (8.7 kB)
Collecting streamlit_folium
  Downloading streamlit_folium-0.24.0-py3-none-any.whl.metadata (413 bytes)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.42.0-py2.py3-none-any.whl (9.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m65.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyngrok-7.2.3-py3-none-any.whl (23 kB)
Downloading streamlit_folium-0.24.0-py3-none-any.whl (328 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m328.5/

In [None]:
# import streamlit as st
# import folium
# from streamlit_folium import st_folium
# import osmnx as ox
# import networkx as nx

# # Fonction pour générer une carte Folium
# def generate_map(route_coords, color, tooltip):
#     m = folium.Map(location=route_coords[0], zoom_start=14)
#     folium.PolyLine(route_coords, color=color, weight=5, opacity=0.7, tooltip=tooltip).add_to(m)
#     folium.Marker(route_coords[0], popup="Départ", icon=folium.Icon(color="green")).add_to(m)
#     folium.Marker(route_coords[-1], popup="Arrivée", icon=folium.Icon(color="red")).add_to(m)
#     return m

# # Interface utilisateur
# st.title("Planner d'itinéraire intelligent")
# st.write("Entrez les points de départ et d'arrivée pour calculer les itinéraires.")

# # Saisie des points de départ et d'arrivée
# start_location = st.text_input("Point de départ", "Tour Eiffel, Paris, France")
# end_location = st.text_input("Point d'arrivée", "Louvre, Paris, France")

# # Bouton pour calculer l'itinéraire
# if st.button("Calculer l'itinéraire"):
#     # Conversion des adresses en coordonnées GPS
#     start_lat, start_lon = ox.geocode(start_location)
#     end_lat, end_lon = ox.geocode(end_location)

#     # Charger le graphe routier
#     city_graph = ox.graph_from_place("Paris, France", network_type="drive")

#     # Trouver les nœuds les plus proches
#     orig_node = ox.nearest_nodes(city_graph, start_lon, start_lat)
#     dest_node = ox.nearest_nodes(city_graph, end_lon, end_lat)

#     # Calculer l'itinéraire initial
#     initial_route = nx.shortest_path(city_graph, orig_node, dest_node, weight="length")
#     initial_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in initial_route]

#     # Générer et afficher la carte initiale
#     st.subheader("Itinéraire initial")
#     initial_map = generate_map(initial_coords, "blue", "Itinéraire initial")
#     st_folium(initial_map, width=700, height=500)

#     # Ajouter une simulation de recharge
#     vehicle_battery = 20  # Batterie faible pour forcer la recharge
#     distance = nx.shortest_path_length(city_graph, orig_node, dest_node, weight="length") / 1000
#     energy_needed = distance * 0.3

#     if vehicle_battery < energy_needed:
#         st.warning("Batterie insuffisante ! Ajout d'une station de recharge.")
#         # Simulation d'une station de recharge
#         station_coords = (start_lat + 0.01, start_lon + 0.01)
#         station_node = ox.nearest_nodes(city_graph, station_coords[1], station_coords[0])
#         route_to_station = nx.shortest_path(city_graph, orig_node, station_node, weight="length")
#         route_from_station = nx.shortest_path(city_graph, station_node, dest_node, weight="length")
#         modified_route = route_to_station + route_from_station[1:]
#         modified_coords = [(city_graph.nodes[node]['y'], city_graph.nodes[node]['x']) for node in modified_route]

#         # Générer et afficher la carte modifiée
#         st.subheader("Itinéraire modifié (avec recharge)")
#         modified_map = generate_map(modified_coords, "red", "Itinéraire modifié")
#         st_folium(modified_map, width=700, height=500)
#     else:
#         st.success("Batterie suffisante pour atteindre la destination.")

In [None]:
# from pyngrok import ngrok

# # Lancer l'application Streamlit
# !streamlit run app.py & npx kill-port 8501
# public_url = ngrok.connect(8501)
# print(f"📡 Accédez à l'application ici : {public_url}")