# Tarea 2

## Problema 1

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=01b4cb331a0f04c4d9c9d1ccbd8af851bb0f0bad10d7b72b39b62359ff2b3b91
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark import SparkContext

# Paso 1: Preparar RDD con nodos y Page Ranks iniciales
sc = SparkContext("local", "PageRank")
nodes = [1, 2, 3, 4]
num_nodes = len(nodes)
initial_page_rank = 1.0 / num_nodes
node_ranks = sc.parallelize([(node, initial_page_rank) for node in nodes])

# Paso 2: Crear diccionario de vecinos para cada nodo
edges = [(1, 2), (2, 3), (2, 4), (3, 2)]
node_neighbors = sc.parallelize(edges).groupByKey().collectAsMap()

# Función para preparar los mensajes que cada nodo enviará
def prepare_messages(node_rank):
    node, rank = node_rank
    neighbors = node_neighbors.get(node, [])
    num_neighbors = len(neighbors)
    if num_neighbors == 0:
        return []
    rank_per_neighbor = rank / num_neighbors
    return [(neighbor, rank_per_neighbor) for neighbor in neighbors]

# Función para realizar el intercambio de mensajes y merge de los recibidos
def exchange_messages(node_ranks):
    messages = node_ranks.flatMap(prepare_messages)
    node_ranks = messages.reduceByKey(lambda x, y: x + y)
    return node_ranks

# Función para actualizar el valor de PageRank para cada nodo considerando el damping factor
def update_page_rank(node_rank):
    node, rank = node_rank
    new_rank = (rank * damping_factor) + ((1 - damping_factor) / num_nodes)
    return (node, new_rank)

# Parámetros de PageRank
damping_factor = 0.85  # Factor de amortiguación (d)
max_iterations = 25  # Número máximo de iteraciones
min_diff = 0.0001  # Diferencia mínima entre iteraciones para detenerse

# Paso 3 y 4: Iterar intercambio de mensajes, merge y actualización del valor de PageRank
for iteration in range(max_iterations):
    # Intercambio de mensajes y merge
    node_ranks = exchange_messages(node_ranks)

    # Actualización del valor de PageRank de cada nodo
    node_ranks = node_ranks.map(update_page_rank)

    # Mostrar los mensajes recibidos en cada iteración
    print(f"Iteration {iteration + 1}:")
    for node_rank in node_ranks.collect():
        print(node_rank)

    # Verificar la diferencia entre iteraciones
    if iteration > 0:
        diff = node_ranks.join(prev_node_ranks).map(lambda x: abs(x[1][0] - x[1][1])).sum()
        if diff < min_diff:
            break

    # Guardar el valor de PageRank para la siguiente iteración
    prev_node_ranks = node_ranks

# Mostrar los nodos y sus valores finales de Page Rank
print("Final Page Ranks:")
for node_rank in node_ranks.collect():
    print(node_rank)

Iteration 1:
(2, 0.4625)
(3, 0.14375)
(4, 0.14375)
Iteration 2:
(3, 0.2340625)
(4, 0.2340625)
(2, 0.15968749999999998)
Iteration 3:
(2, 0.23645312500000001)
(3, 0.1053671875)
(4, 0.1053671875)
Iteration 4:
(3, 0.13799257812500001)
(4, 0.13799257812500001)
(2, 0.127062109375)
Iteration 5:
(2, 0.15479369140625002)
(3, 0.09150139648437501)
(4, 0.09150139648437501)
Iteration 6:
(3, 0.10328731884765627)
(4, 0.10328731884765627)
(2, 0.11527618701171877)
Iteration 7:
(2, 0.12529422102050783)
(3, 0.08649237947998048)
(4, 0.08649237947998048)
Iteration 8:
(3, 0.09075004393371583)
(4, 0.09075004393371583)
(2, 0.11101852255798342)
Iteration 9:
(2, 0.11463753734365846)
(3, 0.08468287208714295)
(4, 0.08468287208714295)
Iteration 10:
(3, 0.08622095337105484)
(4, 0.08622095337105484)
(2, 0.10948044127407151)
Iteration 11:
(2, 0.11078781036539662)
(3, 0.0840291875414804)
(4, 0.0840291875414804)
Iteration 12:
(3, 0.08458481940529357)
(4, 0.08458481940529357)
(2, 0.10892480941025834)
Iteration 13:
(2, 0