In [2]:
!pip install pyspark
!pip install neo4j
!pip install pandas

import pandas as pd
import numpy as np

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ef694b21efd3cdb4241c122f9b2922cc28317f835656ea260cb8021c0bd27e75
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting neo4j
  Downloading neo4j-5.21.0-py3-none-any.whl (286 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m286.8/286.8 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: neo4j
Successfully installed neo4j-5.21.0


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Ejemplo de PySpark en Jupyter Notebook") \
    .getOrCreate()

# Obtener el SparkContext
sc = spark.sparkContext

# Parte 2
## Problemas a resolver

### Funciones para implementar PySpark

In [4]:
def hash(x, B):
    return x % B

In [5]:
def map_pdm(x, arista, y, B):

    x_hash = hash(x, B)
    y_hash = hash(y, B)

    l = []
    for i in range(B):
        l.append(((x_hash, y_hash, i), (x, arista, y)))
        l.append(((i, x_hash, y_hash), (x, arista, y)))
        l.append(((y_hash, i, x_hash), (x, arista, y)))

    return set(l)

In [6]:
def triangulo(nodos):

  triangulos_detectados = []
  for i in range(len(nodos)):
    nodo_actual = nodos[i]

    for j in range(i + 1, len(nodos)):

      nodo_sig = nodos[j]

      for k in range(j + 1, len(nodos)):
        nodo_sub_sig = nodos[k]

        if nodo_actual[2] == nodo_sig[0] and nodo_sig[2] == nodo_sub_sig[0] and nodo_actual[0] == nodo_sub_sig[2]:
          triangulos_detectados.append((nodo_actual[0], nodo_sig[0], nodo_sub_sig[0]))

        elif nodo_actual[2] == nodo_sub_sig[0] and nodo_sig[0] == nodo_sub_sig[2] and nodo_actual[0] == nodo_sig[2]:
          triangulos_detectados.append((nodo_actual[0], nodo_sig[0], nodo_sub_sig[0]))

        elif (nodo_actual[0] == nodo_sub_sig[0] and nodo_actual[2] == nodo_sig[2] and nodo_sub_sig[2] == nodo_sig[0]):
          triangulos_detectados.append((nodo_actual[0], nodo_sig[0], nodo_sub_sig[0]))

  return triangulos_detectados

*Conección a Neo4j*

In [7]:
from neo4j import GraphDatabase

URI = "neo4j+s://06ae1fa1.databases.neo4j.io"
AUTH = ("neo4j","QjwVk3kN-OI5bTt-fg6LZy-F4LMHCpL9HFxuvSuq-OE")

driver = GraphDatabase.driver(URI, auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")

Connection to Neo4j established successfully!


### Cargar el grafo




In [8]:
def get_data_from_neo4j():
    with driver.session() as session:
        result = session.run("""
        MATCH (n1:Node)-[r:RELATED]->(n2:Node)
        RETURN n1.id AS id_form, n2.id AS id_to, r.weight AS weight
        """)
        data = [record.data() for record in result]
        tuples = [(d['id_form'], d['weight'], d['id_to']) for d in data]
    return tuples

# Obtiene los datos de Neo4j
neo4j_data = get_data_from_neo4j()

In [9]:
# Neo4j a RDD
rdd = sc.parallelize(neo4j_data)

In [10]:
rdd.take(5)

[(1, 1, 2), (1, 7, 3), (4, 3, 5), (6, 5, 7), (6, 14137, 8)]

Función para la busqueda de triángulos

In [11]:
# toma el rdd y un b para los buckets de la función de hash y
# devuelve un diccionario con las claves originales y los triángulos detectados.
def buscar_triangulos(rdd, b):
    rdd_neo4j_map = rdd.flatMap(lambda dato: map_pdm(*dato, b))
    reduce_neo4j = rdd_neo4j_map.groupByKey().mapValues(list)
    trangulos = reduce_neo4j.map(lambda nodos: (nodos[0], triangulo(nodos[1])))

    dicc = {}
    for i in trangulos.collect():
        dicc[i[0]] = i[1]

    return dicc


In [12]:
#buckets
B = 100

In [13]:
triangulos_detectados = buscar_triangulos(rdd, B)

In [14]:
list(triangulos_detectados.items())[:5]

[((13, 15, 81), []),
 ((54, 44, 51), [(44, 51, 54)]),
 ((44, 52, 77), []),
 ((53, 44, 64), []),
 ((42, 24, 75), [])]

In [49]:
#buscamos el nodo de valor maximo

max_nodo = rdd.map(lambda x: (x[0], x[2])).reduce(lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))
max_nodo = max(max_nodo[0], max_nodo[1])

#Si comparamos el valor maximo con la cantidad de nodos, son iguales
#Es decir van del 1 hasta el nodo maximo

cant_nodo = rdd.flatMap(lambda x: [x[0], x[2]]).distinct().count()

print(f"Max nodo: {max_nodo}, cantidad de nodo: {cant_nodo}")

# En cambio el maximo de una arista es bastante mayor a la cantidad de aristas
# por lo tanto vamos a crear un diccionario que tome el valor de una arista
# y entregue en que posición va si ordenamos las aristas de menor a mayor
# para que nuestra matriz no ocupe tanta memoria

max_arista = rdd.map(lambda x: x[1]).reduce(lambda a, b: max(a, b))

cant_aristas = rdd.map(lambda x: x[1]).distinct().count()

print(f"Max arista: {max_arista}, cantidad de aristas: {cant_aristas}")

aristas = rdd.map(lambda x: x[1]).distinct().sortBy(lambda x: x).collect()

aristas_dict = {val: id + 1 for id, val in enumerate(aristas)}

# Crear un diccionario inverso para mapear posiciones de vuelta a valores de aristas

inverse_aristas_dict = {v: k for k, v in aristas_dict.items()}

Max nodo: 1574, cantidad de nodo: 1574
Max arista: 1489618, cantidad de aristas: 9968


In [39]:
def rdd_a_matriz(rdd, dicc):

  max_nodo = rdd.map(lambda x: (x[0], x[2])).reduce(lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))
  max_nodo = max(max_nodo[0], max_nodo[1])

  max_arista = rdd.map(lambda x: x[1]).reduce(lambda a, b: max(a, b))

  max_arista = dicc[max_arista]

  M = np.zeros((max_nodo + 1, max_arista + 1, max_nodo + 1), dtype=int)

  # Creamos la matriz y le ingresamos los valores

  for i in rdd.collect():
    M[i[0]][dicc[i[1]]][i[2]] = 1

  return M

