# Pregunta 1

Instalamos pyspark por si es necesario en google colab.

In [9]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


El código de abajo implementa las funciones necesarias para el calculo de PageRank con Spark.

**inicializar(apunta_a)**: Recibe como parametro una lista de nodos a los que apunta y devuelve un diccionario: 'apunta_a' (lista de nodos a los cuales apunta), 'length' (cantidad de nodos) y 'pr' (valor inicial de PageRank de cada nodo, en este caso segun el enunciado 1/numero_nodos).

**init_pr(dict)**: Recibe un diccionario y establece el valor de 'pr' (PageRank) en 0. Luego devuelve el diccionario modificado.

**repartir_pr(acumulado, sig_value)**: Esta función se encarga de la distribución del valor de PageRank desde un nodo original a los nodos vecinos. Luego actualiza el diccionario 'acumulado' con los nuevos valores de PageRank para los nodos de vecinos y lo devuelve.

**refresh_pr(update_target, iter_target)**: Esta función se encarga de actualizar los valores de PageRank acumulados durante una iteración. Recibe dos diccionarios: 'update_target', que contiene los valores acumulados hasta el momento, y 'iter_target', que tiene los nuevos valores de PageRank calculados en la iteración actual. Itera sobre los elementos de 'iter_target' y actualiza los valores de PageRank en 'update_target'. Si hay nodos nuevos en 'iter_target', los agrega a 'update_target'. Al final, devuelve el diccionario 'update_target' actualizado con los nuevos valores de PageRank.

In [10]:
from pyspark import SparkContext

# Restructura RDD, como (llave, {...}), tambien inicializa algunos valores, como page rank.

def inicializar(apunta_a):
    cora_node_number = 2708
    return {'apunta_a': apunta_a, 'length': len(apunta_a), 'pr': 1/cora_node_number}

# Inicializa page rank

def init_pr(dict):
    dict['pr'] = 0
    return dict

# Reparte valores

def repartir_pr(acumulado, sig_value):
    cora_node_number = 2708
    damping_factor = 0.85
    key, value = sig_value
    valor = (value['pr'] * damping_factor) / (value['length'] + 0.000000001) + ((1 - damping_factor) / cora_node_number) #El +0.0..01 es para que pyspark no tire un error.
    for puntero in value['apunta_a']: # Itera nodos conectados
        if puntero not in acumulado.keys():
            acumulado[puntero] = {'apunta_a':[], 'length': 0}
        opp_rating = acumulado.get(puntero , {}).get('pr' , 0) # Consigue pr de diccionario acumulado
        acumulado[puntero]['pr'] = opp_rating + valor # Suma valor al valor ya existente de diccionario acumulado
    return acumulado

# Actualiza Page Rank

def refresh_pr(update_target, iter_target):
    for key, value in iter_target.items():
        if key in update_target:
            update_target[key]['pr'] += value['pr']
        else:
            update_target[key] = value
    return update_target

Creamos contexto, leemos datos y los restructuramos.

In [15]:
# Contexto

try:
    sc = SparkContext.getOrCreate()
except ValueError:
    SparkContext.getOrCreate().stop()
    sc = SparkContext.getOrCreate()

# Datos

datos = sc.textFile("cora.cites")

# Restructura

citas = datos.map(lambda line: line.split("\t")).groupByKey().mapValues(inicializar)

Iteramos hasta que converja, mientras hacía pruebas me di cuenta que con 30 los valores ya cambian muy poco, en otras palabras, con 30 iteraciones converje.

In [16]:
# Iter

for iter in range(30):
    if iter > 0:
        citas = sc.parallelize(act_iter.items())
    acumulado = dict(citas.mapValues(init_pr).collect())
    act_iter = citas.aggregate(acumulado, repartir_pr, refresh_pr)

Printeamos los resultados en orden ascendiente, por lo tanto para ver el nodo con el PageRank mas grande, hay que ir al ultimo valor printeado.

In [18]:
final = [(key, value['pr']) for key, value in act_iter.items()]

print("Node : Page Rank Value")
sorted_final = sorted(final, key=lambda x: x[1], reverse=False)
for node, pr_value in sorted_final:
    print(f"{node} : {pr_value}")

Node : Page Rank Value
504 : 0
936 : 0
1237 : 0
1481 : 0
1951 : 0
2653 : 0
3222 : 0
3223 : 0
3232 : 0
3236 : 0
4584 : 0
4649 : 0
6209 : 0
6216 : 0
6318 : 0
6343 : 0
6346 : 0
6786 : 0
6814 : 0
7296 : 0
8594 : 0
8617 : 0
8619 : 0
8696 : 0
8872 : 0
11148 : 0
11335 : 0
12194 : 0
12238 : 0
13205 : 0
13213 : 0
13966 : 0
13972 : 0
14062 : 0
14090 : 0
14428 : 0
14529 : 0
14531 : 0
15987 : 0
16008 : 0
16437 : 0
17201 : 0
17488 : 0
17821 : 0
18313 : 0
18536 : 0
18770 : 0
18773 : 0
20821 : 0
22386 : 0
24530 : 0
24966 : 0
24974 : 0
25791 : 0
27203 : 0
27243 : 0
27246 : 0
27623 : 0
28385 : 0
28487 : 0
28489 : 0
28542 : 0
28649 : 0
28957 : 0
29492 : 0
29738 : 0
30817 : 0
31055 : 0
31083 : 0
32083 : 0
42221 : 0
45533 : 0
45603 : 0
47682 : 0
50807 : 0
50838 : 0
51049 : 0
51052 : 0
51879 : 0
51934 : 0
56708 : 0
57119 : 0
58453 : 0
59626 : 0
59798 : 0
61312 : 0
63812 : 0
66564 : 0
66596 : 0
66809 : 0
68224 : 0
70281 : 0
70444 : 0
72805 : 0
73323 : 0
74821 : 0
74975 : 0
75693 : 0
75694 : 0
75983 : 0
7710