# 0- Installations des bibliothèques

In [None]:
!pip install flask
!pip install plotly
!pip install flask-cors
!pip install networkx
!pip install numpy

# 1- Importation des bibliotheque necessaire

In [None]:
from flask import Flask, request, jsonify, session 
import plotly.graph_objects as go #pour la visualisation des graphs.
from flask_cors import CORS #pour facilite a gestion des requêtes CORS (Cross-Origin Resource Sharing).
from itertools import combinations, groupby, islice  # pour travailler avec des itérations.
import networkx as nx  #manipulation de graphes complexes.
import random #pour la génération de nombres aléatoires.
from math import gamma #pour les fonctions mathématiques
import numpy as np #pour mnipulation de tableaux multidimensionnels.
import csv #pour lecture/écriture de fichiers CSV.
import threading # pour la gestion des threads.
from concurrent.futures import ThreadPoolExecutor #pour exécution asynchrone avec des threads
from multiprocessing import Manager #Pour la gestion des objets partagés entre processus.
from time import time # pour  Mesuree le temps

#### on va utiliser flask comme un framework de backend 

In [None]:
app = Flask(__name__)#on va créer une instance de l'application Flask
CORS(app) # activation du support CORS pour l'application Flask

# 2 - Génération d’un graphe


Premièrement, on va créer une fonction qui sera responsable de la génération du graphe dans notre projet.


Donc il faut tout d'abord définir la fonction qui génère les arêtes du graphe.

In [None]:
def generate_edges(startindex,endindex, n, p,G,pos):#cette fonction générer les arêtes du graphe.
    #startindex et endindex : pour spécifier la plage d’indices
    #n est le nombre total de noeuds
    #p est la probabilité de connexion entre les noeuds
    #pos est  les positions des noeuds
    # G est le graph génèrer
    for i in range(startindex,endindex):
         for j in range(i+1, n):
            distance = ((pos[i][0] - pos[j][0])**2 + (pos[i][1] - pos[j][1])**2)**0.5
            if distance <= p:
    #On utilise ici la fonction 'randint' du module 'random' pour générer aléatoirement des valeurs de distance et de charge pour les arêtes nouvellement créées.
                G.add_edge(i, j, distance=random.randint(1, 10), charge=random.randint(1, 10))
                


Alors, concernant la génération du graphe(aléatoire non orienté), pour accélérer le processus, on va utiliser des threads.

chaque thread est responsable d’une tranche de 100 noeuds (on peut la modifier en fonction de nos besoins)

In [None]:
def generate_random_graph_with_threads(n, p,nodes_per_thread=100):

    G = nx.Graph()#crée un objet de graphe à l'aide de la bibliothèque NetworkX
    G.add_nodes_from(range(n))#ajoute des nœuds au graphe G 
    pos = {node: (random.random(), random.random()) for node in range(n)}
    nx.set_node_attributes(G, pos, 'pos')#cette methode de NetworkX permet d'attribuer  les positions  générées aux nœuds du graphe G
    if p <= 0:
        return G
    if p > 1:
        return nx.complete_graph(n, create_using=G)# cette fonction du module NetworkX pour créer un graphe complet non orienté avec n nœuds, en utilisant le graphe G
    
    # calcul du nombre de threads nécessaires
    nbth = n // nodes_per_thread
    if nbth == 0:
        nbth = 1
    # créer des threads pour générer les arêtes du graphe
    threads = []
    for i in range(nbth):
        startindex = i * nodes_per_thread
        endindex = startindex + nodes_per_thread
        if i == nbth - 1:
            endindex = n
        
        th = threading.Thread(target=generate_edges, args=(startindex,endindex, n, p,G,pos))#Pour créer une instance du module threading et exécuter la fonction generate_edges en parallèle.
        th.start()
        threads.append(th)
    # attente de la fin de tous les threads
    for th in threads:
        th.join()
        
    return G

# 3 - Visualisation du graphe


Maintenant, après la génération du graphe, nous avons besoin d'une fonction qui visualise ce graphe.

