# IIC2440: Procesamiento de Datos Masivos
# Tarea 2 - Problema 2: Single Source Shortest Path

* Agustín Urrutia
* Hernán Larraín

El problema de Single Source Shortest Path consiste en partir de un nodo y encontrar los caminos más cortos hacia todos los nodos que son alcanzables desde ese nodo inicial.

## **Setup**

### Librerías

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=bb7a2d7706cf09ed2a6b6169e6f8347279b8ee8dd35e9a35a1f9d2c56bcd2fc9
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [3]:
from pyspark.sql import SparkSession

# Obtain a Spark session: getOrCreate() reuses an already active session
# or starts a new one with default settings.
spark = SparkSession.builder \
    .getOrCreate()

# SparkContext handle; the graph-building helpers in this notebook call
# sc.parallelize to turn local Python collections into RDDs.
sc = spark.sparkContext
# Bare expression: as the last statement of a notebook cell it shows the
# context's representation.
sc

In [4]:
from fractions import Fraction
import networkx as nx
import matplotlib.pyplot as plt
import random
import numpy as np

### Funciones auxiliares

In [5]:
def show_graph(edges_rdd, n_nodes:int = -1):
  """Plot the graph described by ``edges_rdd`` on a circular layout.

  Args:
    edges_rdd: RDD whose rows start with (source, target, cost). The rows are
      collected to the driver, so this is meant for small graphs.
    n_nodes: when positive, nodes 1..n_nodes are added up front so that
      nodes without any edge are drawn too.
  """
  graph = nx.DiGraph()

  # Optionally register every node, including isolated ones.
  if n_nodes > 0:
    graph.add_nodes_from(range(1, n_nodes+1))

  # Bring the (source, target, cost) triples to the driver.
  triples = edges_rdd.map(lambda row: (row[0], row[1], row[2])).collect()
  graph.add_weighted_edges_from(triples)

  # The cost of each edge doubles as its label in the figure.
  cost_labels = {(src, dst): cost for src, dst, cost in triples}

  # Place every node on a circle.
  layout = nx.circular_layout(graph)

  plt.figure(figsize=(10, 8))

  # Draw order: edges first, then nodes and their labels, then edge labels.
  nx.draw_networkx_edges(graph, layout, alpha=0.5, edge_color='gray')
  nx.draw_networkx_nodes(graph, layout, node_color='lightblue', node_size=600)
  nx.draw_networkx_labels(graph, layout, font_size=12, font_color='black')
  nx.draw_networkx_edge_labels(graph, layout, edge_labels=cost_labels,
                               font_color='red')

  # Hide the axes and display the figure.
  plt.axis('off')
  plt.show()


## **Desarrollo Pregunta 2**

Para la construcción de las aristas, se asume que no hay dos aristas que comiencen en un mismo nodo X y terminen en un mismo nodo Y.

In [6]:
def build_nodes(n_nodes:int, echo:bool = False):
  """Return an RDD holding the node ids 1..n_nodes.

  Args:
    n_nodes: number of nodes to create.
    echo: when True, print a short progress message.
  """
  if echo:
    print(f"Creating {n_nodes} nodes")

  node_ids = range(1, n_nodes + 1)
  return sc.parallelize(node_ids)

