# 1. PageRank

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=d005058ea79ce52b3dac68dadf326cf400522f762c5f4b0b2955a9a06d792db7
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark import SparkContext

In [None]:
class PageRank:
    """Iterative PageRank over Spark RDDs.

    Ranks are kept as an RDD of ``(node_id, rank)`` pairs and the graph as an
    RDD of ``(source_id, [target_id, ...])`` adjacency lists.

    Note: a node that never appears as a source in ``edges`` has no outgoing
    links, so it forwards its rank to nobody. That mass is not redistributed,
    and the ranks are therefore not guaranteed to sum to 1.
    """

    def __init__(self, nodes, edges, damping_factor=0.85):
        """Distribute the graph and give every node the same initial rank.

        Args:
            nodes: iterable of node ids; each starts with rank ``1 / len(nodes)``.
            edges: iterable of ``(source, target)`` pairs (directed links).
            damping_factor: weight applied to the summed link contributions in
                each update; the remaining ``1 - damping_factor`` is spread
                uniformly over all nodes.
        """
        sc = SparkContext.getOrCreate()
        nodes = list(nodes)
        # Compute the initial rank on the driver so the closure below captures a
        # single float instead of serializing the whole node list to every task.
        initial_rank = 1.0 / len(nodes) if nodes else 0.0
        self.nodes = sc.parallelize(nodes).map(lambda node: (node, initial_rank))
        # Group outgoing links per source node: (source, [target, ...]).
        self.edges = sc.parallelize(edges).groupByKey().mapValues(list)
        self.damping_factor = damping_factor

    @staticmethod
    def prepare_messages(node):
        """Split a node's rank evenly among its outgoing neighbors.

        Args:
            node: ``(node_id, (rank, neighbors))``, the shape produced by
                joining the rank RDD with the adjacency-list RDD.

        Yields:
            ``(neighbor_id, rank / len(neighbors))`` for every neighbor.
        """
        _, (rank, neighbors) = node
        num_neighbors = len(neighbors)
        for neighbor in neighbors:
            yield (neighbor, rank / num_neighbors)

    @staticmethod
    def exchange_messages(messages):
        """Sum every contribution addressed to the same destination node."""
        return messages.reduceByKey(lambda a, b: a + b)

    @staticmethod
    def update_rank(node, damping_factor, num_nodes):
        """Apply the damped PageRank update to a node's summed contributions.

        Args:
            node: ``(node_id, summed_contributions)``.
            damping_factor: weight of the link contributions.
            num_nodes: total node count used for the uniform term.

        Returns:
            ``(node_id, (1 - damping_factor) / num_nodes + damping_factor * contributions)``.
        """
        node_id, rank = node
        updated_rank = (1 - damping_factor) / num_nodes + damping_factor * rank
        return (node_id, updated_rank)

    def calculate(self, max_iterations=10, convergence_threshold=0.01):
        """Run PageRank until convergence or until ``max_iterations`` is reached.

        Args:
            max_iterations: upper bound on the number of update rounds.
            convergence_threshold: stop early once the L1 distance between the
                rank vectors of two consecutive iterations drops below this.

        Returns:
            The ``(node_id, rank)`` pairs collected to the driver. The final
            rank RDD is also stored in ``self.nodes``, so a later call resumes
            from these ranks.
        """
        nodes = self.nodes
        # The adjacency lists are joined in every iteration; cache them so they
        # are not rebuilt from the source data each round.
        edges = self.edges.cache()
        damping_factor = self.damping_factor
        num_nodes = nodes.count()
        # One zero contribution per known node. Union-ing it with the real
        # messages keeps nodes without incoming links in the result (they get
        # the uniform-only rank) instead of silently dropping them.
        zero_contributions = nodes.mapValues(lambda _: 0.0).cache()

        for i in range(max_iterations):
            prev_nodes = nodes  # ranks from the previous iteration
            # (node_id, (rank, neighbors)); nodes without outgoing edges drop out here.
            join_nodes_edges = nodes.join(edges)
            messages = (join_nodes_edges
                        .flatMap(PageRank.prepare_messages)
                        .union(zero_contributions))
            final_messages = PageRank.exchange_messages(messages)
            nodes = final_messages.map(
                lambda node: PageRank.update_rank(node, damping_factor, num_nodes)
            ).cache()

            # L1 norm of the change in PageRank scores between iterations.
            diff = (nodes.join(prev_nodes)
                    .map(lambda node: abs(node[1][0] - node[1][1]))
                    .sum())
            # The action above computed (and cached) `nodes`, so the previous
            # ranks are no longer needed in memory.
            prev_nodes.unpersist()

            if diff < convergence_threshold:
                print(f"La ejecución terminó por convergencia en {i+1} iteraciones.")
                break
        else:
            print(f"La ejecución terminó porque se alcanzó el máximo de {max_iterations} iteraciones.")

        self.nodes = nodes
        ranks = self.nodes.collect()
        zero_contributions.unpersist()
        return ranks

In [None]:
sc = SparkContext.getOrCreate()

# Small directed example graph: node 4 has no outgoing links.
nodes = [1, 2, 3, 4]
edges = [(1, 2), (2, 3), (2, 4), (3, 2), (3, 1)]

pagerank = PageRank(nodes, edges)
result = pagerank.calculate(max_iterations=2, convergence_threshold=0.001)

# collect() gives no ordering guarantee after a shuffle; sort by node id so the
# printed report is reproducible across runs and partition layouts.
for node_id, rank in sorted(result):
    print(f"Node {node_id} has PageRank: {rank}")

La ejecución terminó porque se alcanzó el máximo de 2 iteraciones.
Node 1 has PageRank: 0.09859375000000001
Node 2 has PageRank: 0.22078124999999998
Node 3 has PageRank: 0.18890624999999997
Node 4 has PageRank: 0.18890624999999997