In [None]:
def get_fig(G):
     #des listes pour les coordonnées des arêtes
    edge_x = []
    edge_y = []
    # parcours des arêtes du graphe
    for edge in G.edges():
        x0, y0 = G.nodes[edge[0]]['pos']
        x1, y1 = G.nodes[edge[1]]['pos']
        edge_x.append(x0)
        edge_x.append(x1)
        edge_x.append(None)
        edge_y.append(y0)
        edge_y.append(y1)
        edge_y.append(None)
# définition de la trace pour les arêtes du graphe
    edge_trace = {
        'x': edge_x,
        'y': edge_y,
        'line': {'width': 0.5, 'color': '#888'},
        'hoverinfo': 'none',
        'mode': 'lines'
    }
 # création des listes pour les coordonnées des nœuds
    node_x = []
    node_y = []
    for node in G.nodes():#on utilise la méthode nodes() de NetworkXest  pour itérer sur tous les nœuds du graphe G  pour la visualisation du graphe
        x, y = G.nodes[node]['pos']
        node_x.append(x)
        node_y.append(y)
# la trace pour les nœuds du graphe
    node_trace = {
        'x': node_x,
        'y': node_y,
        'mode': 'markers',
        'hoverinfo': 'text',
        'marker': {
            'showscale': True,
            'colorscale': 'YlGnBu',
            'reversescale': True,
            'color': [],
            'size': 10,
            'colorbar': {
                'thickness': 15,
                'title': 'Node Connections',
                'xanchor': 'left',
                'titleside': 'right'
            },
            'line_width': 2
        }
    }
 # calcul des adjacences(d'arêtes connectées à ce nœud) et du texte pour chaque nœud
    node_adjacencies = []
    node_text = []
    i=0
    for node, adjacencies in enumerate(G.adjacency()):
        node_adjacencies.append(len(adjacencies[1]))
        node_text.append('Node Id: '+str(i)+' is of connections: ' + str(len(adjacencies[1])))
        i+= 1
    # attribution des valeurs calculées aux propriétés de la trace des nœuds

    node_trace['marker']['color'] = node_adjacencies
    node_trace['text'] = node_text

    # la mise en page de la visualisation
    layout = {
        'title': '<br>The Graph Generated',
        'titlefont_size': 16,
        'showlegend': False,
        'hovermode': 'closest',
        'margin': {'b': 20, 'l': 5, 'r': 5, 't': 40},
        'annotations': [
            {'text': "Choose your Source and Destination then click on Graph Navigator to find the optimal path for them",
             'showarrow': False,
             'xref': "paper", 'yref': "paper",
             'x': 0.005, 'y': -0.002}
        ],
        'xaxis': {'showgrid': False, 'zeroline': False, 'showticklabels': False},
        'yaxis': {'showgrid': False, 'zeroline': False, 'showticklabels': False}
    }

    return {"edge_trace": edge_trace,"node_trace": node_trace, "layout": layout}

Ensuite, on définit cette fonction pour la visualisation du graphe avec un chemin spécifié, afin de visualiser le graphe avec le meilleur chemin