def build_random_edges(n_nodes:int, n_edges:int = -1, min_cost:int = 1,
                       max_cost:int = 10, echo:bool = False):
  """Build an RDD of random weighted edges over the nodes 1..n_nodes.

  Every edge is a tuple (n_1, n_2, cost) with n_1 < n_2, so there is at most
  one edge per unordered pair of nodes and no self-loops.

  Args:
    n_nodes: number of nodes in the graph.
    n_edges: number of edges to generate; a negative value means
      ``2 * n_nodes``.
    min_cost: smallest cost an edge may get (inclusive).
    max_cost: largest cost an edge may get (inclusive).
    echo: when True, print a short progress message.

  Returns:
    RDD of (n_1, n_2, cost) tuples.

  Raises:
    Exception: if more edges are requested than there are distinct node pairs.
    ValueError: if ``min_cost`` is greater than ``max_cost``.
  """
  if n_edges < 0:
    n_edges = 2 * n_nodes

  # Upper bound: number of distinct unordered node pairs.
  if n_edges > (n_nodes * (n_nodes - 1)) // 2:
    raise Exception("Number of edges is not valide for the number of nodes")

  # Fail fast instead of letting random.randint raise from inside the loop.
  if min_cost > max_cost:
    raise ValueError(
        f"min_cost ({min_cost}) must not exceed max_cost ({max_cost})")

  if echo:
    print(f"Creating {n_edges} edges for the {n_nodes} nodes and cost" + \
          f" between ({min_cost}, {max_cost})")

  # Rejection sampling: draw node pairs until enough distinct ones exist.
  edges = {}
  while len(edges) < n_edges:
    n_1 = random.randint(1, n_nodes)
    n_2 = random.randint(1, n_nodes)

    # No self-loops.
    if n_1 == n_2:
      continue

    # Normalize the pair so (a, b) and (b, a) map to the same key.
    n_1, n_2 = min(n_1, n_2), max(n_1, n_2)

    # random.randint includes both endpoints, so the upper bound is max_cost
    # itself; using max_cost + 1 would allow costs above the requested range.
    edges[(n_1, n_2)] = (n_1, n_2, random.randint(min_cost, max_cost))

  return sc.parallelize(list(edges.values()))

In [7]:
# Size of the random test graph.
N_NODES = 100
N_EDGES = 293
# RDD with the node ids 1..N_NODES.
nodes = build_nodes(N_NODES)
# RDD of (node, node, cost) tuples. check_convergence reads this
# notebook-level `edges` name, so it must stay defined under this name.
edges = build_random_edges(N_NODES, N_EDGES)

# Uncomment to plot the generated graph:
# show_graph(edges)

## Funciones

In [8]:
def initial_costs(initial_node, nodes, echo=False):
  """Build the starting (node, cost) pairs for the shortest-path search.

  The source node gets cost 0 and every other node gets infinity.

  Args:
    initial_node: id of the source node.
    nodes: RDD of node ids.
    echo: when True, collect the pairs and print them one per line.

  Returns:
    RDD of (node, cost) pairs.
  """
  def seed(node):
    # Only the source is known to be reachable (at zero cost) so far.
    if node == initial_node:
      return (node, 0)
    return (node, np.inf)

  costs = nodes.map(seed)

  if echo:
    for pair in costs.collect():
      print(pair)

  return costs

In [9]:
def update_costs(costs, edges, echo=False):
  """Run one relaxation round over every edge and return the improved costs.

  Each edge is used in both directions. For every node the new cost is the
  minimum of its current cost and the cheapest "neighbor cost + edge cost"
  found among its neighbors.

  Args:
    costs: RDD of (node, cost) pairs.
    edges: RDD of rows starting with (node_a, node_b, cost).
    echo: when True, print the new (node, cost) pairs sorted by node.

  Returns:
    RDD of (node, cost) pairs with the updated costs.
  """
  def forward(edge):
    # (node A, (node B, cost A-B))
    return (edge[0], (edge[1], edge[2]))

  def backward(edge):
    # Same edge keyed by its other endpoint: (node B, (node A, cost A-B))
    return (edge[1], (edge[0], edge[2]))

  def relax(joined):
    # joined = (node A, ((node B, cost A-B), cost of A))
    (neighbor, weight), source_cost = joined[1]
    return (neighbor, weight + source_cost)

  def keep_cheapest(entry):
    # entry = (node, (current cost, best candidate or None))
    node, (current, candidate) = entry
    if candidate is None:
      # No edge touches this node, so there is nothing to improve.
      candidate = np.inf
    return (node, min(current, candidate))

  # Adjacency for all contiguous nodes, in both directions.
  outgoing = edges.map(forward)
  incoming = edges.map(backward)
  adjacency = outgoing.union(incoming)

  # Cheapest cost offered to every node through any of its neighbors.
  offers = adjacency.join(costs).map(relax)
  candidates = offers.reduceByKey(lambda a, b: min(a, b))

  # Left outer join keeps the nodes that received no offer.
  new_costs = costs.leftOuterJoin(candidates).map(keep_cheapest)

  if echo:
    for pair in new_costs.sortByKey().collect():
      print(pair)

  return new_costs

