In [1]:
!pip install neo4j==5.3.0 pyspark==3.3.1


Collecting neo4j==5.3.0
  Downloading neo4j-5.3.0.tar.gz (157 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m157.8/157.8 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.3.1)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: neo4j, pyspark
  Building wheel for neo4j (pyproject

In [2]:
from neo4j import GraphDatabase, basic_auth
import pandas as pd
from pyspark import SparkContext
from collections import deque

In [4]:
# Nos conectamos a nuestra BDD de pruebas
# Para conectar una base de datos de neo4j basta con llenar la variables de abajo
# con los datos necesarios.
#---------------------------RELLENAR---------------------------------
URI = "neo4j+s://086a21e0.databases.neo4j.io"
AUTH = ("neo4j", "thlPXPR9u7opMHv3r9PMOVRRWjDPX1BzWQg1R0u_wqQ") # (user, password)
#---------------------------RELLENAR---------------------------------


driver = GraphDatabase.driver(URI,
  auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")


Connection to Neo4j established successfully!


# Códigos de prueba usados para la tarea

In [10]:
#Query para cargar los datos de prueba a una base de datos de neo4j de prueba
query = '''
MERGE (n1:Node {id: 1})
MERGE (n2:Node {id: 2})
MERGE (n3:Node {id: 3})
MERGE (n4:Node {id: 4})
MERGE (n5:Node {id: 5})
MERGE (n6:Node {id: 6})


MERGE (n1)-[:RELATIONSHIP_TYPE_11]->(n2)
MERGE (n1)-[:RELATIONSHIP_TYPE_11]->(n3)
MERGE (n2)-[:RELATIONSHIP_TYPE_11]->(n3)
MERGE (n3)-[:RELATIONSHIP_TYPE_11]->(n2)
MERGE (n3)-[:RELATIONSHIP_TYPE_11]->(n4)
MERGE (n4)-[:RELATIONSHIP_TYPE_11]->(n1)
MERGE (n4)-[:RELATIONSHIP_TYPE_11]->(n2)
MERGE (n4)-[:RELATIONSHIP_TYPE_11]->(n3)
MERGE (n4)-[:RELATIONSHIP_TYPE_12]->(n5)
MERGE (n5)-[:RELATIONSHIP_TYPE_12]->(n1)
MERGE (n5)-[:RELATIONSHIP_TYPE_12]->(n2)
MERGE (n5)-[:RELATIONSHIP_TYPE_12]->(n6)
'''
with driver.session() as session:
    result = session.run(query)
    for record in result:
        print(record)

In [11]:
# Funció para obtener una lista de aritas del grafo en neo4j, retorna una lista de tuplas de la forma (nodo1, relación, nodo2)
def fetch_edges(tx):
    query = "MATCH (n1)-[r]->(n2) RETURN n1.id AS n1, type(r) AS r, n2.id AS n2"
    result = tx.run(query)
    edges = [(record["n1"], record["r"], record["n2"]) for record in result]
    return edges

# Función para ejecutar la obtención de las aristas, llama a la función fetch_edges(tx)
def get_edges(driver):
    with driver.session() as session:
        edges = session.execute_read(fetch_edges)
    return edges

In [102]:
# Código de prueba
# Buscmamos la lista de aristas en neo4j
try:
    edges = get_edges(driver)
    print(f"Fetched {len(edges)} edges from Neo4j.")
except Exception as e:
    print(f"An error occurred while fetching edges: {e}")

Fetched 12 edges from Neo4j.


In [15]:
B# Función para pasar la lista de aristas a una rdd en pyspark
def generte_rdd(edges):
    rdd = sc.parallelize(edges)
    return rdd

In [16]:
# Códig de prueba
edge_rdd = generte_rdd(edges)

In [21]:
# Hasher retorna los haches de los dos nodos pertenecientes a la arista "edge"
# La función de hash que usamos es módulo de b, siendo b el numero de reducers.
def hasher(edge,b):
  n1_hash = hash(edge[0]) % b
  n2_hash = hash(edge[2]) % b
  return n1_hash, n2_hash

#función de mapeo para la fase de distribución de aristas.
#si queremos mapear a patrones de ciclos mas grandes, ejemplo, cuadrados,
#pentagonos, etc. solo debemos ajustar el parametro de pattern_size.
def mapper(edge, b, pattern_size=3):
  b1, b2 = hasher(edge, b)
  keys = []
  #calculamos todas las permutaciones necesarias para las llaves
  perms = deque([b1, b2] + [None] * (pattern_size - 2))
  for i in range(pattern_size):
    perms.rotate(1)
    for j in range(b):
      key = tuple(j if x is None else x for x in perms)
      keys.append(key)

  for key in keys:
    yield (key, edge)


In [22]:
# Codigo de prueba
for edge in edges:
  for key in mapper(edge, 3, pattern_size=4):
    print(key)

((0, 1, 2, 0), (1, 'RELATIONSHIP_TYPE_11', 2))
((1, 1, 2, 1), (1, 'RELATIONSHIP_TYPE_11', 2))
((2, 1, 2, 2), (1, 'RELATIONSHIP_TYPE_11', 2))
((0, 0, 1, 2), (1, 'RELATIONSHIP_TYPE_11', 2))
((1, 1, 1, 2), (1, 'RELATIONSHIP_TYPE_11', 2))
((2, 2, 1, 2), (1, 'RELATIONSHIP_TYPE_11', 2))
((2, 0, 0, 1), (1, 'RELATIONSHIP_TYPE_11', 2))
((2, 1, 1, 1), (1, 'RELATIONSHIP_TYPE_11', 2))
((2, 2, 2, 1), (1, 'RELATIONSHIP_TYPE_11', 2))
((1, 2, 0, 0), (1, 'RELATIONSHIP_TYPE_11', 2))
((1, 2, 1, 1), (1, 'RELATIONSHIP_TYPE_11', 2))
((1, 2, 2, 2), (1, 'RELATIONSHIP_TYPE_11', 2))
((0, 1, 0, 0), (1, 'RELATIONSHIP_TYPE_11', 3))
((1, 1, 0, 1), (1, 'RELATIONSHIP_TYPE_11', 3))
((2, 1, 0, 2), (1, 'RELATIONSHIP_TYPE_11', 3))
((0, 0, 1, 0), (1, 'RELATIONSHIP_TYPE_11', 3))
((1, 1, 1, 0), (1, 'RELATIONSHIP_TYPE_11', 3))
((2, 2, 1, 0), (1, 'RELATIONSHIP_TYPE_11', 3))
((0, 0, 0, 1), (1, 'RELATIONSHIP_TYPE_11', 3))
((0, 1, 1, 1), (1, 'RELATIONSHIP_TYPE_11', 3))
((0, 2, 2, 1), (1, 'RELATIONSHIP_TYPE_11', 3))
((1, 0, 0, 0)

In [100]:
# Función de reducción estricta, tiene como input:
# - edges; una lista de tuplas de las aristas de la forma (nodo1, Rel, nodo2)
# - cycle_length; el largo del ciclo que estamos buscando (triangulo=3, cadrado=4, etc)
# - depth; profundidad de la recursión para la busqueda de caminos
# - rel_types; una lista de las relaciones que queremos que se cumplan en el camino, en orden
# - path; el camino que se lleva explorado en cada iteración
def strict_relationships(edges, cycle_length, depth, rel_types, path):
  #si llegamos a un profundidad de busqueda igual al largo del ciclo, paramos
  if depth == cycle_length-1:
    print(f'camino cerrado {path}')
    return path
  else:
    #para cada arista en edges, expandimos la busqueda, si el nodo de salida se
    #encuentra en el camino que llevamos, si ademas el nodo de llegada aún no
    #está en el camino que llevamos, y si el tipo de relación que se pide es el
    #indicado.
    for edge in edges: # iteramos directamente sobre las aristas
      if ((edge[0] in path) and (edge[2] not in path) and (rel_types[depth] == edge[1])) or (cycle_length==depth): # Access elements of 'edge' directly
        new_path = path.copy()
        new_path.append(edge[2])
        result = strict_relationships(edges, cycle_length, depth + 1, rel_types, new_path)
        #retornamos el resultado si y solo si el camino explorado no es vacío
        if result is not None:
          return result
  return []


# Función de reducción laxa, tiene como input:
# - edges; una lista de tuplas de las aristas de la forma (nodo1, Rel, nodo2)
# - cycle_length; el largo del ciclo que estamos buscando (triangulo=3, cadrado=4, etc)
# - depth; profundidad de la recursión para la busqueda de caminos
# - path; el camino que se lleva explorado en cada iteración
def lax_relationships(edges, cycle_length, depth, path):
  #si llegamos a un profundidad de busqueda igual al largo del ciclo, paramos
  if depth == cycle_length-1:
    print(f'camino cerrado {path}')
    return path
  else:
    #para cada arista en edges, expandimos la busqueda, si el nodo de salida se
    #encuentra en el camino que llevamos y ademas el nodo de llegada aún no
    #está en el camino que llevamos, no nos importa que tipo de relación tienen.

    for edge in edges: # Iteramos directamente sobre las arista
      if (edge[0] in path and edge[2] not in path) or (cycle_length==depth):
        new_path = path.copy()
        new_path.append(edge[2])
        result = lax_relationships(edges, cycle_length, depth + 1, new_path)
        #retornamos el resultado si y solo si el camino explorado no es vacío
        if result is not None:
          return result
  return []

# Funcion de reduccion general, es la que se llama al hacer el reduce mismo en spark
# - key; la llave de cada valor en la RDD.
# - values; valores de los datos para cada llave en la RDD.
# - cycle_length; el largo del ciclo que estamos buscando (triangulo=3, cadrado=4, etc).
# - strict_rules; booleano que define si se usa la funcion strict_relationships o lax_relationships.
# - rel_types; una lista de las relaciones que queremos que se cumplan en el camino, en orden
def generalized_reducer(key, values, cycle_length, strict_rels=False, rel_types=None):
  edges = list(values) # Aristas
  cycles = list() # ciclos encontrados


  if strict_rels: # Llamado a la funcion de reduccion estricta
  #comenzamos revisando que se pueda usar la función estricta
    if rel_types is None: # Revisamos que se le haya entregado una lista de relaciones
      raise ValueError("rel_types must be specified in strict mode")
    for rel_type in rel_types:
      if not isinstance(rel_type, str):
        raise ValueError("rel_types must contain only strings")
    if len(rel_types) != cycle_length: # Revisamos que la cantidad de relaciones sea la correcta.
      raise rel_types("rel_types must have the same length as the cycle length")
    path=[]
    cycles.extend(strict_relationships(edges, cycle_length, 0, rel_types, path=[edge[0]]) for edge in values)
  else: # Llamado a la función de reducción laxa
    path = []
    cycles.extend(lax_relationships(edges, cycle_length, 0, path=[edge[0]]) for edge in values)
  return cycles

In [42]:
#-------------------------------CÓDIGO DE PRUEBA---------------------------
pattern_size = 3  # Lasrgo del patrón que se desea buscar
b = 3  # Numero de reducers que se usarán

# Enviamos a cada nodo de la RDD las aristas correspondientes según la función de map definifa mas arriba
# Enviamos el tamaño del patrón para que se puedan generar llaves para todos los ordenes posibles del patrón.
mapped_rdd = edge_rdd.flatMap(lambda edge: mapper(edge, b, pattern_size))

# Aplicamos la función de reducción general
cycle_length = 3  # Largo del patrón que queremos buscar
strict_rels = True  # True: funcion de reduccion estricta, False: funcion de reducciómn laxa

# Lista de relaciones que buscamos igualar en la reducción.
# IMPORTANTE, el orden de las relaciones que se busca dee venir definido en esta lista.
rel_types = ["RELATIONSHIP_TYPE_11", "RELATIONSHIP_TYPE_11", "RELATIONSHIP_TYPE_11"]  # Relaciones que buscamos

# Aplicamos la reducción general
reduced_rdd = mapped_rdd.groupByKey().flatMap(lambda x: generalized_reducer(x[0], x[1], cycle_length, strict_rels=strict_rels, rel_types=rel_types))

# Recolectamos y mostramos los datos
result = reduced_rdd.collect()
result = [tuple(cycle) for cycle in result]

# Creamos un set de los resultados para eliminar duplicados
unique_cycles = set(result)
print(unique_cycles)
#-------------------------------CÓDIGO DE PRUEBA---------------------------

{(4, 3, 2), (1, 3, 2), (1, 2, 3), (1, 3, 4), (3, 4, 2), (2, 3, 4), (3, 4, 1), (4, 1, 3), (4, 2, 3), (), (4, 1, 2)}


### Hasta este punto definimos las funciones que ayudarán a la escritura efciente y clara de las funciones que efectivamente nos piden para la tarea

Funciones definidas:
- fetch_edges()
- get_edges()
- generate_rdd()
- hasher()
- mapper()
- strict_relationships()
- lax_relationhips()
- generalized_reducer()

# Pregunta 2.1

In [None]:
def neo4j_to_spark():
  try:
    edges = get_edges(driver) # Funcion para obtener las aristas de la forma (nodo1, Rel, nodo2)
    print(f"Fetched {len(edges)} edges from Neo4j.")
  except Exception as e:
    print(f"An error occurred while fetching edges: {e}")

  sc = SparkContext() # Spark Context
  edge_rdd = generte_rdd(edges) # Funcion que nos genera la RDD
  return edge_rdd # Retornamos la RDD

In [None]:
edge_rdd = neo4j_to_spark() # Llamamos al generador de la RDD

# Pregunta 2.2

In [104]:
# Función para buscar triangulos
# Suposiciones:
# - La conección con la BDD de neo4j ya está bien establecida.
# - Se entrega una lista ordenada con las relaciones que se buscan que conformen el triangulo.
# - Solo se puede buscar una combinación de relaciones a la vez.
def triangulos(rel_type, b=3):
  mapped_rdd = edge_rdd.flatMap(lambda edge: mapper(edge, b, 3)) # Mapeamos con los parametros pedidos en el enunciado (b=3, triangulos)

  # Aplicamos la función de reducción general
  cycle_length = 3  # Largo del patrón que queremos buscar, en este caso, triangulos
  strict_rels = True  # True: funcion de reduccion estricta

  rel_types = rel_type  # Relaciones que buscamos

  # Aplicamos la reducción general
  reduced_rdd = mapped_rdd.groupByKey().flatMap(lambda x: generalized_reducer(x[0], x[1], cycle_length, strict_rels=True, rel_types=rel_types))

  # Recolectamos y mostramos los datos
  result = reduced_rdd.collect()
  result = [tuple(cycle) for cycle in result]

  # Creamos un set de los resultados para eliminar duplicados
  unique_cycles = set(result)
  return unique_cycles # Retornamos el set de triangulos

In [None]:
triangulos = triangulos(rel_type=1)

# Pregunta 2.4

In [45]:
# Funcion para encontrar cuadrados
# Está construida bajo la suposición de que la definicion de M es que es un array
# de tres dimensiones donde las coordenadas M[x,y,z] representan si existe o no
# la conección de x e y mediante la relación z. Así fue como interprete la definición
# del enunciado, y lo discutido en el issue #31 del GitHub del curso.
# También, según lo que entendí del enunciado, para este punto (2.4) se recibirá
# una lista de variables, que suponemos es la lista ordenada de relaciones que
# queremos que se cumplan en el cuadrado.
def cuadrados(M, rels, b=4):

  nonzeros = M.nonzero()
  edges = []
  for dim in range(len(M.nonzero()[0])):
    x = nonzeros[0][dim]
    y = nonzeros[1][dim]
    z = nonzeros[2][dim]
    edges.append((x, y, z))

  mapped_rdd = edges.flatMap(lambda edge: mapper(edge, b, 4)) # Mapeamos con los parametros pedidos en el enunciado (b=4, cuadrados)

  cycle_length = 4  # Largo del patrón que queremos buscar, en este caso, cuadrados
  strict_rels = True  # True: funcion de reduccion estricta

  rel_types = rels  # Relaciones que buscamos

  # Aplicamos la reducción general
  reduced_rdd = mapped_rdd.groupByKey().flatMap(lambda x: generalized_reducer(x[0], x[1], cycle_length, strict_rels=True, rel_types=rel_types))

  # Recolectamos y mostramos los datos
  result = reduced_rdd.collect()
  result = [tuple(cycle) for cycle in result]

  # Creamos un set de los resultados para eliminar duplicados
  unique_cycles = set(result)
  return unique_cycles # Retornamos el set de cuadrados

[None, None, None]