In [None]:
def get_fig_with_path(G, best_path_text, best_path_info, path=[]):
   # création des listes pour les coordonnées des arêtes
    edge_x = []
    edge_y = []
    # parcours des arêtes du graphe
    for edge in G.edges():
        x0, y0 = G.nodes[edge[0]]['pos']
        x1, y1 = G.nodes[edge[1]]['pos']
        edge_x.append(x0)
        edge_x.append(x1)
        edge_x.append(None)
        edge_y.append(y0)
        edge_y.append(y1)
        edge_y.append(None)
 # la trace pour les arêtes du graphe
    edge_trace = {
            'x': edge_x,
            'y': edge_y,
            'line': {'width': 0.5, 'color': '#888'},
            'hoverinfo': 'none',
            'mode': 'lines'
        }

    # on va séparer les arêtes en fonction de leur appartenance ou non au chemin.
    edge_sets = [path, list(set(G.edges()) - set(path))]
    colors = ['#ff0000', '#888']  # couleur pour les arêtes dans le chemin et couleur par défaut

    edge_traces = []
    for edge_set, c in zip(edge_sets, colors):
        edge_x = []
        edge_y = []
        for edge in edge_set:
            x0, y0 = G.nodes[edge[0]]['pos']
            x1, y1 = G.nodes[edge[1]]['pos']
            edge_x.append(x0)
            edge_x.append(x1)
            edge_x.append(None)
            edge_y.append(y0)
            edge_y.append(y1)
            edge_y.append(None)

        # la trace pour les arêtes spécifiques au chemin ou non
        edge_traces.append({
            'x': edge_x, 
            'y': edge_y,
            'line':  {'width':0.5, 'color':c},
            'hoverinfo': 'none',
            'mode': 'lines'})
        
    # des listes pour les coordonnées des nœuds
    node_x = []
    node_y = []
    for node in G.nodes():
        x, y = G.nodes[node]['pos']
        node_x.append(x)
        node_y.append(y)

    # la trace pour les nœuds du graphe
    node_trace = {
        'x': node_x,
        'y': node_y,
        'mode': 'markers',
        'hoverinfo': 'text',
        'marker': {
            'showscale': True,
            'colorscale': 'YlGnBu',
            'reversescale': True,
            'color': [],
            'size': 10,
            'colorbar': {
                'thickness': 15,
                'title': 'Node Connections',
                'xanchor': 'left',
                'titleside': 'right'
            },
            'line_width': 2
        }
    }

    # calcul des adjacences et du texte pour chaque nœud
    node_adjacencies = []
    node_text = []
    i = 0
    for node, adjacencies in enumerate(G.adjacency()):
        node_adjacencies.append(len(adjacencies[1]))
        node_text.append('Node Id: '+str(i)+' is of connections: ' + str(len(adjacencies[1])))
        i += 1

    # attribution des valeurs calculées aux propriétés de la trace des nœuds
    node_trace['marker']['color'] = node_adjacencies
    node_trace['text'] = node_text    
    # définition du layout de la visualisation
    layout = {
        'title': best_path_info,
        'titlefont_size': 16,
        'showlegend': False,
        'hovermode': 'closest',
        'margin': {'b': 20, 'l': 5, 'r': 5, 't': 40},
        'annotations': [
            {'text': best_path_text,
             'showarrow': False,
             'xref': "paper", 'yref': "paper",
             'x': 0.005, 'y': -0.002}
        ],
        'xaxis': {'showgrid': False, 'zeroline': False, 'showticklabels': False},
        'yaxis': {'showgrid': False, 'zeroline': False, 'showticklabels': False}
    }

    return {"data": edge_traces + [node_trace], "layout": layout}

# 4 - Normalisation du graphe


Alors, on a besoin de normaliser ce graphe, c'est-à-dire que l'on doit normaliser les charges et distances dans l'intervalle [0,1],en utilisant les threads.

In [None]:
def find_max_distance(edges):#on cherche la distance maximale parmi les arêtes du graphe.
    if not edges:
        return 0
    max_distance = max(data['distance'] for _, _, data in edges)
    return max_distance

def find_max_charge(edges):#on cherche la charge maximale parmi les arêtes du graphe.
    if not edges:
        return 0
    max_charge = max(data['charge'] for _, _, data in edges)
    return max_charge

def normalized_graph_edges(list_of_edges , max_distance , max_charge , startindex , endindex):
    #normalise les attributs des arêtes du graphe entre startindex et endindex
    for i in range(startindex , endindex):
        list_of_edges[i][2]['distance'] = round(list_of_edges[i][2]['distance'] / max_distance, 4)#la fonction round()our arrondir à 4 chiffres après la virgule le résultat de la division
        list_of_edges[i][2]['charge'] = round(list_of_edges[i][2]['charge'] / max_charge, 4)
    