In [10]:
def check_convergence(current_costs, echo=False, graph_edges=None):
  """Apply ``update_costs`` repeatedly until no node's cost changes.

  Args:
    current_costs: RDD of (node, cost) pairs to start from.
    echo: when True, print the iteration counter on every round and the
      collected final costs at the end.
    graph_edges: RDD of edges handed to ``update_costs``. When omitted, the
      notebook-level ``edges`` RDD is used, which keeps existing calls
      working unchanged.

  Returns:
    RDD of (node, cost) pairs for which another round changes nothing.
  """
  if graph_edges is None:
    # Backward-compatible default: the edges defined at notebook level.
    graph_edges = edges

  iteration = 0

  while True:
    if echo:
      print(f"iteration {iteration}")

    # RDDs are lazy: without persisting, every round would re-evaluate the
    # whole chain of previous rounds. Caching lets the action below
    # materialize this round once and the next round reuse it.
    upd_costs = update_costs(current_costs, graph_edges).cache()

    # Number of nodes whose cost differs between the two rounds.
    change_count = current_costs.join(upd_costs)\
                                .map(lambda row: 0 if row[1][0] == row[1][1] else 1)\
                                .sum()
    if change_count == 0:
      # This round only confirmed convergence; release its cached copy.
      upd_costs.unpersist()
      break

    current_costs = upd_costs
    iteration += 1

  if echo:
    print(current_costs.collect())
  return current_costs

## Ejecución

In [13]:
# Source node of the search: initial_costs gives this node cost 0.
initial_node = 37

In [14]:
# Starting costs: 0 for initial_node and infinity for every other node.
costs = initial_costs(initial_node, nodes)
# Second argument is echo=True: prints each iteration number and the final
# collected costs. The returned RDD is not assigned to a name here.
check_convergence(costs, True)

iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
[(1, 15), (2, 11), (3, 15), (4, 10), (5, 8), (6, 13), (7, 11), (8, 13), (9, 9), (10, 23), (11, 8), (12, 14), (13, 15), (14, 17), (15, 2), (16, 13), (17, 18), (18, 16), (19, 16), (20, 14), (21, 17), (22, 16), (23, 12), (24, 6), (25, 14), (26, 13), (27, 15), (28, 10), (29, 20), (30, 11), (31, 18), (32, 18), (33, 7), (34, 14), (35, 10), (36, 11), (37, 0), (38, 7), (39, 18), (40, 17), (41, 10), (42, 10), (43, 23), (44, 15), (45, 12), (46, 17), (47, 15), (48, 11), (49, 6), (50, 12), (51, 11), (52, 9), (53, 15), (54, 11), (55, 14), (56, 8), (57, 12), (58, 16), (59, 19), (60, 10), (61, 12), (62, 10), (63, 15), (64, 13), (65, 19), (66, 11), (67, 16), (68, 21), (69, 12), (70, 12), (71, 14), (72, 12), (73, 14), (74, 15), (75, 14), (76, 11), (77, 18), (78, 20), (79, 8), (80, 14), (81, 1), (82, 9), (83, 13), (84, 9), (85, 19), (86, 9), (87, 16), (88, 15), (89, 12), (90, 13), (91, 13), (92, 16), (93, 12)

PythonRDD[264] at collect at <ipython-input-10-b09bbea972fe>:19