In [1]:
# Instalamos PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=64c74749199ce235ee0e4420cb0f648670e4e9d6283bea3cc076174e5709321d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
# En este caso, nosotros tenemos cora en nuestro google drive, ustedes deben poner su ruta para cora.cites
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import pandas as pd
# En este caso, nosotros tenemos cora en nuestro google drive, ustedes deben poner su ruta para cora.cites
df = pd.read_csv('/content/drive/MyDrive/cora.cites',sep="\t",
    header=None,
    names=["target", "source"])

In [5]:
df

Unnamed: 0,target,source
0,35,1033
1,35,103482
2,35,103515
3,35,1050679
4,35,1103960
...,...,...
5424,853116,19621
5425,853116,853155
5426,853118,1140289
5427,853155,853118


In [6]:
# Vamos a tomar como que todas las aristas tienen pesos aleatorios entre 1 y 30
import numpy as np

df['weight'] = np.random.randint(1, 31, size=len(df))

In [7]:
df

Unnamed: 0,target,source,weight
0,35,1033,21
1,35,103482,6
2,35,103515,12
3,35,1050679,1
4,35,1103960,25
...,...,...,...
5424,853116,19621,3
5425,853116,853155,19
5426,853118,1140289,4
5427,853155,853118,2


In [8]:
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [10]:
# Pasamos el df anterior a listas para procesarlos en nuestro algoritmo.

# Obtener edges
edges = list(zip(df['source'], df['target']))

# Obtener nodos
nodes = list(set(df['source']).union(set(df['target'])))

print(edges)
print(nodes)