def normalize_graph_attributes(G,edges_per_thread=150 ):# normalise les attributs des arêtes du graphe avec des threads.
    normalized_graph = G.copy()#pour créer une copie indépendante du graphe G , pour normaliser les attributs des arêtes du graphe sans modifier le graphe d'origine.
    
    with ThreadPoolExecutor() as executor:#est une classe de concurrent.futures , permettant l'exécution simultanée via un pool de threads.
        future_distances = executor.submit(find_max_distance, G.edges(data=True))
        future_charges = executor.submit(find_max_charge, G.edges(data=True))
    
     # récupération des résultats
    max_distance = future_distances.result()
    max_charge = future_charges.result()
    
    list_of_edges = list(normalized_graph.edges(data=True))  # liste des arêtes du graphe normalisé
    
    # on calcul du nombre d'arêtes et du nombre de threads nécessaires
    num_edges = len(list_of_edges)    
    nbth = num_edges // edges_per_thread 
    if nbth == 0:
        nbth = 1

    # Création et démarrage des threads pour la normalisation
    threads = []
    for i in range(nbth):
        
        startindex = i * edges_per_thread 
        endindex = startindex + edges_per_thread 
        if i == nbth - 1:
            endindex = num_edges
        th = threading.Thread(target=normalized_graph_edges, args=(list_of_edges , max_distance , max_charge,
                                                                   startindex,endindex))
        th.start()
        threads.append(th)

    # on va attendre que tous les threads aient terminé 
    for th in threads:
        th.join()
        
    #création d'un nouveau graphe normalisé à partir de la liste des arêtes    
    new_normalized_graph = nx.Graph()    
    new_normalized_graph.add_edges_from(list_of_edges)
    return new_normalized_graph  

# 5 - Enregistrement des informations du graphe


Ensuite,on va sauvegarder les informations(source,target,distance,charge) sous forme de fichier CSV ,en utilisant les threads.


Chaque thread écrire une partie des informations du graphe dans le fichier CSV

In [None]:
def write_edge_info(list_of_edges , writer,startindex,endindex):# cette fonction pour écrire les informations sur les arêtes dans un fichier CSV.
    for i in range(startindex,endindex):
        writer.writerow({'source': list_of_edges[i][0], 'target': list_of_edges[i][1], 
                         'distance': list_of_edges[i][2]['distance'], 'charge': list_of_edges[i][2]['charge']})
    
def save_graph_info(G,edges_per_thread=150 ):#on va sauvegarder les informations sur les arêtes du graphe dans un fichier CSV.
    edges_file = "graph_info.csv"#nom du fichier CSV
    with open(edges_file, 'w', newline='') as csvfile: # ouverir du fichier CSV en mode écriture
        fieldnames = ['source', 'target', 'distance', 'charge'] # on adéfinir des noms de colonnes dans le fichier CSV
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames) # on a créer l'objet DictWriter pour écrire dans le fichier CSV
        writer.writeheader() # écriture de l'en-tête du fichier CSV       
        list_of_edges = list(G.edges(data=True)) # pour récupérer la liste des arêtes du graphe avec leurs données
        num_edges = len(list_of_edges) # pour calculer du nombre total d'arêtes dans le graphe
        
        # on va calculer le nombre de threads nécessaires 
        nbth = num_edges // edges_per_thread 
        if nbth == 0:
            nbth = 1
        threads = []  # liste pour stocker les threads

        # création et démarrage des threads pour écrire les sections d'arêtes dans le fichier CSV
        for i in range(nbth):
            startindex = i * edges_per_thread 
            endindex = startindex + edges_per_thread 
            if i == nbth - 1:
                endindex = num_edges
            # on créer d'un thread qui utilise la fonction write_edge_info
            th = threading.Thread(target=write_edge_info, args=(list_of_edges , writer,startindex,endindex))
            th.start()
            threads.append(th)

        # on va attendre que tous les threads aient terminé 
        for th in threads:
            th.join()

# 6 -  Algorithme de recherche du coucou



Alors, maintenant, pour trouver un chemin plus court ou bien le chemin optimal, on applique l'algorithme de recherche du coucou

In [None]:
def calculate_total_distance(graph, path):#calcule la distance totale le long du path
        #source_node est source
        #target_node est distination 
        total_distance = 0
        for i in range(len(path) - 1):
            source_node = path[i]
            target_node = path[i + 1]
            total_distance += graph[source_node][target_node]['distance']
        
        return total_distance 
                             