In [48]:
M = rdd_a_matriz(rdd, aristas_dict)

In [45]:
def matriz_a_rdd(M, inverse_dict):

  grafo = []

  for nodo in range(len(M)):

    for arista in range(len(M[nodo])):

        for nodo_sig in range(len(M[nodo][arista])):

            if M[nodo][arista][nodo_sig] == 1:
                grafo.append((nodo, inverse_dict[arista], nodo_sig))

  return sc.parallelize(grafo)

In [47]:
new_rdd = sc.parallelize(rdd.take(4))

M_test = rdd_a_matriz(new_rdd, aristas_dict)
matriz_a_rdd(M_test, inverse_aristas_dict).collect()

[(1, 1, 2), (1, 7, 3), (4, 3, 5), (6, 5, 7)]

### Resivimos el patron

Si el patron viene en matriz lo pasamos a grafo con la función `matriz_a_rdd()`, y si el patrón viene en rdd lo trabajamos tal que así:

In [57]:
def patron_in_rdd(patron : list, invers_dict):

  for nodo_1, arista, nodo_2 in patron:

    if arista not in invers_dict:
        return False

    if M[nodo_1, invers_dict[arista], nodo_2] == 0:
      return False

  return True


In [52]:
patron_in_rdd(rdd.take(4), inverse_aristas_dict)

True

In [60]:
patron_in_rdd([(1, 7842654, 3)], inverse_aristas_dict)

False

In [62]:
def variables(nodos):

  for i in range(len(nodos) - 1):
        nodo_actual = nodos[i]
        nodo_siguiente = nodos[i + 1]

        if not np.any(M[nodo_actual, :, nodo_siguiente]):
            return False

        return True

In [63]:
variables((5, 54, 56, 98))

False

In [65]:
rango_numeros = range(1, 1575)

while True:
    numeros_azar = np.random.choice(rango_numeros, size=4, replace=False)

    if variables(tuple(numeros_azar)):
        print("Se encontró una secuencia con camino en el grafo:")
        print(numeros_azar)
        break

Se encontró una secuencia con camino en el grafo:
[ 44  94 340 788]


In [61]:
driver.close()