# <img style="float: left; padding-right: 0.7cm; width: 3.0cm" src="https://upload.wikimedia.org/wikipedia/commons/thumb/8/84/Escudo_de_la_Pontificia_Universidad_Cat%C3%B3lica_de_Chile.svg/179px-Escudo_de_la_Pontificia_Universidad_Cat%C3%B3lica_de_Chile.svg.png">IIC2440 - Procesamiento de Datos Masivos
Pontificia Universidad Católica de Chile<br>
Semestre 2023-1<br>
Profesores: Adrián Soto y Juan Reutter <br>
Estudiantes: Eduardo Alvarez F y Javier Arriagada S

----

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=03653326e9666778705b13b47285d9c1c0f9b2f8727165c657538bce219d4ae1
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark import SparkConf, SparkContext
import numpy as np

# 1. Page Rank

In [None]:
conf = SparkConf().setMaster("local").setAppName("PageRank")
sc = SparkContext(conf = conf)

In [None]:
# Nodos iniciales
nodes = [1, 2, 3, 4]
edges = [(1,2), (2, 3), (2, 4), (3, 2)]

In [None]:
# Crear RDD para nodos y edges
nodes_rdd = sc.parallelize(nodes)
edges_rdd = sc.parallelize(edges)

n  = int(nodes_rdd.count())

# Inicializar PageRank para cada nodo
initial_pagerank = 1/n
node_pageranks_rdd = nodes_rdd.map(lambda node: (node, initial_pagerank))

# Mostrar el RDD con los PageRanks iniciales
node_pageranks_rdd.collect()

[(1, 0.25), (2, 0.25), (3, 0.25), (4, 0.25)]

In [None]:
# Obtener los vecinos de cada nodo
node_neighbors_rdd = edges_rdd.groupByKey()

In [None]:
def prepare_message(node, neighbors, pagerank):
  n = len(neighbors)
  messages = list()
  if n > 0:
    for neighbor in neighbors:
      message = (neighbor, pagerank / n)
      messages.append(message)

  return messages

In [None]:
messages_rdd = node_pageranks_rdd.join(node_neighbors_rdd).flatMap(lambda node: prepare_message(node[0], node[1][1], node[1][0])).collect()

incoming = list(set([message[0] for message in messages_rdd]))
non_incoming = [node for node in nodes if node not in incoming]

for node in non_incoming:
  messages_rdd.append((node, 0))

messages_rdd = sc.parallelize(messages_rdd)

In [None]:
node_received_messages_rdd = messages_rdd.reduceByKey(lambda x, y: x + y)
node_received_messages_rdd.collect()

[(3, 0.125), (4, 0.125), (2, 0.5), (1, 0)]

In [None]:
damping_factor = 0.85

def update_pagerank(node, pagerank, damping_factor, messages, n):
    updated_pagerank = (1 - damping_factor)/n + damping_factor * messages
    return (node, updated_pagerank)

node_updated_pageranks_rdd = node_received_messages_rdd.join(node_pageranks_rdd) /.map(lambda node: update_pagerank(node[0], node[1][1], damping_factor, node[1][0], n))

In [None]:
node_updated_pageranks_rdd.collect()

[(4, 0.14375), (2, 0.4625), (3, 0.14375), (1, 0.037500000000000006)]

In [None]:
max_iterations = 15
epsilon = 0.0001

for iteration in range(max_iterations):

  messages_rdd = node_pageranks_rdd.join(node_neighbors_rdd).flatMap(lambda node: prepare_message(node[0], node[1][1], node[1][0])).collect()
  incoming = list(set([message[0] for message in messages_rdd]))
  non_incoming = [node for node in nodes if node not in incoming]
  for node in non_incoming:
    messages_rdd.append((node, 0))
  messages_rdd = sc.parallelize(messages_rdd)

  node_received_messages_rdd = messages_rdd.reduceByKey(lambda x, y: x + y)
  node_pageranks_rdd = node_received_messages_rdd.join(node_pageranks_rdd).map(lambda node: update_pagerank(node[0], node[1][1], damping_factor, node[1][0], n))

  if iteration > 0:
        prev_page_ranks_rdd = pageranks_rdd
        diff = prev_page_ranks_rdd.join(node_pageranks_rdd).mapValues(lambda ranks: abs(ranks[0] - ranks[1])).values().sum()

        if diff < epsilon:
          break

  pageranks_rdd = node_pageranks_rdd

In [None]:
final_page_ranks = pageranks_rdd.collect()
for node, rank in final_page_ranks:
    print("Node: {}, PageRank: {}".format(node, rank))