def calculate_total_charge(graph, path):#calcule la charge totale le long du path
        total_charge = 0
        for i in range(len(path) - 1):
            source_node = path[i]
            target_node = path[i + 1]
            total_charge += graph[source_node][target_node]['charge']
        
        return total_charge 
                             

def evaluate_fitness(graph, path):#évalue la forme physique d'un chemin dans le graphe.
    
    with ThreadPoolExecutor() as executor:
        future_distances = executor.submit(calculate_total_distance, graph,path)
        future_charges = executor.submit(calculate_total_charge, graph,path)
  
    total_distance  = future_distances.result()
    total_charge = future_charges.result()                                    
    return total_distance + total_charge


def k_shortest_paths(G, source, target, k, weight=None):#pour trouve les k plus courts chemins entre la source et la cible dans le graphe.
    try :
        return list(
        islice(nx.shortest_simple_paths(G, source, target, weight=weight), k)
        )
    except nx.NetworkXNoPath: #si aucun chemin n'existe
            return [()]  

def cuckoo_generation(graph,population,not_used_population,n,Pa,shared_best_paths,startindex,endindex):# Génère une nouvelle population d'oiseaux coucous.
    for i in range(startindex , endindex) :

        # évaluer fitness pour chaque chemin de coucou.
        fitness_list = [evaluate_fitness(graph, path) for path in population]

        # trouver l'indice du meilleur coucou
        best_cuckoo_index = np.argmin(fitness_list)
        best_path = population[best_cuckoo_index]
        best_fitness = fitness_list[best_cuckoo_index]

            
        if not_used_population :
            # ici nous allons générer une nouvelle solution de coucou (œuf)
            new_cuckoo = random.choice(not_used_population)
            
            #on va calculer fitness de la nouvelle solution
            new_cuckoo_fitness = evaluate_fitness(graph, new_cuckoo)
            
            # choisissez un nid au hasard (un nid étant une solution possible sélectionnée au hasard dans la population).
            random_nest_index = random.randint(0, n - 1)

            # compare la valeur fitness entre le nouvel œuf coucou et celui choisi aléatoirement dans le nid, puis remplace la solution si la nouvelle est meilleure.
            if new_cuckoo_fitness < fitness_list[random_nest_index]:
                population[random_nest_index] = new_cuckoo
                not_used_population.remove(new_cuckoo)

            # au cas où le nombre d'œufs dans le nid augmente, abandonnez une fraction Pa des pires nids. Cela vise à imiter le fait qu'il existe une probabilité Pa pour que l'oiseau hôte découvre l'œuf du coucou.
            if len(population) > n:
                num_to_abandon = int(Pa * n)
                indices_to_abandon = np.argpartition(fitness_list, num_to_abandon)[:num_to_abandon]
                for index in indices_to_abandon:
                    population[index] = random.choice(not_used_population)
                    
        # mettre à jour la valeur de la liste partagée avec le dernier meilleur chemin
        shared_best_paths[0] = best_path

def cuckoo_search(graph, source, target, population_size=50, generations=300, Pa=0.25,iterations_per_thread=100):
    
    start_time = time ()

    # nbr de solution avec laquelle nous allons travailler
    n = population_size
    max_iterations = generations
    # le nombre de solutions que nous utiliserons
    nbr_population = generations * 3

    all_population = k_shortest_paths(G, source, target, nbr_population)
    if( all_population == [()]):
        end_time = time()
        return [()],[] , end_time-start_time


    # nous travaillerons avec 50 solutions possibles
    population = all_population[:n]
    # le reste des solutions possibles, nous travaillerons avec eux en générant de nouvelles solutions à partir d'eux.
    not_used_population = all_population[n:]

    best_path = None
    best_fitness = float('-inf')

    threads = []
    manager = Manager()
    shared_best_paths = manager.list([None])
    
    nbth = max_iterations // iterations_per_thread
    if nbth == 0:
        nbth = 1
    for i in range(nbth):
        
        startindex = i * iterations_per_thread
        endindex = startindex + iterations_per_thread
        if i == nbth - 1:
            endindex = max_iterations
        th = threading.Thread(target=cuckoo_generation, args=(graph,population,not_used_population,n,Pa,shared_best_paths,
                                                             startindex , endindex))
        th.start()
        threads.append(th)
        
    for th in threads:
        th.join()
        
    #obtenir le dernier best_path trouvé
    best_path = shared_best_paths[0]   

    # convertir le meilleur chemin en une liste de tuples de bord
    best_path_edges = [(best_path[i], best_path[i + 1]) for i in range(len(best_path) - 1)]
    
    end_time = time()
    execution_time = end_time-start_time
    return best_path_edges , best_path , execution_time

