In [2]:
import networkx as nx
import random
import string
from pyspark import SparkContext

# Crear el grafo

Este grafo G contiene los pesos de ir de una nodo a otro

In [3]:
G = nx.DiGraph()
G.add_weighted_edges_from([
    ("A", "B", 3.0), ("A", "C", 10.0), ("A", "E", 4.0),
    ("B", "C", 2.0), ("B", "D", 8.0),  ("B", "F", 7.0),
    ("C", "D", 5.0), ("C", "G", 3.0),
    ("D", "H", 6.0),
    ("E", "F", 2.0), ("E", "I", 9.0),
    ("F", "G", 1.0), ("F", "J", 5.0),
    ("G", "H", 2.0), ("G", "K", 4.0),
    ("I", "J", 3.0),
    ("J", "K", 6.0)
])

Se modifica el grafo para poder ustilizarlo en Spark. Para cada nodo se crea una lista con los nodos a los que puede llegar (y lo que le cuesta).

In [4]:
pyspark_graph = []

for node in G.nodes():
    neighbors = [(nbr, G.edges[node, nbr]["weight"]) for nbr in G.successors(node)]
    if node == "A":
        weight = 0
    else:
        weight = float("inf")
    pyspark_graph.append(
        (node, (neighbors, weight, False, []))
    )

In [5]:
for city in pyspark_graph:
    print(city)

('A', ([('B', 3.0), ('C', 10.0), ('E', 4.0)], 0, False, []))
('B', ([('C', 2.0), ('D', 8.0), ('F', 7.0)], inf, False, []))
('C', ([('D', 5.0), ('G', 3.0)], inf, False, []))
('E', ([('F', 2.0), ('I', 9.0)], inf, False, []))
('D', ([('H', 6.0)], inf, False, []))
('F', ([('G', 1.0), ('J', 5.0)], inf, False, []))
('G', ([('H', 2.0), ('K', 4.0)], inf, False, []))
('H', ([], inf, False, []))
('I', ([('J', 3.0)], inf, False, []))
('J', ([('K', 6.0)], inf, False, []))
('K', ([], inf, False, []))


Se inicia el Spark

SparkContext iniciado.


## Algoritmo dijkstra

Funciones que permiten ejecutar el algoritmo de dijkstra

In [14]:
def dijkstra_map(node_data):
    """
    Fase MAP: Expande la frontera de Dijkstra.
    Input: (node_id, (neighbors, distance, is_active, path))
    """
    node_id, data = node_data
    neighbors, distance, is_active, path = data
    results = []
    
    # 1. Siempre emitimos el nodo actual (para no perderlo en el siguiente paso)
    # Lo marcamos como inactivo (False) porque ya lo estamos procesando ahora
    results.append((node_id, (neighbors, distance, False, path)))
    
    # 2. Si el nodo está ACTIVO (acaba de mejorar su distancia), propagamos a los vecinos
    if is_active and distance < float('inf'):
        for neighbor_id, weight in neighbors:
            new_distance = distance + weight
            new_path = path + [node_id]
            
            # Notifica al vecino de una nueva distancia
            results.append((neighbor_id, ([], new_distance, True, new_path)))
            
    return results

def dijkstra_reduce(data1, data2):
    """
    Fase REDUCE: Selecciona el mejor camino (Greedy).
    """
    neighbors1, dist1, active1, path1 = data1
    neighbors2, dist2, active2, path2 = data2
    
    # Preservar la lista de vecinos (uno tendrá la lista real, el otro estará vacío)
    neighbors = neighbors1 if neighbors1 else neighbors2
    
    # Quedarse con la distancia mínima
    if dist1 < dist2:
        return (neighbors, dist1, active1, path1)
    elif dist2 < dist1:
        return (neighbors, dist2, active2, path2)
    else:
        # Si son iguales, combinamos el estado activo
        return (neighbors, dist1, active1 or active2, path1)

Ejecución del algoritmo de dijkstra. Se buscan las rutas óptimas a todos los nodos desde A.

In [15]:
start_node = "A"

# Modificamos tu lista 'pyspark_graph' para activar solo el nodo origen
processed_graph = []
for node, (nbrs, dist, _, path) in pyspark_graph:
    # Solo el nodo "A" empieza activo (True)
    is_active = (node == start_node)
    processed_graph.append((node, (nbrs, dist, is_active, path)))

# CREAR RDD
rdd = sc.parallelize(processed_graph)

# BUCLE DIJKSTRA
iteration = 0
while True:
    iteration += 1
    
    # Contamos cuántos nodos necesitan procesarse
    num_active_nodes = rdd.filter(lambda x: x[1][2]).count()
    
    print(f"Iteración {iteration}: Procesando {num_active_nodes} nodos activos...")
    
    # SI NO HAY NODOS ACTIVOS, EL ALGORITMO HA TERMINADO
    if num_active_nodes == 0:
        break
        
    # Ejecutamos Map y Reduce
    rdd = rdd.flatMap(dijkstra_map).reduceByKey(dijkstra_reduce)
    
    # Cacheamos para no recalcular todo el historial en la siguiente vuelta
    rdd.cache()

print("\n¡Algoritmo finalizado con éxito!")

Iteración 1: Procesando 1 nodos activos...
Iteración 2: Procesando 3 nodos activos...
Iteración 3: Procesando 5 nodos activos...
Iteración 4: Procesando 5 nodos activos...
Iteración 5: Procesando 2 nodos activos...
Iteración 6: Procesando 0 nodos activos...

¡Algoritmo finalizado con éxito!


Se muestran los resultados. La columna ciudad representa la ciudad destino desde la ciudad A, la distancia la suma de los costes de la ruta óptima, y la ruta óptima muestra los nodos que se recorren desde A hasta llegar al nodo destino con el menor coste posible.

In [16]:
results = rdd.collect()
# Ordenamos alfabéticamente
sorted_results = sorted(results, key=lambda x: x[0])

print(f"{'CIUDAD':<8} | {'DISTANCIA':<10} | {'RUTA ÓPTIMA'}")
print("-" * 50)

for node, (nbrs, dist, _, path) in sorted_results:
    if dist == float('inf'):
        route = "Inalcanzable"
    else:
        # Añadimos el nodo final para completar la visualización
        route = " -> ".join(path + [node])
        
    print(f"{node:<8} | {str(dist):<10} | {route}")

CIUDAD   | DISTANCIA  | RUTA ÓPTIMA
--------------------------------------------------
A        | 0          | A
B        | 3.0        | A -> B
C        | 5.0        | A -> B -> C
D        | 10.0       | A -> B -> C -> D
E        | 4.0        | A -> E
F        | 6.0        | A -> E -> F
G        | 7.0        | A -> E -> F -> G
H        | 9.0        | A -> E -> F -> G -> H
I        | 13.0       | A -> E -> I
J        | 11.0       | A -> E -> F -> J
K        | 11.0       | A -> E -> F -> G -> K
