In [1]:
!pip install pyspark
!pip install neo4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=badff95065aecf6f66f43954ef33e6e1fb0228c8f976ecbb5e98b0fffabda67d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting neo4j
  Downloading neo4j-5.21.0-py3-none-any.whl (286 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m286.8/286.8 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: neo4j
Successfully installed neo4j-5.21.0


In [2]:
from neo4j import GraphDatabase
from pyspark import SparkContext
import numpy as np
import random

In [None]:
# Generate a larger random test graph.
def generar_grafo_grande(num_nodos, num_aristas):
    """Build a random directed, labelled edge list without self-loops.

    Args:
        num_nodos: node ids are drawn uniformly from 1..num_nodos (inclusive).
        num_aristas: number of edges to generate.

    Returns:
        A list of exactly `num_aristas` dicts ``{'n1', 'label', 'n2'}`` with
        ``n1 != n2`` and ``label`` in {11, 12}. Duplicate edges are allowed.
        An empty list is returned when ``num_nodos == 1`` because every
        candidate edge would be a self-loop.
    """
    # With a single node no valid edge exists; bail out instead of looping forever.
    if num_nodos == 1:
        return []

    grafo_grande = []
    # Retry on self-loops so the caller really gets `num_aristas` edges; a
    # fixed-count loop would silently return fewer whenever n1 == n2 is drawn.
    while len(grafo_grande) < num_aristas:
        n1 = random.randint(1, num_nodos)
        n2 = random.randint(1, num_nodos)
        if n1 == n2:
            continue
        relacion = random.choice([11, 12])
        grafo_grande.append({'n1': n1, 'label': relacion, 'n2': n2})
    return grafo_grande


# Generate a sample graph (25 nodes, 60 requested edges) and display it.
grafo_grande = generar_grafo_grande(25, 60)
grafo_grande

In [None]:
# Delete every node and relationship stored in Neo4j.
def delete_all_nodes(uri, user, password):
    """Remove all nodes and relationships from the Neo4j database at `uri`.

    Args:
        uri: Bolt URI of the Neo4j server.
        user: user name for authentication.
        password: password for authentication.

    Prints a confirmation message once the delete query has completed.
    """
    # The context manager closes the driver even if the query fails, so the
    # connection is not left open after the call.
    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        with driver.session() as session:
            # consume() waits for the query to finish and surfaces any server
            # error before the success message is printed.
            session.run("MATCH (n) DETACH DELETE n").consume()
            print("All nodes and relationships have been deleted.")

# Wipe the local database before loading the example graph.
# NOTE(review): the connection credentials are hard-coded here; prefer reading them
# from environment variables or getpass so they are not stored in the notebook.
delete_all_nodes("bolt://localhost:7687", "neo4j", "123456789")

#### Pregunta 1

In [None]:
# Spark configuration: local-mode context, used as the default `sc` of load_and_get_rdd.
# NOTE(review): re-running this cell while a context is active presumably fails, since
# Spark allows a single active SparkContext — confirm. A later cell rebinds `sc` from
# a SparkSession.
sc = SparkContext("local", "Neo4j to RDD")

def load_and_get_rdd(uri, user, password, nodes, edges, sc=sc):
    """Load a graph into Neo4j, read its edges back and parallelize them with Spark.

    Args:
        uri, user, password: Neo4j connection settings.
        nodes: iterable of dicts with an 'id' key; each becomes a (:Node {id}) node.
        edges: iterable of dicts with 'n1', 'label' and 'n2' keys; 'label' is used
            as the relationship type between the two nodes.
        sc: SparkContext used to build the RDD (defaults to the module-level one).

    Returns:
        The list collected from the RDD: (n1, label, n2) tuples where n1/n2 are
        the `id` properties of the end nodes and label is the value of Cypher
        `type(r)`. Earlier revisions returned Neo4j's internal node id plus one,
        which only matches the `id` property on a freshly created database.
    """

    def escapar_tipo(label):
        # Relationship types cannot be passed as Cypher parameters, so the label
        # is interpolated; doubling backticks keeps it inside the quoted name.
        return str(label).replace("`", "``")

    # Create each node unless a node with the same id already exists.
    def load_nodes(session, nodes):
        for node in nodes:
            query = "MATCH (n:Node {id: $id}) RETURN n"
            result = session.run(query, id=node['id'])
            if result.single():
                print(f"Node {node['id']} already exists.")
            else:
                query = "MERGE (n:Node {id: $id})"
                session.run(query, id=node['id'])
                print(f"Node {node['id']} created.")

    # Create each relationship unless it already exists.
    def load_edges(session, edges):
        for edge in edges:
            tipo = escapar_tipo(edge['label'])
            query = f"""
            MATCH (n1:Node {{id: $n1}})-[r:`{tipo}`]->(n2:Node {{id: $n2}}) RETURN r
            """
            result = session.run(query, n1=edge['n1'], n2=edge['n2'])
            if result.single():
                print(f"Relationship {edge['n1']} -[{edge['label']}]-> {edge['n2']} already exists.")
            else:
                query = f"""
                MATCH (n1:Node {{id: $n1}})
                MATCH (n2:Node {{id: $n2}})
                MERGE (n1)-[:`{tipo}`]->(n2)
                """
                session.run(query, n1=edge['n1'], n2=edge['n2'])
                print(f"Relationship {edge['n1']} -[{edge['label']}]-> {edge['n2']} created.")

    # Read every relationship back as an (n1, label, n2) tuple.
    def get_edges_from_neo4j(session):
        result = session.run(
            "MATCH (n1)-[r]->(n2) "
            "RETURN n1.id AS n1, type(r) AS label, n2.id AS n2"
        )
        return [(record["n1"], record["label"], record["n2"]) for record in result]

    # A single driver serves both phases and is always closed on exit.
    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        # Load the data into Neo4j.
        with driver.session() as session:
            load_nodes(session, nodes)
            load_edges(session, edges)

        # Fetch the stored edges in a fresh session.
        with driver.session() as session:
            aristas_neo4j = get_edges_from_neo4j(session)

    # Build the RDD and collect it once; the same list is printed and returned.
    edges_rdd = sc.parallelize(aristas_neo4j)
    recolectadas = edges_rdd.collect()
    print(recolectadas)
    return recolectadas


In [None]:
# Example data (small graph): six nodes and twelve labelled edges, in the dict
# format consumed by load_and_get_rdd.
nodes = [{'id': i} for i in range(1, 7)]
edges = [
    {'n1': 1, 'label': 11, 'n2': 2}, {'n1': 1, 'label': 11, 'n2': 3},
    {'n1': 2, 'label': 11, 'n2': 3}, {'n1': 3, 'label': 11, 'n2': 2},
    {'n1': 3, 'label': 11, 'n2': 4}, {'n1': 4, 'label': 11, 'n2': 1},
    {'n1': 4, 'label': 11, 'n2': 2}, {'n1': 4, 'label': 11, 'n2': 3},
    {'n1': 4, 'label': 12, 'n2': 5}, {'n1': 5, 'label': 12, 'n2': 1},
    {'n1': 5, 'label': 12, 'n2': 2}, {'n1': 5, 'label': 12, 'n2': 6}
]

In [3]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession, or create one with default settings.
spark = SparkSession.builder \
    .getOrCreate()

# Rebind the global `sc` that patron_triangulos and patron_4 read.
sc = spark.sparkContext
sc

In [4]:
#Funciones

def hash(a, b):
  """Return the bucket of `a` among `b` buckets, i.e. ``a % b``.

  Note: this name shadows Python's built-in ``hash`` within the notebook.
  """
  cubeta = a % b
  return cubeta

def ordenar(x):
    """Return the canonical rotation of `x` as a tuple.

    Among all rotations of the sequence that start at an occurrence of its
    smallest element, the lexicographically smallest one is returned. Raises
    ValueError (from ``min``) when `x` is empty.
    """
    elementos = list(x)
    minimo = min(elementos)
    candidatas = []
    for posicion, valor in enumerate(elementos):
        if valor == minimo:
            # Rotate so that this occurrence of the minimum comes first.
            candidatas.append(elementos[posicion:] + elementos[:posicion])
    return tuple(min(candidatas))

def crear_grafo(x, y):
  """Reducer that merges two values into a single list.

  Each argument is either a list (an accumulated group) or any other value
  (treated as one item). Two lists are concatenated into a new list; a list
  and an item yield the same list object with the item appended in place;
  two items yield ``[x, y]``.
  """
  x_es_lista = type(x) == list
  y_es_lista = type(y) == list
  if x_es_lista and y_es_lista:
    return x + y
  if x_es_lista:
    x.append(y)
    return x
  if y_es_lista:
    y.append(x)
    return y
  return [x, y]



def obtener_triangulos(x):
  """Find directed triangles in a list of edges.

  Args:
    x: sequence of edges indexed as ``(source, label, target)``; only
      positions 0 and 2 are read.

  Returns:
    A list of 3-tuples ``(a, b, c)`` such that the edges a->b, b->c and c->a
    are all present. Edges are combined by increasing list position (i < j < k),
    so each triple of edges is examined once.
  """
  aristas = x
  largo = len(aristas)
  completos = []
  for i in range(largo):
    origen, destino = aristas[i][0], aristas[i][2]
    for j in range(i + 1, largo):
      if origen == aristas[j][2] and destino != aristas[j][0]:
        # Edge j enters the source of edge i: path j.source -> origen -> destino.
        candidato = (aristas[j][0], origen, destino)
      elif destino == aristas[j][0] and origen != aristas[j][2]:
        # Edge j leaves the target of edge i: path origen -> destino -> j.target.
        candidato = (origen, destino, aristas[j][2])
      else:
        continue
      # Look for the closing edge for BOTH kinds of path. Previously this search
      # only ran for the second kind, so the result depended on edge order.
      for k in range(j + 1, largo):
        if aristas[k][0] == candidato[2] and aristas[k][2] == candidato[0]:
          completos.append(candidato)
          break
  return completos





In [5]:
# Find directed triangles in an edge list with Spark.
def patron_triangulos(grafo, b):
  """Return the triangles found in `grafo` using `b` hash buckets.

  Every edge (n1, label, n2) is replicated once per bucket i under the key
  ordenar((hash(n1, b), hash(n2, b), i)). Edges sharing a key are merged into
  a mini-graph with crear_grafo, groups with fewer than three edges are
  dropped, and obtener_triangulos is run on each remaining group. Uses the
  global SparkContext `sc`.
  """
  def replicar(arista):
    cubeta_origen = hash(arista[0], b)
    cubeta_destino = hash(arista[2], b)
    return [(ordenar((cubeta_origen, cubeta_destino, i)), arista) for i in range(b)]

  def es_candidato(par):
    # A key that received a single edge keeps the bare tuple, not a list.
    return type(par[1]) == list and len(par[1]) >= 3

  return (
      sc.parallelize(grafo)
      .flatMap(replicar)
      .reduceByKey(crear_grafo)
      .filter(es_candidato)
      .mapValues(obtener_triangulos)
      .flatMap(lambda par: par[1])
      .collect()
  )

In [None]:
# Usage example: triangles of a four-node graph, using 4 hash buckets.
grafo_1 = [(1,11,2),(1,11,3),(2,11,3),(3,11,2),(3,11,4),(4,11,1),(4,11,2),(4,11,3)]
respuesta = patron_triangulos(grafo_1, 4)
respuesta


[(3, 4, 2), (3, 4, 1)]

Parte 2

In [6]:
#funciones 2
def matriz_a_relaciones(subgrafo, A, L):
  """Decode a pattern tensor into a list of (variable, label, variable) triples.

  ``subgrafo[i][j][k] == 1`` means the pattern has an edge from variable
  ``A[i]`` to variable ``A[k]`` with label ``L[j]``. Triples are returned in
  increasing (i, j, k) order.
  """
  return [
      (A[i], L[j], A[k])
      for i, por_etiqueta in enumerate(subgrafo)
      for j, fila in enumerate(por_etiqueta)
      for k, celda in enumerate(fila)
      if celda == 1
  ]

def relaciones_patron(patron):
  """Return the set of labels (element 1 of each triple) used by `patron`."""
  return {arista[1] for arista in patron}

In [7]:
#funciones 3
def crear_patron(aristas, R, max_nodos=4):
  """Encode a sequence of edges as a pattern tensor.

  Nodes are numbered 0, 1, 2, ... in order of first appearance while walking
  the edges (source before target).

  Args:
    aristas: sequence of edges indexed as (source, label, target).
    R: list of labels; a label's position is its index in the tensor.
      A label missing from R raises ValueError (from ``list.index``).
    max_nodos: maximum number of distinct nodes; also the tensor's node
      dimension. Defaults to 4, the size used by the rest of the notebook.

  Returns:
    ``(tensor, nodos)`` where ``tensor[a][r][b] == 1`` iff an edge goes from
    node number a to node number b with label index r, and ``nodos`` is the
    tuple of node ids in numbering order. ``(None, None)`` is returned as
    soon as the edges involve more than `max_nodos` distinct nodes.
  """
  dicc_nodos = {}
  patron_encontrado = [
      [[0 for _ in range(max_nodos)] for _ in range(len(R))]
      for _ in range(max_nodos)
  ]
  for arista in aristas:
    origen, etiqueta, destino = arista[0], arista[1], arista[2]
    if origen not in dicc_nodos:
      dicc_nodos[origen] = len(dicc_nodos)
    indice_relacion = R.index(etiqueta)
    if destino not in dicc_nodos:
      dicc_nodos[destino] = len(dicc_nodos)
    # Check before writing: an extra node would index outside the tensor.
    if len(dicc_nodos) > max_nodos:
      return None, None
    patron_encontrado[dicc_nodos[origen]][indice_relacion][dicc_nodos[destino]] = 1
  return patron_encontrado, tuple(dicc_nodos)



def encontrar_patron(mini_grafo, patron, R):
  """Find the occurrences of a 4-edge pattern inside a mini-graph.

  Every ordered selection of four distinct edges is encoded with crear_patron
  and compared with `patron`; on a match the matched node tuple is normalised
  with ordenar.

  Args:
    mini_grafo: list of edges indexed as (source, label, target).
    patron: pattern tensor in the format produced by crear_patron.
    R: list of labels, passed through to crear_patron.

  Returns:
    List of distinct normalised node tuples, in order of first discovery.
  """
  from itertools import permutations  # local import keeps the function self-contained

  # Order-preserving de-duplication, so that permutations() below visits exactly
  # the ordered 4-tuples of pairwise different edges, in nested-loop order.
  aristas_unicas = []
  for arista in mini_grafo:
    if arista not in aristas_unicas:
      aristas_unicas.append(arista)

  resultados = []
  vistos = set()  # constant-time membership test for already reported matches
  for seleccion in permutations(aristas_unicas, 4):
    patron_encontrado, nodos = crear_patron(list(seleccion), R)
    if patron_encontrado == patron:
      nodos = ordenar(nodos)
      if nodos not in vistos:
        vistos.add(nodos)
        resultados.append(nodos)
  return resultados





In [8]:
#funcion para encontrar patrones de 4 aristas y 4 variables
import itertools
def patron_4(grafo, b, A, L, subgrafo):
  """Find the occurrences of a pattern with four edges and four variables.

  Args:
    grafo: list of edges as (source, label, target) tuples.
    b: number of hash buckets per pattern variable.
    A: list of the four variable names.
    L: list of labels; fixes the label axis of `subgrafo`.
    subgrafo: pattern tensor, ``subgrafo[i][j][k] == 1`` meaning an edge from
      A[i] to A[k] with label L[j].

  Returns:
    Set of node tuples as produced by encontrar_patron. Uses the global
    SparkContext `sc`.
  """
  patron = matriz_a_relaciones(subgrafo, A, L)
  relaciones = relaciones_patron(patron)

  def replicar(arista):
    # NOTE(review): emitting all 24 orderings of every bucket tuple replicates each
    # mini-graph under every permutation of its key; a single canonical (sorted)
    # key looks sufficient — confirm before changing.
    cubeta_origen = hash(arista[0], b)
    cubeta_destino = hash(arista[2], b)
    return [
        (perm, arista)
        for i in range(b) for j in range(b)
        for perm in itertools.permutations((cubeta_origen, cubeta_destino, i, j))
    ]

  def aristas_unicas(valor):
    # A key that received a single edge still holds the bare edge tuple (the
    # reducer never ran). Wrap it so set() de-duplicates edges instead of
    # exploding one edge into its fields.
    aristas = valor if type(valor) == list else [valor]
    return list(set(aristas))

  mini_grafos = (
      sc.parallelize(grafo)
      .filter(lambda arista: arista[1] in relaciones)  # only labels the pattern uses
      .flatMap(replicar)
      .reduceByKey(crear_grafo)
      .mapValues(aristas_unicas)
      .filter(lambda par: len(par[1]) >= 4)  # a 4-edge pattern needs at least 4 edges
  )
  return set(mini_grafos.flatMap(lambda par: encontrar_patron(par[1], subgrafo, L)).collect())



In [9]:
# Test data: a small graph as (source, label, target) tuples with string labels.
grafo_2 = [(1,"11",2),(1,"11",3),(2,"11",3),(3,"11",2),(3,"11",4),(4,"11",1),(4,"11",2),(4,"11",3),(4,"12",5),(5,"12",1),(5,"12",2),(5,"12",6)]

# Pattern variables (A) and labels (L). In a pattern tensor, entry [i][j][k] == 1
# means an edge from A[i] to A[k] with label L[j].
A = ["x", "y", "z", "w"]
L = ["11", "12"]
subgrafo_1 = [[[0, 1 , 0 , 0], [0, 0, 0, 0]], [[0, 0, 1, 0], [0, 0, 0, 0]], [[0, 0, 0, 1], [0, 0, 0, 0]], [[1, 0, 0, 0], [0, 0, 0, 0]]] #(x,11,y), (y,11,z), (z,11,w), (w,11,x)
subgrafo_2 = [[[0, 1 , 0 , 0], [0, 0, 0, 0]], [[0, 0, 1, 0], [0, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 1]], [[0, 0, 0, 0], [1, 0, 0, 0]]] #(x,11,y), (y,11,z), (z,12,w), (w,12,x)





De aquí en adelante se muestra, paso a paso, lo que hace la función `patron_4`.

In [10]:
# Step 1: decode the pattern tensor into (variable, label, variable) triples.
patron = matriz_a_relaciones(subgrafo_1, A, L)
patron

[('x', '11', 'y'), ('y', '11', 'z'), ('z', '11', 'w'), ('w', '11', 'x')]

In [11]:
# Step 2: collect the labels that the pattern actually uses.
relaciones = relaciones_patron(patron)
relaciones

{'11'}

In [12]:
# Step 3: keep only the edges whose label appears in the pattern.
rdd = sc.parallelize(grafo_2)
datos_filtrados = rdd.filter(lambda x: x[1] in relaciones)
datos_filtrados.collect()


[(1, '11', 2),
 (1, '11', 3),
 (2, '11', 3),
 (3, '11', 2),
 (3, '11', 4),
 (4, '11', 1),
 (4, '11', 2),
 (4, '11', 3)]

In [13]:
# Step 4: number of hash buckets.
b = 4

# Replicate each edge under every ordering of (bucket(source), bucket(target), i, j)
# for all bucket pairs (i, j).
datos_mapeados = datos_filtrados.flatMap(lambda x: [
    ((perm[0], perm[1], perm[2], perm[3]), x)
    for i in range(b) for j in range(b)
    for perm in itertools.permutations((hash(x[0], b), hash(x[2], b), i, j))
])

In [14]:
# Step 5: merge the edges that share a key into one mini-graph per key
# (repeated edges are still present at this point).
mini_grafos = datos_mapeados.reduceByKey(lambda x, y: crear_grafo(x, y))
mini_grafos.collect()

[((1, 2, 0, 1),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 2),
   (4, '11', 2)]),
 ((1, 2, 1, 0),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 2),
   (4, '11', 2)]),
 ((1, 0, 2, 1),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 2),
   (4, '11', 2)]),
 ((1, 0, 1, 2),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 2),
   (4, '11', 2)]),
 ((1, 1, 2, 0),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (1, '11', 2),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 1),
   (4, '11', 2),
   (4, '11', 2)]),
 ((1, 1, 0, 2),
  [(1, '11', 2),
   (1, '11', 2),
   (1, '11

In [15]:
# Step 6: remove repeated edges inside each mini-graph.
mini_grafos_filtrados = mini_grafos.map(lambda x: (x[0], list(set(x[1]))))
mini_grafos_filtrados.collect()

[((1, 2, 0, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 2, 1, 0), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 0, 2, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 0, 1, 2), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 1, 2, 0), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 1, 0, 2), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((2, 1, 0, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((2, 1, 1, 0), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((2, 0, 1, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((0, 1, 2, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((0, 1, 1, 2), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((0, 2, 1, 1), [(4, '11', 1), (4, '11', 2), (1, '11', 2)]),
 ((1, 2, 0, 3),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 2, 3, 0),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 

In [16]:
# Step 7: keep only the mini-graphs with at least four edges.
mini_grafos_filtrados_2 = mini_grafos_filtrados.filter(lambda x: type(x[1]) == list and len(x[1]) >= 4)
mini_grafos_filtrados_2.collect()

[((1, 2, 0, 3),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 2, 3, 0),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 0, 2, 3),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 0, 3, 2),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 3, 2, 0),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((1, 3, 0, 2),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '11', 2),
   (2, '11', 3),
   (1, '11', 2),
   (3, '11', 4),
   (4, '11', 1)]),
 ((2, 1, 0, 3),
  [(3, '11', 2),
   (1, '11', 3),
   (4, '11', 3),
   (4, '1

In [17]:
# Step 8: search the pattern in the size-filtered mini-graphs, as patron_4 does
# (the unfiltered RDD also holds groups that are too small to match).
set(mini_grafos_filtrados_2.flatMap(lambda x: encontrar_patron(x[1], subgrafo_1, L)).collect())

{(1, 2, 3, 4)}

In [18]:
# Run the function itself and check that it reaches the same result.
respuesta = patron_4(grafo_2, 4, A, L, subgrafo_1)
respuesta

{(1, 2, 3, 4)}

In [19]:
# Try the second pattern.
respuesta = patron_4(grafo_2, 4, A, L, subgrafo_2)
respuesta

{(1, 3, 4, 5), (2, 3, 4, 5)}

Ahora lo probaremos con un grafo más grande

In [None]:
# Larger test graph as (source, label, target) tuples with string labels.
# NOTE(review): this rebinds `edges`, which an earlier cell defines as a list of dicts.
edges = [(16, '11', 17),
 (5, '12', 18),
 (17, '12', 19),
 (22, '12', 7),
 (23, '11', 5),
 (24, '11', 19),
 (7, '12', 18),
 (5, '12', 3),
 (7, '12', 20),
 (18, '12', 21),
 (7, '12', 19),
 (4, '11', 12),
 (24, '12', 12),
 (21, '12', 7),
 (25, '11', 2),
 (19, '11', 25),
 (8, '12', 4),
 (1, '11', 23),
 (1, '12', 6),
 (7, '11', 1),
 (21, '12', 15),
 (14, '11', 7),
 (3, '12', 8),
 (5, '11', 9),
 (19, '12', 6),
 (10, '11', 24),
 (25, '11', 12),
 (14, '12', 9),
 (25, '12', 10),
 (15, '11', 19),
 (21, '11', 19),
 (17, '11', 24),
 (16, '12', 1),
 (11, '12', 23),
 (19, '11', 16),
 (22, '11', 7),
 (24, '11', 9),
 (17, '11', 12),
 (8, '12', 5),
 (2, '12', 23),
 (17, '12', 21),
 (25, '12', 9),
 (20, '12', 21),
 (8, '12', 2),
 (4, '12', 8),
 (22, '12', 1),
 (17, '12', 23),
 (12, '12', 16),
 (14, '12', 1),
 (6, '12', 7),
 (21, '11', 14),
 (10, '11', 2),
 (13, '11', 23),
 (7, '12', 25),
 (24, '12', 13)]

In [None]:
# Third pattern: a 3-cycle x -> y -> z -> x plus an extra edge x -> w, all with label "12".
subgrafo_3 = [[[0, 0 , 0 , 0], [0, 1, 0, 1]], [[0, 0, 0, 0], [0, 0, 1, 0]], [[0, 0, 0, 0], [1, 0, 0, 0]], [[0, 0, 0, 0], [0, 0, 0, 0]]] #(x, 12, y), (y, 12, z), (z, 12, x), (x, 12, w)
A = ["x", "y", "z", "w"]
L = ["11", "12"]

Nuevamente el paso a paso

In [None]:
# Step 1: decode the pattern tensor into (variable, label, variable) triples.
patron = matriz_a_relaciones(subgrafo_3, A, L)
patron

[('x', '12', 'y'), ('x', '12', 'w'), ('y', '12', 'z'), ('z', '12', 'x')]

In [None]:
# Step 2: collect the labels that the pattern actually uses.
relaciones = relaciones_patron(patron)
relaciones

{'12'}

In [None]:
# Step 3: keep only the edges whose label appears in the pattern.
rdd = sc.parallelize(edges)
datos_filtrados = rdd.filter(lambda x: x[1] in relaciones)
datos_filtrados.collect()

[(5, '12', 18),
 (17, '12', 19),
 (22, '12', 7),
 (7, '12', 18),
 (5, '12', 3),
 (7, '12', 20),
 (18, '12', 21),
 (7, '12', 19),
 (24, '12', 12),
 (21, '12', 7),
 (8, '12', 4),
 (1, '12', 6),
 (21, '12', 15),
 (3, '12', 8),
 (19, '12', 6),
 (14, '12', 9),
 (25, '12', 10),
 (16, '12', 1),
 (11, '12', 23),
 (8, '12', 5),
 (2, '12', 23),
 (17, '12', 21),
 (25, '12', 9),
 (20, '12', 21),
 (8, '12', 2),
 (4, '12', 8),
 (22, '12', 1),
 (17, '12', 23),
 (12, '12', 16),
 (14, '12', 1),
 (6, '12', 7),
 (7, '12', 25),
 (24, '12', 13)]

In [None]:
# Step 4: replicate each edge under every ordering of
# (bucket(source), bucket(target), i, j) for all bucket pairs (i, j).
b = 25
datos_mapeados = datos_filtrados.flatMap(lambda x: [
    ((perm[0], perm[1], perm[2], perm[3]), x)
    for i in range(b) for j in range(b)
    for perm in itertools.permutations((hash(x[0], b), hash(x[2], b), i, j))
])
# Each edge yields b * b * 24 records, so display a sample instead of
# collecting the whole RDD on the driver.
datos_mapeados.take(30)

[((5, 18, 0, 0), (5, '12', 18)),
 ((5, 18, 0, 0), (5, '12', 18)),
 ((5, 0, 18, 0), (5, '12', 18)),
 ((5, 0, 0, 18), (5, '12', 18)),
 ((5, 0, 18, 0), (5, '12', 18)),
 ((5, 0, 0, 18), (5, '12', 18)),
 ((18, 5, 0, 0), (5, '12', 18)),
 ((18, 5, 0, 0), (5, '12', 18)),
 ((18, 0, 5, 0), (5, '12', 18)),
 ((18, 0, 0, 5), (5, '12', 18)),
 ((18, 0, 5, 0), (5, '12', 18)),
 ((18, 0, 0, 5), (5, '12', 18)),
 ((0, 5, 18, 0), (5, '12', 18)),
 ((0, 5, 0, 18), (5, '12', 18)),
 ((0, 18, 5, 0), (5, '12', 18)),
 ((0, 18, 0, 5), (5, '12', 18)),
 ((0, 0, 5, 18), (5, '12', 18)),
 ((0, 0, 18, 5), (5, '12', 18)),
 ((0, 5, 18, 0), (5, '12', 18)),
 ((0, 5, 0, 18), (5, '12', 18)),
 ((0, 18, 5, 0), (5, '12', 18)),
 ((0, 18, 0, 5), (5, '12', 18)),
 ((0, 0, 5, 18), (5, '12', 18)),
 ((0, 0, 18, 5), (5, '12', 18)),
 ((5, 18, 0, 1), (5, '12', 18)),
 ((5, 18, 1, 0), (5, '12', 18)),
 ((5, 0, 18, 1), (5, '12', 18)),
 ((5, 0, 1, 18), (5, '12', 18)),
 ((5, 1, 18, 0), (5, '12', 18)),
 ((5, 1, 0, 18), (5, '12', 18)),
 ((18, 5, 

In [None]:
# Step 5: merge the edges that share a key into one mini-graph per key.
mini_grafos = datos_mapeados.reduceByKey(lambda x, y: crear_grafo(x, y))
# There is one entry per occupied key; display a sample rather than collecting all of them.
mini_grafos.take(20)

[((5, 18, 0, 1), [(5, '12', 18), (5, '12', 18)]),
 ((5, 18, 1, 0), [(5, '12', 18), (5, '12', 18)]),
 ((5, 0, 18, 1), [(5, '12', 18), (5, '12', 18)]),
 ((5, 0, 1, 18), [(5, '12', 18), (5, '12', 18)]),
 ((5, 1, 18, 0), [(5, '12', 18), (5, '12', 18)]),
 ((5, 1, 0, 18), [(5, '12', 18), (5, '12', 18)]),
 ((18, 5, 0, 1), [(5, '12', 18), (5, '12', 18)]),
 ((18, 5, 1, 0), [(5, '12', 18), (5, '12', 18)]),
 ((18, 0, 5, 1), [(5, '12', 18), (5, '12', 18)]),
 ((18, 0, 1, 5), [(5, '12', 18), (5, '12', 18)]),
 ((18, 1, 5, 0), [(5, '12', 18), (5, '12', 18)]),
 ((18, 1, 0, 5), [(5, '12', 18), (5, '12', 18)]),
 ((0, 5, 18, 1), [(5, '12', 18), (5, '12', 18)]),
 ((0, 5, 1, 18), [(5, '12', 18), (5, '12', 18)]),
 ((0, 18, 5, 1), [(5, '12', 18), (5, '12', 18)]),
 ((0, 18, 1, 5), [(5, '12', 18), (5, '12', 18)]),
 ((0, 1, 5, 18), [(5, '12', 18), (5, '12', 18)]),
 ((0, 1, 18, 5), [(5, '12', 18), (5, '12', 18)]),
 ((1, 5, 18, 0), [(5, '12', 18), (5, '12', 18)]),
 ((1, 5, 0, 18), [(5, '12', 18), (5, '12', 18)]),


In [None]:
# Step 6: remove repeated edges inside each mini-graph.
mini_grafos_filtrados = mini_grafos.map(lambda x: (x[0], list(set(x[1]))))
# Still one entry per occupied key; display a sample rather than collecting all of them.
mini_grafos_filtrados.take(30)

[((5, 18, 0, 1), [(5, '12', 18)]),
 ((5, 18, 1, 0), [(5, '12', 18)]),
 ((5, 0, 18, 1), [(5, '12', 18)]),
 ((5, 0, 1, 18), [(5, '12', 18)]),
 ((5, 1, 18, 0), [(5, '12', 18)]),
 ((5, 1, 0, 18), [(5, '12', 18)]),
 ((18, 5, 0, 1), [(5, '12', 18)]),
 ((18, 5, 1, 0), [(5, '12', 18)]),
 ((18, 0, 5, 1), [(5, '12', 18)]),
 ((18, 0, 1, 5), [(5, '12', 18)]),
 ((18, 1, 5, 0), [(5, '12', 18)]),
 ((18, 1, 0, 5), [(5, '12', 18)]),
 ((0, 5, 18, 1), [(5, '12', 18)]),
 ((0, 5, 1, 18), [(5, '12', 18)]),
 ((0, 18, 5, 1), [(5, '12', 18)]),
 ((0, 18, 1, 5), [(5, '12', 18)]),
 ((0, 1, 5, 18), [(5, '12', 18)]),
 ((0, 1, 18, 5), [(5, '12', 18)]),
 ((1, 5, 18, 0), [(5, '12', 18)]),
 ((1, 5, 0, 18), [(5, '12', 18)]),
 ((1, 18, 5, 0), [(5, '12', 18)]),
 ((1, 18, 0, 5), [(5, '12', 18)]),
 ((1, 0, 5, 18), [(5, '12', 18)]),
 ((1, 0, 18, 5), [(5, '12', 18)]),
 ((5, 18, 0, 3), [(5, '12', 3), (5, '12', 18)]),
 ((5, 18, 3, 0), [(5, '12', 3), (5, '12', 18)]),
 ((5, 0, 18, 3), [(5, '12', 3), (5, '12', 18)]),
 ((5, 0, 3, 1

In [None]:
# Step 7: keep only the mini-graphs with at least four edges.
mini_grafos_filtrados_2 = mini_grafos_filtrados.filter(lambda x: type(x[1]) == list and len(x[1]) >= 4)
mini_grafos_filtrados_2.collect()

[((5, 18, 3, 8), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((5, 18, 8, 3), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((5, 3, 18, 8), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((5, 3, 8, 18), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((5, 8, 18, 3), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((5, 8, 3, 18), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 5, 3, 8), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 5, 8, 3), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 3, 5, 8), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 3, 8, 5), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 8, 5, 3), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((18, 8, 3, 5), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),
 ((3, 5, 18, 8), [(8, '12', 5), (5, '12', 3), (3, '12', 8), (5, '12', 18)]),

In [None]:
# Step 8: search the pattern inside every remaining mini-graph and de-duplicate the matches.
set(mini_grafos_filtrados_2.flatMap(lambda x: encontrar_patron(x[1], subgrafo_3, L)).collect())

{(2, 8, 5, 3),
 (3, 4, 8, 5),
 (3, 8, 18, 5),
 (6, 18, 7, 19),
 (6, 20, 7, 19),
 (6, 25, 7, 19),
 (7, 18, 15, 21),
 (7, 18, 21, 19),
 (7, 18, 21, 20),
 (7, 18, 21, 25),
 (7, 20, 15, 21),
 (7, 20, 21, 18),
 (7, 20, 21, 19),
 (7, 20, 21, 25)}

In [None]:
# Finally, run the function itself and check that it reaches the same result.
patron_4(edges, 25, A, L, subgrafo_3)

{(2, 8, 5, 3),
 (3, 4, 8, 5),
 (3, 8, 18, 5),
 (6, 18, 7, 19),
 (6, 20, 7, 19),
 (6, 25, 7, 19),
 (7, 18, 15, 21),
 (7, 18, 21, 19),
 (7, 18, 21, 20),
 (7, 18, 21, 25),
 (7, 20, 15, 21),
 (7, 20, 21, 18),
 (7, 20, 21, 19),
 (7, 20, 21, 25)}