# 7 -  Configuration des threads


Il ne nous reste plus qu'à configurer les paramètres pour les threads

In [None]:
# le nombre de nœuds par thread à 100
nodes_per_thread = 100
# le nombre d'arêtes par thread à 200
edges_per_thread = 200
# le nombre d'itérations par thread à 100
iterations_per_thread = 100
# la probabilité de connexion des nœuds
probability = 0.1

G = None

On initialise de l'API pour générer un graphe aléatoire

In [None]:
@app.route('/get_graph', methods=['POST'])
def get_graph_api():
    
    data = request.get_json()# extraire les données JSON .
    nodes = int(data)# convertir le nombre de nœuds en entier.
    global G
    G = generate_random_graph_with_threads(nodes, probability)# générer un graphe aléatoire avec des threads.
    fig_data = get_fig(G)# obtenir les données de la figure pour le graphe généré

    return jsonify(fig_data) # renvoyer les données de la figure en tant que réponse JSON.


Enfin, pour obtenir un graphe avec le meilleur chemin trouvé par l'algorithme coucou ,on initialise de l'API Flask

In [None]:
@app.route('/get_graph_with_path', methods=['POST'])
def get_graph_path_api():# API pour obtenir un graphe avec le meilleur chemin trouvé par l'algorithme coucou.
    
    data = request.get_json()  # extraire les données JSON de la requête.  
    source = data.get('source', None) # obtenir le nœud source des données.
    destination = data.get('destination', None)# obtenir le nœud de destination des données.

    if source is not None and destination is not None:
        source_node = int(source) # convertir la source en entier.
        target_node = int(destination) # convertir la destination en entier.      
        global G
        # si le graphe n'a pas encore été généré
        if G == None:
            return jsonify({"path_exist": 0, "path_data": "Please generate a graph first!!"})
        
        # normaliser les attributs du graphe G
        normlized_graph = normalize_graph_attributes(G) 

        # on applique l'algorithme coucou pour trouver le meilleur chemin
        best_pathtuple ,best_path , execution_time = cuckoo_search(normlized_graph, source_node, target_node,iterations_per_thread)
        
        if(best_path != []):
            print("############################################## Best path ##############################################")
            print(best_pathtuple)
            # on calcule la distance totale et la charge totale du meilleur chemin
            total_distance = calculate_total_distance(G, best_path)
            total_charge = calculate_total_charge(G, best_path)
            print("Total distance : ", total_distance)
            print("Total charge : ", total_charge)
            best_path_text = f'<br>Best Path from Node ID: {source_node} to Node ID: {target_node} is {best_path}. Total distance is = {total_distance} and consumed charge is = {total_charge}.'
            best_path_info = f'Execution Time of the Algorithm is: {execution_time:.4f} seconds.'

            # Obtenir les données pour le graphe avec le meilleur chemin
            fig_data_with_path = get_fig_with_path(G, best_path_text, best_path_info, best_pathtuple)

            return jsonify({"path_exist": 1, "path_data": fig_data_with_path})
        else :
            #si aucune arête entre la source et la destination
            no_path_message = f'There is no path between  {source_node} and {target_node}'
            print(no_path_message)
            return jsonify({"path_exist": 0, "path_data": no_path_message})
            #pour l'affichage de temps d'exécution
        print("The execution time of the research phase is {:.4f} seconds".format(execution_time))
        
    else:
        #si la source et la destination ne sont pas fournies en tant qu'entiers.
        return jsonify({'error': 'Source and destination must be provided as integers.'}), 400

In [None]:
# Le serveur de Backend va utiliser le port 5200 pour recevoir les requêtes HTTP envoyées par notre application web
if __name__ == '__main__':
    app.run(port=5200)