De la documentacion de Neo4J aura DB tenemos que:

In [2]:
!pip install neo4j
!pip install pyspark

Collecting neo4j
  Downloading neo4j-5.21.0-py3-none-any.whl (286 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m286.8/286.8 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: neo4j
Successfully installed neo4j-5.21.0
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3b192fffeecc2207f2e5ad83ba1e13e817d7ee3f55a8301791c295413f91c16a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


Preparando el entorno de programación:

In [3]:
# Import the driver
from neo4j import GraphDatabase

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://85e8aff6.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = "BjfDCM2mLhBLRhzlS629LFnZzpVHUfDFs4jVBnnvjPE"

# Instantiate the driver
driver = GraphDatabase.driver(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD)
)

In [4]:
# Prueba de conexión
try:
    with driver.session() as session:
        result = session.run("RETURN 1")
        print("Conexión exitosa, credenciales correctas.")
except Exception as e:
    print(f"Error al conectar a Neo4j: {e}")

Conexión exitosa, credenciales correctas.


#1.1 Precalentamiento: triángulos

Punto 1

In [5]:
def create_graph_in_neo4j(driver, edge_list):
    with driver.session() as session:
        # Limpiar la base de datos eliminando todos los nodos y relaciones existentes
        session.run("MATCH (n) DETACH DELETE n")

        # Crear nodos y relaciones a partir de la lista de aristas
        for u, l, v in edge_list:
            session.run(
                "MERGE (a:Node {id: $u}) "
                "MERGE (b:Node {id: $v}) "
                "MERGE (a)-[:RELATION {label: $l}]->(b)",
                u=u, l=l, v=v
            )

In [6]:
# Lista de aristas
edge_list = [(1,11,2),(1,11,3),(2,11,3),(3,11,2),(3,11,4),(4,11,1),(4,11,2),(4,11,3),(4,12,5),(5,12,1),(5,12,2),(5,12,6)]

# Crear el grafo en Neo4j
create_graph_in_neo4j(driver, edge_list)

Punto 2

In [7]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [8]:
# Función para obtener aristas del grafo en Neo4j
def get_edges_from_neo4j(driver):
    query = """
    MATCH (a)-[r:RELATION]->(b)
    RETURN a.id AS src, r.label AS label, b.id AS dst
    """
    with driver.session() as session:
        result = session.run(query)
        edges = [(record['src'], record['label'], record['dst']) for record in result]
    return edges

In [9]:
# Obtener aristas del grafo
edges = get_edges_from_neo4j(driver)

In [10]:
# Inicializar Spark
spark = SparkSession.builder.appName("Neo4jToSpark").getOrCreate()
sc = spark.sparkContext

In [11]:
# Crear RDD a partir de las aristas
edges_rdd = sc.parallelize(edges)

In [12]:
# Mostrar el contenido del RDD
print(edges_rdd.collect())

[(5, 12, 6), (5, 12, 1), (5, 12, 2), (1, 11, 2), (1, 11, 3), (2, 11, 3), (3, 11, 2), (3, 11, 4), (4, 12, 5), (4, 11, 1), (4, 11, 2), (4, 11, 3)]


In [13]:
# Detener Spark
sc.stop()

Cada tupla en esta lista representa una arista en el grafo que se extrajo de Neo4j.

In [14]:
# Inicializar Spark
conf = SparkConf().setAppName("TriangleFinding").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [15]:
# Ejemplo de lista de aristas (solo con etiqueta 11)
edge_list = [(1, 11, 2), (1, 11, 3), (2, 11, 3), (3, 11, 2), (3, 11, 4),
         (4, 11, 1), (4, 11, 2), (4, 11, 3), (4, 12, 5), (5, 12, 1),
         (5, 12, 2), (5, 12, 6)]

In [16]:
# Parámetro b
b = 2  # Este valor se puede ajustar según el tamaño del clúster y el grafo

In [17]:
# Función mapper que genera combinaciones necesarias para formar triángulos
def mapper(arista):
    n1, R, n2 = arista
    b1 = n1 % b
    b2 = n2 % b
    results = []

    # Generar todas las combinaciones necesarias para formar triángulos
    keys = [
        (b1, b2, i) for i in range(b)
    ] + [
        (b2, i, b1) for i in range(b)
    ] + [
        (i, b1, b2) for i in range(b)
    ]

    for key in keys:
        results.append((key, (n1, R, n2)))

    return results

