In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

In [None]:
import random

In [None]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
sqlContext = SQLContext(sc)



In [None]:
nodos = [('A','B'),('A','C'),('B','C'),('B','A'),('C','B'),('B','D')]

In [None]:
grafoRDD = sc.parallelize(nodos)

Opción 1

In [None]:
def centralidad_aproximada(grafoRDD,walk_number=5,walk_length=100):
  cent = {}
  visitados = set()
  for i in range(walk_number):
    v = grafoRDD.takeSample(False,1)[0][0]
    if v not in visitados:
        cent[v] = 1
        visitados.add(v)
    else: cent[v] += 1
    
    for j in range(walk_length-1):
      vecinos = grafoRDD.filter(lambda x: x[0] == v).map(lambda x: x[1]).collect()
      while not vecinos:
        v = grafoRDD.takeSample(False,1)[0][0]
        vecinos = grafoRDD.filter(lambda x: x[0] == v).map(lambda x: x[1]).collect()
      w = random.choice(vecinos)
    
      if w not in visitados:
        cent[w] = 1
        visitados.add(w)
      else: cent[w] += 1
      v = w
  return cent

In [None]:
centralidad_aproximada(grafoRDD)

{'A': 95, 'B': 199, 'C': 129, 'D': 77}

Opción 2

In [None]:
listaAdyacencias = grafoRDD.groupByKey().map(lambda x: (x[0], list(x[1]))).cache()

In [None]:
listaAdyacencias.collect()

[('C', ['B']), ('A', ['B', 'C']), ('B', ['C', 'A', 'D'])]

In [None]:
def centralidad_aproximada(listaAdyacencias,walk_number=5,walk_length=100):
  cent = {}
  visitados = set()
  for i in range(walk_number):
    v,adyacentes = listaAdyacencias.takeSample(False,1)[0] # obtengo un vertice y 
                                                           # sus adyacentes aleatoriamente
                                                           # ejemplo: ('B', ['C', 'A', 'D']).
    if v not in visitados: 
        cent[v] = 1
        visitados.add(v)
    else: cent[v] += 1
    for j in range(walk_length-1):
      siguiente_nodo = listaAdyacencias.filter(lambda x: x[0] == v).collect()
      # puede ser que el nodo no apunte a nada y al filtrar me devuelva una lista vacía
      if not siguiente_nodo: 
        v,adyacentes = listaAdyacencias.takeSample(False,1)[0] # si me devuelve una lista vacía,
                                                               # obtengo otro nodo aleatoriamente
                                                               # y sus adyacentes.
      else:
        v,adyacentes = (siguiente_nodo[0][0],siguiente_nodo[0][1])
      w = random.choice(adyacentes)
      if w not in visitados:
        cent[w] = 1
        visitados.add(w)
      else: cent[w] += 1
      v = w

  return cent

In [48]:
centralidad_aproximada(listaAdyacencias)

{'A': 84, 'B': 206, 'C': 138, 'D': 72}

Opcion 3

In [None]:
def random_walk(grafoRDD, n = 10):
  '''
  recibe un rdd grafo con el formato (nodo1,nodo2) y un largo n
  devuelve un random walk de largo n. En caso de llegar a un nodo
  que no apunta a nada comienza de un nodo aleatorio nuevamente.
  '''
  camino = sc.parallelize(grafoRDD.takeSample(False,1)).map(lambda x: (x[1],x[0]))
  nodos_adyacentes = camino.leftOuterJoin(grafoRDD).map(lambda x: (x[0],x[1][1]))
  nodo_aleatorio = sc.parallelize(nodos_adyacentes.takeSample(False,1)).map(lambda x: (x[1],x[0]))
  camino = camino.union(nodo_aleatorio)
  for _ in range(n-2):
    nodos_adyacentes = nodo_aleatorio.leftOuterJoin(grafoRDD).map(lambda x: (x[0],x[1][1]))
    if nodos_adyacentes.collect()[0] == (None,None):
      nodo_aleatorio = sc.parallelize(grafoRDD.takeSample(False,1)).map(lambda x: (x[1],x[0]))
    else:
      nodo_aleatorio = sc.parallelize(nodos_adyacentes.takeSample(False,1)).map(lambda x: (x[1],x[0]))
    camino = camino.union(nodo_aleatorio)
  return camino.map(lambda x: x[1]).collect()

In [None]:
def centralidad(camino):
  cent = {}
  visitados = set()
  for v in camino:
    if v not in visitados:
      cent[v] = 1
      visitados.add(v)
    else: cent[v] += 1
  return cent

In [56]:
# hago varios random walks
camino = []
for _ in range(5):
  camino.extend(random_walk(grafoRDD,100))
centralidad(camino)

{'A': 93, 'B': 213, 'C': 123, 'D': 71}

Como podemos observan en los 3 casos obtuvimos que B tiene la mayor cantidad de apariciones.