In [65]:
!pip install pyspark



In [66]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc

[link text](https://)### 1) Problema 1:

#### Para facilitarnos el trabajo de los nodos en RDD, definimos nuestro grafo como una lista de tuplas donde cada tupla es un nodo con una lista de los nodos a los que se conecta.

In [67]:
# Lista de tuplas (nodo, vecinos)
lista_tuplas = [(1, [2]), (2, [3, 4]), (3, [2]), (4, [])]
#lista_tuplas = [(1, [2]), (2, [3]), (3, [4]), (4, [5]), (5, [6]), (6, [1, 3])]

# Creamos un RDD con las tuplas (nodo, vecinos)
graph = sc.parallelize(lista_tuplas)

# Cantidad de nodos
cant_nodos = len(lista_tuplas)

# Valor inicial del page rank
initial_value = 1 / cant_nodos

# Inicializamos el page rank de cada nodo
nodes = graph.keys().distinct()
ranks = nodes.map(lambda node: (node, initial_value))

print(ranks.collect())

[(2, 0.25), (4, 0.25), (1, 0.25), (3, 0.25)]


In [68]:
# Función que genera los mensajes a partir de un nodo y su PageRank actual
def generate_messages(node, rank):
    num_neighbors = len(graph_dict[node])  # Obtiene el número de vecinos
    if num_neighbors == 0:
        return []
    message_rank = rank / num_neighbors
    return [(neighbor, message_rank) for neighbor in graph_dict[node]]

# Se generan los mensajes y se crea un diccionario para poder trabajar los mensajes
messages = ranks.flatMap(lambda x: generate_messages(x[0], x[1]))
graph_dict = dict(graph.collect())
messages.collect()

[(3, 0.125), (4, 0.125), (2, 0.25), (2, 0.25)]

In [69]:
# Función que actualiza el valor del Page Rank
def update_rank(node, ranks):
    damping_factor = 0.85
    new_rank = damping_factor * ranks + (1 - damping_factor) / cant_nodos
    return (node, new_rank)

# Se actualiza los PageRanks utilizando los mensajes recibidos
new_ranks = messages.reduceByKey(lambda x, y: x + y).map(lambda x: update_rank(x[0], x[1]))
new_ranks.collect()

[(4, 0.14375), (2, 0.4625), (3, 0.14375)]

#### Ahora se ejecuta un "for" loop para repetir este proceso n veces.

In [70]:
max_iterations = 10
epsilon = 0.001
cant_iteraciones = 0
for iteration in range(max_iterations):

    messages = ranks.flatMap(lambda x: generate_messages(x[0], x[1]))
    collected_messages = messages.collect()
    collected_messages.append((1, 0))
    messages = sc.parallelize(collected_messages)

    new_ranks = messages.reduceByKey(lambda x, y: x + y).map(lambda x: update_rank(x[0], x[1]))

    # Calculamos la diferencia entre los PageRanks actuales y anteriores
    rank_diff = ranks.join(new_ranks).map(lambda x: abs(x[1][0] - x[1][1])).sum()

    # Actualizamos los PageRanks para la siguiente iteración
    ranks = new_ranks

    # Verifica la condición de terminación
    cant_iteraciones+=1
    if rank_diff < epsilon:
        break

In [71]:
ranks.collect()

[(4, 0.10706817498735734),
 (2, 0.15907557964792862),
 (3, 0.10706817498735734),
 (1, 0.037500000000000006)]

In [72]:
print(f'Se llego al resultado en {cant_iteraciones} iteraciones')

Se llego al resultado en 10 iteraciones


### Para probar que nuestro codigo es escalable, probaremos el el algoritmo para el grafo de "cora" de la actividad 12.

In [73]:
!pip install networkx



In [74]:
import pandas as pd
import networkx as nx

In [75]:
# IMPORTANTE: Si el archivo cora.cites esta tiene otra ruta, hay que cambiar la variable RUTA
RUTA = 'cora.cites'
citas = pd.read_csv(RUTA,sep="\t",
    header=None,
    names=["target", "source"])
G = nx.from_pandas_edgelist(citas, source="source", target="target",create_using=nx.DiGraph())

In [76]:
def transformar_lista_tuplas(graph):
  edges = sc.parallelize(graph.edges)
  nodes = sc.parallelize(graph.nodes)
  grouped_edges_rdd = edges.groupByKey()
  grouped_edges_dict = grouped_edges_rdd.collectAsMap()
  lista_tuplas_rdd = nodes.map(lambda x: (x, list(grouped_edges_dict.get(x, []))))
  lista_tuplas = lista_tuplas_rdd.collect()
  return lista_tuplas

In [77]:
lista_tuplas = transformar_lista_tuplas(G)
cant_nodos = len(lista_tuplas)
graph = sc.parallelize(lista_tuplas)
initial_value = 1 / cant_nodos
nodes = graph.keys().distinct()
ranks = nodes.map(lambda node: (node, initial_value))

In [78]:
def generate_messages(node, rank):
    num_neighbors = len(graph_dict[node])
    if num_neighbors == 0:
        return []
    message_rank = rank / num_neighbors
    return [(neighbor, message_rank) for neighbor in graph_dict[node]]

messages = ranks.flatMap(lambda x: generate_messages(x[0], x[1]))
graph_dict = dict(graph.collect())

In [79]:
def update_rank(node, ranks):
    damping_factor = 0.85
    new_rank = damping_factor * ranks + (1 - damping_factor) / cant_nodos
    return (node, new_rank)

new_ranks = messages.reduceByKey(lambda x, y: x + y).map(lambda x: update_rank(x[0], x[1]))

In [80]:
max_iterations = 10
epsilon = 0.001
cant_iteraciones = 0
for iteration in range(max_iterations):
    messages = ranks.flatMap(lambda x: generate_messages(x[0], x[1]))
    new_ranks = messages.reduceByKey(lambda x, y: x + y).map(lambda x: update_rank(x[0], x[1]))
    rank_diff = ranks.join(new_ranks).map(lambda x: abs(x[1][0] - x[1][1])).sum()
    ranks = new_ranks
    cant_iteraciones+=1
    if rank_diff < epsilon:
        break

#### Con la siguiente funcion se pueden ver cuales nodos son los que tienen mayor PageRank.

In [83]:
# cant es la variable que se refiere a la cantidad de nodos que se quieren mostrar
def top_pageranks(cant, ranks=ranks):
  top = ranks.top(cant, key=lambda x: x[1])
  return top

In [84]:
top_pageranks(10)

[(15429, 0.006969569967219403),
 (10177, 0.006736096934060898),
 (35, 0.0027045376673630046),
 (6898, 0.002092012505474962),
 (643221, 0.0016660190498489556),
 (12631, 0.0015440014715572578),
 (5348, 0.0015308550234253812),
 (210871, 0.0014942063451139683),
 (2696, 0.001397585426094191),
 (10531, 0.0012931592765430262)]