In [18]:
# Función reducer que busca triángulos en el grupo de aristas
def reducer(key, values):
    triangles = []
    value_list = list(values)

    # Buscar triángulos en el grupo de aristas
    for i in range(len(value_list)):
        n1, R, n2 = value_list[i]
        for j in range(i+1, len(value_list)):
            m1, S, m2 = value_list[j]
            if n2 == m1:
                for k in range(j+1, len(value_list)):
                    p1, T, p2 = value_list[k]
                    if m2 == p1 and p2 == n1:
                        triangles.append((n1, n2, m2))

    return triangles

In [19]:
# Crear RDD con las aristas
edges_rdd = sc.parallelize(edge_list)

In [20]:
# Aplicar el mapper para generar combinaciones de triángulos
mapped_edges = edges_rdd.flatMap(mapper)

In [21]:
# Agrupar por clave y reducir
grouped_edges = mapped_edges.groupByKey()
triangles = grouped_edges.flatMap(lambda x: reducer(x[0], x[1]))

In [22]:
# Obtener los triángulos encontrados
found_triangles = set(triangles.collect())

In [23]:
# Mostrar los triángulos encontrados
print("Triángulos encontrados:")
for triangle in found_triangles:
    print(triangle)

Triángulos encontrados:
(2, 3, 4)
(1, 3, 4)


In [24]:
# Detener Spark
sc.stop()

# Precalentamiento2: triángulos que usen cualquier arista

De manera similar a la parte **1.1**, pero ajustamos las funciones para que considere que la etiqueta puede ser cualquiera.

In [25]:
# Inicializar Spark
conf = SparkConf().setAppName("PatternFinding").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [26]:
# Función para leer la matriz M y obtener las aristas
def leer_matriz(M, A, L):
    aristas = []
    for i in range(len(M)):
        for j in range(len(M[i])):
            for k in range(len(M[i][j])):
                if M[i][j][k] == 1:
                    aristas.append((A[i], L[j], A[k]))
    return aristas

In [27]:
# Definir nodos (A) de manera simbólica y etiquetas de aristas (L)

A = [1, 2, 3, 4]
L = [11, 12]

In [28]:
# Definir la matriz M (como ejemplo)
M = [
    [
        [0, 1, 0, 0],  # x
        [0, 0, 0, 0],  # y
    ],
    [
        [0, 0, 0, 0],  # z
        [0, 0, 1, 0],  # w
    ],
    [
        [1, 0, 0, 0],  # x
        [0, 0, 0, 0],  # y
    ],
    [
        [0, 0, 0, 0],  # w
        [0, 0, 0, 0],  # x
    ]
]

In [29]:
# Leer la matriz M y obtener las aristas
aristas = leer_matriz(M, A, L)

In [30]:
# Crear RDD con las aristas
edges_rdd = sc.parallelize(aristas)

In [31]:
# Mostrar el contenido del RDD
print(edges_rdd.collect())

[(1, 11, 2), (2, 12, 3), (3, 11, 1)]


punto 3

In [32]:
# Función mapper que genera combinaciones necesarias para formar triángulos
def mapper(arista):
    n1, R, n2 = arista
    b1 = A.index(n1) % b
    b2 = A.index(n2) % b
    results = []

    # Generar todas las combinaciones necesarias para formar triángulos
    keys = [
        (b1, b2, i) for i in range(b)
    ] + [
        (b2, i, b1) for i in range(b)
    ] + [
        (i, b1, b2) for i in range(b)
    ]

    for key in keys:
        results.append((key, (n1, R, n2)))

    return results

In [33]:
# Función reducer que busca triángulos en el grupo de aristas
def reducer(key, values):
    triangles = []
    value_list = list(values)

    # Buscar triángulos en el grupo de aristas
    for i in range(len(value_list)):
        n1, R, n2 = value_list[i]
        for j in range(i+1, len(value_list)):
            m1, S, m2 = value_list[j]
            if n2 == m1:
                for k in range(j+1, len(value_list)):
                    p1, T, p2 = value_list[k]
                    if m2 == p1 and p2 == n1:
                        triangles.append((n1, n2, m2))

    return triangles

In [34]:
# Parámetro b
b = 2  # Este valor se puede ajustar según el tamaño del clúster y el grafo

In [35]:
# Aplicar el mapper para generar combinaciones de triángulos
mapped_edges = edges_rdd.flatMap(mapper)

In [36]:
# Agrupar por clave y reducir
grouped_edges = mapped_edges.groupByKey()
triangles = grouped_edges.flatMap(lambda x: reducer(x[0], x[1]))

In [37]:
# Obtener los triángulos encontrados
found_triangles = set(triangles.collect())

In [38]:
# Mostrar los triángulos encontrados
print("Triángulos encontrados:")
for triangle in found_triangles:
    print(triangle)