Node: 1, PageRank: 0.037500000000000006
Node: 2, PageRank: 0.15875678790883002
Node: 3, PageRank: 0.10489912403485036
Node: 4, PageRank: 0.10489912403485036


# 2. Single Source Shortest Path

In [None]:
nodes = [1, 2, 3, 4]
edges = [(1,2,2), (2, 3, 10), (2, 4, 5), (3, 2, 9)]

nodes_sssp_rdd = sc.parallelize(nodes)
edges_sssp_rdd = sc.parallelize(edges)

In [None]:
def initial_values(node, initial_node):
  if node != initial_node:
    return (node, float('inf'))
  return (node, 0)

In [None]:
initial_node = 2
node_sssp_rdd = nodes_sssp_rdd.map(lambda node: initial_values(node, initial_node))

In [None]:
node_sssp_rdd.collect()

[(1, inf), (2, 0), (3, inf), (4, inf)]

In [None]:
def prepare_message(edge):
    src, dest, cost = edge
    return [(dest, current_cost + cost) for (node, current_cost) in actual_costs_list if node == src and current_cost != float('inf')]

def update_costs(node, costs):
    current_cost = costs
    if type(costs) == list():
      current_cost = float('inf')
      for cost in costs:
          if cost < current_cost:
              current_cost = cost
    return (node, current_cost)

In [None]:
edges_sssp_rdd.collect()

[(1, 2, 2), (2, 3, 10), (2, 4, 5), (3, 2, 9)]

In [None]:
messages = edges_sssp_rdd.flatMap(prepare_message)
messages.collect()

[(3, 10), (4, 5)]

In [None]:
actual_costs = node_sssp_rdd
actual_costs.collectAsMap()

{1: inf, 2: 0, 3: inf, 4: inf}

In [None]:
max_iterations = 15
iteration = 0
convergence_threshold = 2
consecutive_no_change = 0
actual_costs_list = node_sssp_rdd.collect()
actual_costs = node_sssp_rdd
while consecutive_no_change < convergence_threshold and iteration < max_iterations:
    # Genera los mensajes a partir de las aristas y los costos acumulados actuales
    messages = edges_sssp_rdd.flatMap(prepare_message)

    # Une los mensajes con los costos acumulados actuales y actualiza los costos
    actual_costs = actual_costs.union(messages).reduceByKey(min).map(lambda x: update_costs(x[0], x[1]))
    actual_costs_list = actual_costs.collect()

    # Verifica si ha habido cambios en los costos acumulados
    previous_costs = actual_costs.collectAsMap()
    if previous_costs == actual_costs.collectAsMap():
        consecutive_no_change += 1
    else:
        consecutive_no_change = 0

    iteration += 1

In [None]:
actual_costs.collect()

[(3, 10), (4, 5), (1, inf), (2, 0)]

# 3. Una estrategia general

1. Preparación de los nodos:

Inicializar las propiedades de los nodos, como el valor de PageRank o la distancia más corta en el caso de SSSP.
Asignar un identificador único a cada nodo del grafo.

2. Regla para pasar mensajes entre nodos:

En cada iteración del algoritmo, cada nodo envía mensajes a sus vecinos para compartir información.
Los mensajes pueden contener información relevante para el cálculo, como el valor de PageRank actualizado o la distancia más corta encontrada.
Los nodos reciben los mensajes enviados por sus vecinos y los utilizan para actualizar su información local.

3. Definición de funciones para hacer merge de varios mensajes:

Los nodos pueden recibir múltiples mensajes de diferentes vecinos en una iteración.
Se debe definir una función de merge que combine los mensajes recibidos en uno solo, considerando la información relevante para el cálculo.
Por ejemplo, en el caso de PageRank, los mensajes contienen contribuciones de los vecinos al PageRank del nodo receptor, y la función de merge sumaría estas contribuciones para obtener el nuevo valor de PageRank.

4. Actualización de las propiedades de los nodos:

Después de cada iteración, los nodos actualizan sus propiedades locales con la información recibida de los mensajes.
Por ejemplo, en el caso de PageRank, los nodos actualizan su valor de PageRank con la suma de las contribuciones recibidas de los vecinos.

5. Condiciones de término del algoritmo:

Se debe establecer un criterio de convergencia para determinar cuándo el algoritmo ha terminado.
Por ejemplo, en el caso de PageRank, se puede definir un umbral de cambio máximo en los valores de PageRank de los nodos.
Si la diferencia entre los valores de PageRank de dos iteraciones consecutivas es menor que el umbral, se considera que el algoritmo ha convergido y se detiene.