[(1033, 35), (103482, 35), (103515, 35), (1050679, 35), (1103960, 35), (1103985, 35), (1109199, 35), (1112911, 35), (1113438, 35), (1113831, 35), (1114331, 35), (1117476, 35), (1119505, 35), (1119708, 35), (1120431, 35), (1123756, 35), (1125386, 35), (1127430, 35), (1127913, 35), (1128204, 35), (1128227, 35), (1128314, 35), (1128453, 35), (1128945, 35), (1128959, 35), (1128985, 35), (1129018, 35), (1129027, 35), (1129573, 35), (1129683, 35), (1129778, 35), (1130847, 35), (1130856, 35), (1131116, 35), (1131360, 35), (1131557, 35), (1131752, 35), (1133196, 35), (1133338, 35), (1136814, 35), (1137466, 35), (1152421, 35), (1152508, 35), (1153065, 35), (1153280, 35), (1153577, 35), (1153853, 35), (1153943, 35), (1154176, 35), (1154459, 35), (116552, 35), (12576, 35), (128540, 35), (132806, 35), (135130, 35), (141342, 35), (141347, 35), (148170, 35), (15670, 35), (1688, 35), (175291, 35), (178727, 35), (18582, 35), (190697, 35), (190706, 35), (1956, 35), (197054, 35), (198443, 35), (198653, 

In [11]:
rdd_nodes = spark.sparkContext.parallelize(nodes)
rdd_edges = spark.sparkContext.parallelize(edges)

# PageRank

In [12]:
# Esta funcion se encarga de asignar el PageRank inicial para cada nodo
def asignar_pagerank(nodo, cantidad_nodos):
    return (nodo, 1 / cantidad_nodos)


cantidad_nodos = rdd_nodes.count()
rdd_nodes = rdd_nodes.map(lambda nodo: asignar_pagerank(nodo, cantidad_nodos))

In [13]:
# Conteo se encarga de contar las aristas asociadas a un nodo (Solo aristas que salen desde el nodo, no las que llegan al nodo)
conteo = rdd_edges.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

In [14]:
# Los mensajes corresponden a tuplas las cuales tienen la siguiente forma (nodo, mensajes_recibidos)

def mensajes_inicialesrdd(rdd_nodes,conteo):
  rdd_salidas = rdd_nodes.map(lambda x: (x[0], x[1])).join(conteo)
  rdd_salidas = rdd_salidas.map(lambda tupla: (tupla[0],tupla[1][0]/tupla[1][1]))
  return rdd_salidas

rdd_mensajes = mensajes_inicialesrdd(rdd_nodes,conteo)

In [15]:
# Hacemos un Join entre los mensajes y las aristas
join_rdds = rdd_mensajes.join(rdd_edges).map(lambda line: (line[1][1], line[1][0])) \
            .reduceByKey(lambda a, b: a + b)

In [16]:
# Rellenamos aquellos nodos que no se incluyeron debido a que no reciben mensajes
nodes_with_zero = rdd_nodes.map(lambda x: (x[0], 0))

In [17]:
# Creamos un RDD final para trabajar en el algoritmo
rdd_final = join_rdds.union(nodes_with_zero).reduceByKey(lambda a, b: a + b).map(lambda x: (x[0], 1/4, x[1]))

In [18]:
# Nuestro dumping factor va a ser considerado como 0.85

# Define el factor de dumping
dumping_factor = 0.85

# Calcula el número total de nodos en el RDD
num_nodos = rdd_final.count()
val_inicial = 1 / num_nodos
# Calcula el factor de redistribución
redistribution_factor = (1 - dumping_factor) / num_nodos

# Función para actualizar los valores de PageRank para cada nodo
def actualizar_page_rank(nodo):
    nodo_id, page_rank, ref = nodo
    page_rank = (1 - dumping_factor)/num_nodos
    new_page_rank =  page_rank + (dumping_factor * ref)
    return (nodo_id, new_page_rank, (dumping_factor * ref))

In [19]:
# Algorithm es un algoritmo que utiliza actulizar_page_rank
def algorithm(rdd_final):

  dumping_factor = 0.85
  num_nodos = rdd_final.count()
  redistribution_factor = (1 - dumping_factor) / num_nodos
  rdd_actualizado = rdd_final.map(actualizar_page_rank)

  return rdd_actualizado

In [34]:
# Definimos una iteracion maxima para terminar, en este caso es 10, o su cambio es menor a epsilon.
# El print lo hace del tipo (nodo, PageRank, Redistribution Factor)

count = 0
epsilon = 0.0000001
iteracion_anterior = None
while True:
    rdd_final = algorithm(rdd_final)
    if iteracion_anterior is not None:
        # veamos si los valores de la iteracion anterior y la actual son muy parecidos
        if iteracion_anterior.join(rdd_final).filter(lambda x: x[1][0] !=x [1][1]).filter(lambda x: abs(x[1][0] - x[1][1]) < epsilon).count() != 0:
            break
    print(rdd_final.collect())
    count += 1
    iteracion_anterior = rdd_final
    if count == 10:
        break

[(307656, 5.635123231556087e-05, 9.59799523832648e-07), (4584, 6.301863967445167e-05, 7.6272068827234455e-06), (1272, 6.529656387768115e-05, 9.905131085952928e-06), (936, 5.724704520447134e-05, 1.8556124127431202e-06), (82920, 6.0958270029957584e-05, 5.566837238229359e-06), (16008, 5.836041265211721e-05, 2.9689798603889915e-06), (3240, 5.6824733413984976e-05, 1.4333006222567541e-06), (12576, 6.0830296760113227e-05, 5.4388639683850056e-06), (641976, 5.6415218950483044e-05, 1.0237861587548245e-06), (644448, 5.667116549017175e-05, 1.2797326984435308e-06), (459216, 5.5698568639354666e-05, 3.071358476264476e-07), (126912, 5.573696062030797e-05, 3.455278285797533e-07), (124296, 5.564737933141693e-05, 2.5594653968870614e-07), (5064, 5.825803403624173e-05, 2.8666012445135083e-06), (7032, 5.7055085299704815e-05, 1.6636525079765907e-06), (376704, 5.615927241079434e-05, 7.678396190661184e-07), (71904, 5.577535260126128e-05, 3.839198095330592e-07), (270456, 5.65431922203274e-05, 1.1517594285991783

# 1.2 SINGLE SURCE SHORTEST PATH

- Escoge el nodo inicial, este nodo tiene costo acumulado 0 y todos los demás tienen costo acumulado
infinito.

- En cada iteración, cada nodo comunica el costo acumulado a sus vecinos. Cada nodo recibe este costo,
sumado con el costo de atravesar la arista.

- Para hacer merge de todos los mensajes dejamos el mÍnimo de todos los costos. Así, actualizamos cada
nodo con el costo mínimo recibido solo si es menor al costo acumulado que ya tenía ese nodo.

- Si en dos iteraciones el costo en llegar para cada nodo no cambia, entonces nos detenemos.

In [35]:
# Creamos nodos y aristas aleatorias para usarlo en el algoritmo
import random

nodes = [int(i) for i in range(20)]

edges = []
for _ in range(50):
    nodo_inicial = random.randint(0, 20)
    nodo_destino = random.randint(0, 20)
    peso = random.randint(1, 40)
    arista = (nodo_inicial, nodo_destino, peso)
    edges.append(arista)

In [36]:
print(nodes)
print(edges)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[(13, 14, 38), (3, 5, 9), (1, 11, 39), (18, 7, 15), (11, 3, 18), (10, 18, 10), (20, 17, 33), (16, 10, 27), (16, 14, 27), (13, 14, 39), (20, 8, 31), (18, 12, 29), (10, 17, 30), (16, 8, 26), (10, 16, 32), (10, 18, 2), (12, 8, 14), (11, 19, 27), (15, 6, 10), (4, 1, 23), (1, 17, 32), (14, 16, 15), (3, 10, 37), (7, 20, 23), (14, 3, 38), (7, 11, 5), (8, 17, 21), (8, 2, 13), (3, 13, 11), (14, 14, 8), (1, 5, 8), (2, 3, 30), (9, 13, 1), (9, 10, 13), (15, 13, 19), (14, 16, 35), (18, 16, 11), (16, 8, 37), (19, 10, 26), (13, 13, 31), (14, 0, 14), (5, 3, 6), (0, 20, 24), (8, 11, 2), (14, 11, 31), (0, 3, 19), (20, 12, 5), (20, 10, 23), (18, 7, 7), (5, 11, 26)]


In [37]:
# Paralelizamos
rdd_nodes = spark.sparkContext.parallelize(nodes)
rdd_edges = spark.sparkContext.parallelize(edges)

# Selección del nodo_inicial

In [38]:
# Seleccionamos un nodo incial (Tener ojo con la seleccion, si por algun motivo este nodo no esta conectado a ningun otro, va a dar infinito en todos los caminos, debido
# a que no existe un camino entre ese nodo y otro)
nodo_inicial = 3

In [39]:
# Filtramos los caminos que salen de ese nodo
rdd_caminos_iniciales = rdd_edges.filter(lambda x: x[0] == nodo_inicial)

In [40]:
# inicialicemos los costes de los nodos a infinito y el coste del nodo origen a 0
rdd_costs = rdd_nodes.map(lambda x: (x, float('inf')))
rdd_costs = rdd_costs.map(lambda x: (x[0], 0.0) if x[0] == nodo_inicial else x)

In [41]:
# realicemos lo anterior de manera iterativa hasta que no existan caminos que mejoren el coste de llegada a los nodos destino
# creemos un ciclo que se repita hasta que no existan caminos que mejoren el coste de llegada a los nodos destino
while True:
    # veamos los caminos que encontramos desde el nodo_destino de los caminos iniciales

    rdd_costs_ac = rdd_edges.map(lambda x: (x[0],(x))).join(rdd_caminos_iniciales.map(lambda x: (x[1], (x[0],x[1],x[2]))))
    rdd_caminos = rdd_caminos_iniciales.union(rdd_costs_ac.map(lambda x: (x[1][1][0],x[1][0][1],x[1][1][2] + x[1][0][2])))

    # actualicemos los costes de los nodos destino si el costo del camino encontrado es menor que el costo del nodo destino
    # unamos los rdd y luego hagamos un reduceByKey para quedarnos con el menor coste

    rdd_costs_nuevos = rdd_costs.union(rdd_caminos.map(lambda x: (x[1], x[2])))
    rdd_costs_nuevos = rdd_costs_nuevos.reduceByKey(lambda x,y: x if x < y else y)

    # veamos si los costes encontrados son iguales a los ya encontrados
    if rdd_costs_nuevos.join(rdd_costs).filter(lambda x: x[1][0] != x[1][1]).count() == 0:
        break

    # actualicemos los costes iniciales con los costes encontrados
    rdd_costs = rdd_costs_nuevos
    # actualicemos los caminos iniciales con los caminos encontrados
    rdd_caminos_iniciales = rdd_caminos

In [42]:
rdd_costs.collect()

[(0, 63),
 (1, inf),
 (2, 89),
 (3, 0.0),
 (4, inf),
 (5, 9),
 (6, inf),
 (7, 46),
 (8, 76),
 (9, inf),
 (10, 37),
 (11, 35),
 (12, 68),
 (13, 11),
 (14, 49),
 (15, inf),
 (16, 50),
 (17, 67),
 (18, 39),
 (19, 62),
 (20, 69)]