In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=bcc68df04c55ce9b924b1178caf8fdf50cef8a3d76f96f1f994d50e4d7f154e8
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark import SparkContext as sc
def initialize_page_ranks(nodes):
    """
    Build an RDD pairing every node with its initial Page Rank.

    Every node starts with the same rank, ``1 / len(nodes)``.

    :param nodes: sized collection of node identifiers.
    :return: RDD of ``(node, initial_rank)`` pairs.
    :raises ValueError: if ``nodes`` is empty (no uniform rank exists).
    """
    # ``parallelize`` is an instance method, so it must be called on a live
    # SparkContext, not on the SparkContext class itself. getOrCreate()
    # returns the active context, creating one if none is running.
    from pyspark import SparkContext

    total_nodes = len(nodes)
    if total_nodes == 0:
        raise ValueError("initialize_page_ranks() requires at least one node")
    initial_rank = 1 / total_nodes
    return SparkContext.getOrCreate().parallelize(
        [(node, initial_rank) for node in nodes]
    )

def prepare_messages(links, ranks):
    """
    Build the messages each node sends to the nodes it links to.

    Each node splits its current rank evenly across its destinations.
    Because ``links`` and ``ranks`` are combined with an inner join, only
    nodes present in both RDDs send anything.

    :param links: RDD of ``(node, destinations)`` adjacency lists.
    :param ranks: RDD of ``(node, rank)`` pairs.
    :return: RDD of ``(destination, contribution)`` pairs.
    """
    def _split_rank(record):
        _source, (destinations, current_rank) = record
        share = current_rank / len(destinations)
        return [(target, share) for target in destinations]

    joined = links.join(ranks)
    return joined.flatMap(_split_rank)

def exchange_messages(links, ranks):
    """
    Perform the message exchange between nodes.

    Sends each node's contributions to its destinations and merges (sums)
    the messages received by every node.

    :param links: RDD of ``(node, destinations)`` adjacency lists.
    :param ranks: RDD of ``(node, rank)`` pairs.
    :return: RDD of ``(node, total_received)`` with one entry for every node
        in ``ranks`` and every node that receives a contribution; nodes that
        nobody links to get ``0.0``.
    """
    contribs = prepare_messages(links, ranks)
    # Seed every ranked node with an empty (0.0) message. Without it, a node
    # with no incoming links would be missing from reduceByKey's output, drop
    # out of the next ranks RDD, and stop sending its own contributions.
    zero_messages = ranks.mapValues(lambda _rank: 0.0)
    # union() concatenates partitions, so pin the reducer to the partition
    # count the un-seeded version used; otherwise partitions grow every
    # iteration when spark.default.parallelism is unset.
    return zero_messages.union(contribs).reduceByKey(
        lambda x, y: x + y, numPartitions=contribs.getNumPartitions()
    )

def update_page_ranks(messages, damping_factor):
    """
    Compute each node's new Page Rank from the total message it received.

    Applies ``received * damping_factor + (1 - damping_factor)`` to every
    value of ``messages``, leaving keys untouched.

    :param messages: RDD of ``(node, total_received)`` pairs.
    :param damping_factor: weight given to the received rank.
    :return: RDD of ``(node, new_rank)`` pairs.
    """
    teleport = 1 - damping_factor

    def _apply_damping(received):
        return received * damping_factor + teleport

    return messages.mapValues(_apply_damping)

def page_rank(nodes, edges, num_iterations=10, damping_factor=0.85, tolerance=0.0001):
    """
    Compute the Page Rank of every node in a directed graph.

    Repeats message exchange and rank update for at most ``num_iterations``
    rounds. It stops early once the largest per-node change in Page Rank
    between two consecutive iterations falls below ``tolerance``.

    :param nodes: sized collection of node identifiers.
    :param edges: iterable of ``(source, destination)`` pairs; they are
        grouped by source to form adjacency lists.
    :param num_iterations: maximum number of iterations to run.
    :param damping_factor: weight given to rank received from linking nodes.
    :param tolerance: convergence threshold on the maximum absolute per-node
        change between consecutive iterations (defaults to the previously
        hard-coded ``0.0001``).
    :return: RDD of ``(node, rank)`` pairs.
    """
    from pyspark import SparkContext

    # ``parallelize`` must be called on a SparkContext instance, not the class.
    spark_context = SparkContext.getOrCreate()
    # Adjacency lists are reused by every iteration; cache them rather than
    # regrouping the edges each time.
    links = spark_context.parallelize(edges).groupByKey().mapValues(list).cache()
    ranks = initialize_page_ranks(nodes)
    previous_ranks = None

    for _ in range(num_iterations):
        messages = exchange_messages(links, ranks)
        # Cache each iteration's ranks. Without this, every convergence check
        # recomputes the whole chain of earlier iterations from scratch.
        ranks = update_page_ranks(messages, damping_factor).cache()

        # Check for convergence (needs two iterations to compare).
        if previous_ranks is not None:
            diff = ranks.join(previous_ranks).mapValues(
                lambda pair: abs(pair[0] - pair[1])
            )
            # Differences are non-negative, so 0.0 is a safe seed; unlike
            # RDD.max(), fold() does not raise on an empty RDD.
            max_diff = diff.values().fold(0.0, max)
            # The action above materialized ``ranks`` in the cache, so the
            # older iteration is no longer needed.
            previous_ranks.unpersist()
            if max_diff < tolerance:
                break

        previous_ranks = ranks

    return ranks

# Example graph: four nodes and directed (source, destination) edges.
nodes = [1, 2, 3, 4]
edges = [
    (1, 2),
    (2, 3),
    (2, 4),
    (3, 2),
]

# Run Page Rank on the example graph and show the resulting (node, rank) pairs.
result = page_rank(nodes, edges)
ranked_nodes = result.collect()
print(ranked_nodes)