Triángulos encontrados:
(1, 2, 3)


In [39]:
# Detener Spark
sc.stop()

#3. Busqueda de patrones de 4 variables

In [40]:
# Convertimos la matriz M en una lista de aristas
def matriz_a_aristas(M, A, L):
    aristas = []
    for i, a1 in enumerate(A):
        for j, a2 in enumerate(A):
            for k, etiqueta in enumerate(L):
                if M[i][k][j] == 1:
                    aristas.append((a1, etiqueta, a2))
    return aristas

In [41]:
# La funcion mapper cambia, para patrones de 4 variables

b = 2  # definimos b como 2, pero puede ser ajustado

def mapper(arista):
    n1, R, n2 = arista
    b1 = n1 % b
    b2 = n2 % b
    results = []

    # Generar todas las combinaciones necesarias para formar patrones
    keys = [
        (b1, b2, 0, 0), (b1, b2, 1, 0),
	      (b1, b2, 0, 1), (b1, b2, 1, 1),
        (0, b1, b2, 0), (1, b1, b2, 0),
        (0, b1, b2, 1), (1, b1, b2, 1),
        (b2, 0, 0, b1), (b2, 1, 0, b1),
        (b2, 0, 1, b1), (b2, 1, 1, b1),

    ]


    for key in keys:
        results.append((key, (n1, R, n2)))

    return results

In [42]:
# Estas dos funciones encuentran los conjuntos de nodos que cumplen con el patron dado como cunsulta.
# Toman como input el conjunto de aristas que comparten el mismo reducer.

def combinar(aristas, patron, etiquetas_permitidas):
    posibles_patrones = []

    # Generar todas las combinaciones de aristas posibles
    for primera_arista in aristas:
        for segunda_arista in aristas:
            if segunda_arista == primera_arista:
                continue
            for tercera_arista in aristas:
                if tercera_arista in [primera_arista, segunda_arista]:
                    continue
                for cuarta_arista in aristas:
                    if cuarta_arista in [primera_arista, segunda_arista, tercera_arista]:
                        continue
                    # Verificar si estas 4 aristas cumplen el patrón
                    if cumple_patron([primera_arista, segunda_arista, tercera_arista, cuarta_arista], patron, etiquetas_permitidas):
                        posibles_patrones.append((primera_arista[0], primera_arista[2], segunda_arista[2], tercera_arista[2]))

    return posibles_patrones

def cumple_patron(aristas, patron, etiquetas_permitidas):
    mapeo = {}
    #for arista, (x, y) in zip(aristas, patron):
    for arista, (x, _, y) in zip(aristas, patron):
        if arista[0] not in mapeo:
            mapeo[arista[0]] = x
        if arista[2] not in mapeo:
            mapeo[arista[2]] = y
        if arista[1] not in etiquetas_permitidas:
            return False
        if mapeo[arista[0]] != x or mapeo[arista[2]] != y:
            return False
    return True

In [45]:
def main_ejecutar_algoritmo(aristas, patron, etiquetas_permitidas):
  sc = SparkContext("local", "PatronCuatroNodos")

  # Convertimos las aristas extraidas de M a un RDD
  aristas_rdd = sc.parallelize(aristas)

  # Generamos las combinaciones necesarias y verificamos patrones
  grouped_edges = aristas_rdd.flatMap(mapper).groupByKey().mapValues(list)
  # grouped_edges consiste en las aristas mapeadas y agrupadas segun reducer

  patrones_rdd_4 = grouped_edges.flatMap(lambda x: combinar(x[1], patron_4, L))

  return set(patrones_rdd_4.collect())


In [46]:
# Vamos a definir datos que funciones como ejemplo:

# Datos recibidos como input
A = [1, 2, 3, 4]
L = [11, 12]
M = [
    [[0,0,0,1],
     [0,0,0,0]],  # desde 1
    [[1,0,0,0],
     [0,0,1,0]],  # desde 2
    [[0,1,0,0],
     [0,1,0,0]],  # desde 3
    [[0,0,1,0],
     [0,0,0,0]]   # desde 4
]

patron_4 = [('x', 'u', 'y'), ('y', 'u', 'z'), ('z', 'u', 'w'), ('w', 'u', 'x')]


aristas = matriz_a_aristas(M, A, L)     #funcion definida para extraer aristas de M


# Ahora ejucutamos el algoritmo
resultados = main_ejecutar_algoritmo(aristas, patron_4, L)
print(resultados)
sc.stop()



{(1, 4, 3, 2), (2, 1, 4, 3), (3, 2, 1, 4), (4, 3, 2, 1)}
