In [1]:
!pip install pyspark
!pip install neo4j


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=82ef023f1cd65bb499177a7267674f5dbc518ca16d727127111ac3311882fe54
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting neo4j
  Downloading neo4j-5.21.0-py3-none-any.whl (286 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m286.8/286.8 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: neo4j
Successfully installed neo4j-5.21.0


In [2]:
from pyspark.sql import SparkSession

In [3]:
import neo4j
from neo4j import GraphDatabase
import pandas as pd

# Subir datos a una Base de Neo4j AuraDB

### Para la tarea hicimos uso de los datos que se entregaron en las actividades, en Grafos, pues tenian varias conexiones que permitian realizar pruebas a nuestro codigo, para el problema de los triangulos y rectangulos

### Si los datos ya se encuentran subidos hay que saltar esto

#### Extraido del jupyter notebook de Grafos en el repositorio del curso Syllabus-2024-1 -> Actividades -> 06 - Grafos -> cora

https://github.com/IIC2440/Syllabus-2024-1/tree/main/Actividades/06%20-%20Grafos

## Conexion a la base

In [42]:
### Conexión a la BD
URI = ""
AUTH = ("user","password")

driver = GraphDatabase.driver(URI, auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")

Connection to Neo4j established successfully!


## Subir los datos

#### Si esta en colab hay que crear una carpeta llamada "cora" y en esta agregar los datos "cora.cities" y "cora.content"


In [None]:
### Datos desde CSV a pandas

#### papers que citan a otro paper

cites = pd.read_csv('cora/cora.cites',sep='\t',header=None,names=['target','source'])
cites

citas_por_paper = cites.groupby('target')['source'].nunique()
citas_por_paper

#### papers y los temas de los papers.

column_names = ["paper_id"] + [f"word_{idx}" for idx in range(1433)] + ["subject"]
papers = pd.read_csv(
    'cora/cora.content', sep="\t", names=column_names,
)
subjects = papers[["paper_id","subject"]]
subjects

Unnamed: 0,paper_id,subject
0,31336,Neural_Networks
1,1061127,Rule_Learning
2,1106406,Reinforcement_Learning
3,13195,Reinforcement_Learning
4,37879,Probabilistic_Methods
...,...,...
2703,1128975,Genetic_Algorithms
2704,1128977,Genetic_Algorithms
2705,1128978,Genetic_Algorithms
2706,117328,Case_Based


## Mandar los datos a la base

In [None]:
### Metiendo datos en Neo4j

### estas consultas procesan los diccionarios de input y lo van metiendo a neo4j

def load_papers(session, subjects):
    query = """
    UNWIND $nodes AS row
    MERGE (p:Paper {id: row.paper_id})
    SET p.subject = row.subject;
    """
    session.run(query, nodes=subjects)

def load_relationships(session, cites):
    query = """
    UNWIND $edges AS row
    MATCH (p1:Paper {id: row.source})
    MATCH (p2:Paper {id: row.target})
    MERGE (p1)-[:CITES]->(p2);
    """
    session.run(query, edges=cites)

In [None]:
### agregamos datos y consultamos

with driver.session() as session:
    load_papers(session,subjects.to_dict('records'))
    load_relationships(session,cites.to_dict('records'))
    query = "MATCH (n) RETURN n LIMIT 5"
    result = session.run(query)
    for record in result:
        print(record)


In [43]:
driver.close()

# Tarea

## Parte 1 Precalentamiento: triángulos, El precalentamiento 1.1 no se hizo, pues el 1.2, lo contempla pues la diferencia es que se añaden más llaves donde las posiciones de los hash cambian.

In [45]:
spark = SparkSession.builder \
    .getOrCreate()

sc = spark.sparkContext
sc

In [46]:
data = [(1, 11, 2), (1, 11, 3), (2, 11, 3), (3, 11, 2), (3, 11, 4), (4, 11, 1), (4, 11, 2),
        (4, 11, 3), (4, 12, 5), (5, 12, 1), (5, 12, 2), (5, 12, 6)]

rdd = spark.sparkContext.parallelize(data)

# Crear un DataFrame a partir del RDD
df = spark.createDataFrame(rdd, ["Nodo", "Arista", "Nodo"])
df.show()

+----+------+----+
|Nodo|Arista|Nodo|
+----+------+----+
|   1|    11|   2|
|   1|    11|   3|
|   2|    11|   3|
|   3|    11|   2|
|   3|    11|   4|
|   4|    11|   1|
|   4|    11|   2|
|   4|    11|   3|
|   4|    12|   5|
|   5|    12|   1|
|   5|    12|   2|
|   5|    12|   6|
+----+------+----+



### Funcion de Hash por medio del modulo

In [47]:
def hash(node, b):
    return node % b

### Funcion para realizar el mapeo de las llaves para cada relación

In [48]:
def map_function(edges, b):
    mapped = []
    for edge in edges:
        n1, R, n2 = edge
        b1 = hash(n1, b)
        b2 = hash(n2, b)
        for i in range(b):
            key = (b1, b2, i)
            key2 =  (b1, i, b2)
            key3 = (i, b1, b2)

            mapped.append((key, (n1, R, n2)))
            mapped.append((key2, (n1, R, n2)))
            mapped.append((key3, (n1, R, n2)))

    return mapped

### Funcion para determinar los triangulos en cada subgrafo

In [84]:
def reduce_function(key, edges):
    triangles = []

    for edge1 in edges:
        for edge2 in edges:
            if edge1[2] == edge2[0]:
                for edge3 in edges:
                    if edge3[0] == edge2[2] and edge3[2] == edge1[0]:
                        triangles.append((edge1[0], edge1[2], edge3[0]))
    return triangles

In [50]:
# Datos de ejemplo
edges = [(1, 11, 2), (1, 11, 3), (2, 11, 3), (3, 11, 2), (3, 11, 4), (4, 11, 1), (4, 11, 2),
 (4, 11, 3), (4, 12, 5), (5, 12, 1), (5, 12, 2), (5, 12, 6), (2, 11, 3), (3, 11, 1), (2, 11, 4)]

# RDD
rdd = sc.parallelize(edges)

# Parámetro b
b = 2

# Fase de Map
mapped_rdd = rdd.flatMap(lambda edge: map_function([edge], b))
mapped_rdd.collect()

[((1, 0, 0), (1, 11, 2)),
 ((1, 0, 0), (1, 11, 2)),
 ((0, 1, 0), (1, 11, 2)),
 ((1, 0, 1), (1, 11, 2)),
 ((1, 1, 0), (1, 11, 2)),
 ((1, 1, 0), (1, 11, 2)),
 ((1, 1, 0), (1, 11, 3)),
 ((1, 0, 1), (1, 11, 3)),
 ((0, 1, 1), (1, 11, 3)),
 ((1, 1, 1), (1, 11, 3)),
 ((1, 1, 1), (1, 11, 3)),
 ((1, 1, 1), (1, 11, 3)),
 ((0, 1, 0), (2, 11, 3)),
 ((0, 0, 1), (2, 11, 3)),
 ((0, 0, 1), (2, 11, 3)),
 ((0, 1, 1), (2, 11, 3)),
 ((0, 1, 1), (2, 11, 3)),
 ((1, 0, 1), (2, 11, 3)),
 ((1, 0, 0), (3, 11, 2)),
 ((1, 0, 0), (3, 11, 2)),
 ((0, 1, 0), (3, 11, 2)),
 ((1, 0, 1), (3, 11, 2)),
 ((1, 1, 0), (3, 11, 2)),
 ((1, 1, 0), (3, 11, 2)),
 ((1, 0, 0), (3, 11, 4)),
 ((1, 0, 0), (3, 11, 4)),
 ((0, 1, 0), (3, 11, 4)),
 ((1, 0, 1), (3, 11, 4)),
 ((1, 1, 0), (3, 11, 4)),
 ((1, 1, 0), (3, 11, 4)),
 ((0, 1, 0), (4, 11, 1)),
 ((0, 0, 1), (4, 11, 1)),
 ((0, 0, 1), (4, 11, 1)),
 ((0, 1, 1), (4, 11, 1)),
 ((0, 1, 1), (4, 11, 1)),
 ((1, 0, 1), (4, 11, 1)),
 ((0, 0, 0), (4, 11, 2)),
 ((0, 0, 0), (4, 11, 2)),
 ((0, 0, 0),

In [51]:
# Agrupar por llave
grouped_rdd = mapped_rdd.groupByKey()
grouped_rdd.collect()

[((1, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e72376cc8b0>),
 ((0, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e72376ce410>),
 ((1, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e72376cde40>),
 ((0, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e72376ccb20>),
 ((1, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e72376ceda0>),
 ((1, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e72376cdea0>),
 ((0, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e72376cf790>),
 ((0, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e72376cf430>)]

Resultados

In [52]:
# Fase de Reduce
triangles_rdd = grouped_rdd.flatMap(lambda x: reduce_function(x[0], list(x[1])))
list(set(triangles_rdd.collect()))

[(2, 3, 1),
 (4, 3, 2),
 (1, 2, 3),
 (2, 4, 1),
 (1, 3, 4),
 (3, 4, 2),
 (2, 3, 4),
 (3, 2, 4),
 (2, 4, 3),
 (4, 2, 3),
 (3, 4, 1),
 (4, 1, 3),
 (3, 1, 2),
 (4, 1, 2),
 (1, 2, 4)]

In [53]:
spark.stop()

## Parte 2

In [101]:
### Conexión a la BD
URI = ""
AUTH = ("user","password")

driver = GraphDatabase.driver(URI, auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")

Connection to Neo4j established successfully!


### Obtención de las relaciones dentro del grafo, este extrae todas las relaciones tales que n1 - R -> n2, con n1 y n2 nodos y R una relación cualquiera

### De estas solamente se extraen los *id´s*, para los nodos y en el caso de las relaciones solo extremos el *type*

In [102]:
### estas consultas procesan los diccionarios de input y lo van metiendo a neo4j

def get_all_relationships(tx):
    query = "MATCH (n)-[r]->(m) RETURN n, r, m"
    result = tx.run(query)
    relationships = []
    for record in result:
        # Extraccion del id de cada nodo y del type de la relacion
        node1 = record[0]["id"]
        relationship = record[1].type
        node2 = record[2]["id"]
        relationships.append((node1, relationship, node2))
    return relationships

In [103]:
with driver.session() as session:
    result = get_all_relationships(session)


### Sample de los primeros 5 datos obtenidos

In [104]:
result[:5]

[(31336, 'CITES', 31353),
 (31336, 'CITES', 10531),
 (1061127, 'CITES', 2440),
 (1106406, 'CITES', 23774),
 (1106406, 'CITES', 6169)]

### Crear la instacia del Spark

In [105]:
spark2 = SparkSession.builder.getOrCreate()

sc2 = spark2.sparkContext
sc2

## Implementa un programa en PySpark que entregue todos los tri´angulos (como tuplas de tres nodos) en el grafo usando b3 reducers, donde b es un par´ametro. Para esta primera parte puedes asumir que tu grafo solo usa una etiqueta de arista (en el grafo de prueba, esa etiqueta corresponde al numero 11).

### Funcion de Hash, esta hashea por el modulo

In [106]:
def hash_to_number(code, b):
    return code % b

### Funcion usada para el map, lo que hace es crear las llaves para cada relacion del tipo n1-R->n2

In [107]:
def map_function2(edges, b):
    mapped1 = []

    for edge in edges:
        # Desempaquetamos la relacion
        n1, R, n2 = edge
        # Aplicamos el Hash a los nodos
        b1 = hash_to_number(n1, b)
        b2 = hash_to_number(n2, b)
        # Creamos las llaves
        for i in range(b):
            key1 = (b1, b2, i)
            key2 = (b1, i, b2)
            key3 = (i, b1, b2)

            # Agregamos las llaves
            mapped1.append((key1, (n1, R, n2)))
            mapped1.append((key2, (n1, R, n2)))
            mapped1.append((key3, (n1, R, n2)))

    return list(set(mapped1))

### Se realiza el RDD que se solicita en el Punto 1 de la Parte 2 y luego se mapea, por la función anterior

In [108]:
rdd = sc2.parallelize(result)

# Parámetro b
b = 2

# Fase de Map
mapped_rdd = rdd.flatMap(lambda edge: map_function2([edge], b))
mapped_rdd.collect()

[((0, 1, 0), (31336, 'CITES', 31353)),
 ((1, 0, 1), (31336, 'CITES', 31353)),
 ((0, 0, 1), (31336, 'CITES', 31353)),
 ((0, 1, 1), (31336, 'CITES', 31353)),
 ((1, 0, 1), (31336, 'CITES', 10531)),
 ((0, 0, 1), (31336, 'CITES', 10531)),
 ((0, 1, 1), (31336, 'CITES', 10531)),
 ((0, 1, 0), (31336, 'CITES', 10531)),
 ((0, 1, 0), (1061127, 'CITES', 2440)),
 ((1, 1, 0), (1061127, 'CITES', 2440)),
 ((1, 0, 1), (1061127, 'CITES', 2440)),
 ((1, 0, 0), (1061127, 'CITES', 2440)),
 ((0, 0, 1), (1106406, 'CITES', 23774)),
 ((0, 0, 0), (1106406, 'CITES', 23774)),
 ((1, 0, 0), (1106406, 'CITES', 23774)),
 ((0, 1, 0), (1106406, 'CITES', 23774)),
 ((1, 0, 1), (1106406, 'CITES', 6169)),
 ((0, 0, 1), (1106406, 'CITES', 6169)),
 ((0, 1, 1), (1106406, 'CITES', 6169)),
 ((0, 1, 0), (1106406, 'CITES', 6169)),
 ((0, 0, 0), (1106406, 'CITES', 114)),
 ((1, 0, 0), (1106406, 'CITES', 114)),
 ((0, 1, 0), (1106406, 'CITES', 114)),
 ((0, 0, 1), (1106406, 'CITES', 114)),
 ((0, 0, 1), (1106406, 'CITES', 6213)),
 ((0, 1,

### La función de Reduce, considera las relaciones triangulares.

In [109]:
def reduce_function2(key, edges):
    triangles = []  # Usar un conjunto para evitar duplicados

    for edge1 in edges:
        for edge2 in edges:
            if edge1[2] == edge2[0]:
                for edge3 in edges:
                    if edge3[0] == edge2[2] and edge3[2] == edge1[0]:
                        # Ordenar los vértices del triángulo para asegurar una única representación
                        triangle = [edge1[0], edge2[0], edge3[0]]
                        triangles.append(tuple(triangle))

    return triangles

### Se agrupa por llave, para crear los subgrafos

In [110]:
# Agrupar por clave
grouped_rdd = mapped_rdd.groupByKey()
grouped_rdd.collect()

[((0, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e723d37df00>),
 ((0, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e723d2410c0>),
 ((1, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e723d240460>),
 ((1, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e723d2424a0>),
 ((1, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e723cf067a0>),
 ((0, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e72366beb60>),
 ((1, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e72366bff40>),
 ((0, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e72366bf850>)]

### Determina las relaciones con forma triangular, mediante la función *reduce_function2*

In [111]:
triangles_rdd = grouped_rdd.flatMap(lambda x: reduce_function2(x[0], list(x[1])))
list(set(triangles_rdd.collect()))

[(210871, 35061, 35),
 (2695, 342802, 2698),
 (643221, 643485, 644577),
 (753070, 753047, 753264),
 (184918, 23502, 330148),
 (33818, 9586, 78557),
 (210871, 273152, 35),
 (16819, 643239, 643221),
 (753264, 753070, 753047),
 (51180, 67415, 12350),
 (67415, 12350, 51180),
 (34263, 34266, 87482),
 (49811, 1997, 3233),
 (342802, 2698, 2695),
 (561568, 561613, 153063),
 (648121, 648112, 648106),
 (20178, 91852, 64271),
 (33818, 9586, 78552),
 (194617, 31932, 215912),
 (51180, 12195, 12350),
 (12350, 51180, 12195),
 (643485, 642894, 643221),
 (642894, 643221, 643239),
 (78557, 78552, 33818),
 (9586, 78557, 33818),
 (273152, 35, 210871),
 (643239, 643221, 16819),
 (2698, 2695, 342802),
 (3191, 5086, 3192),
 (60159, 399370, 60169),
 (56112, 12576, 83725),
 (644577, 643221, 643485),
 (284025, 143801, 119761),
 (64271, 20178, 91852),
 (78557, 33818, 9586),
 (643221, 643485, 642894),
 (78552, 33818, 9586),
 (28026, 5064, 5069),
 (126927, 645897, 126920),
 (35, 210871, 273152),
 (3233, 49811, 199

# Rectangulos

 ## Asume ahora que recibes un subgrafo como tres arreglos: un arreglo A con las variables, otro L con los tipos de aristas, y una matriz M de tamaño |A|×|L|×|A| que tiene un uno en la posicion (x,R,y) si y solo si (x,R,y) es una arista de tu subgrafo.

In [112]:
import numpy as np
import pandas as pd
import random

Para Pruebas

Obtener la matriz de listas de listas de listas

In [113]:
# Extraido de ChatGPT

def create_matrix(A, B):
    """
    Crea una matriz de ceros con dimensiones |A| x |B| x |A|.

    Args:
        A (list): Lista de elementos A.
        B (list): Lista de elementos B.

    Returns:
        np.ndarray: Matriz de ceros con dimensiones |A| x |B| x |A|.
    """
    return np.zeros((len(A), len(B), len(A)))

In [114]:
# Extraido de ChatGPT
def extract_tuples_from_matrix(matrix, A, B):
    """
    Extrae todas las tuplas (n1, R, n2) donde matrix[a][b][c] == 1.

    Args:
        matrix (np.ndarray): Matriz con dimensiones |A| x |B| x |A|.
        A (list): Lista de elementos A.
        B (list): Lista de elementos B.

    Returns:
        list: Lista de tuplas (n1, R, n2) donde matrix[a][b][c] == 1.
    """
    tuples = []
    for i in range(len(A)):
        for j in range(len(B)):
            for k in range(len(A)):
                if matrix[i, j, k] == 1:
                    n1 = A[i]
                    R = B[j]
                    n2 = A[k]
                    tuples.append((n1, R, n2))
    return tuples

In [115]:
def apply_values(matrix, A, B):
  # Genera un grafico aleatorio en una matriz
  for i in range(len(A)):
    for j in range(len(B)):
      for k in range(len(A)):
        if random.random() >= 0.6:
          matrix[i][j][k] = random.randint(0, 1)

  return matrix

Datos, entregar el A, L y M

In [116]:
A = [a for a in range(40)]
L = ['hola', 'chao']
matrix = create_matrix(A, L) # Matriz de listas de listas de listas, la primera lista indica el valor en A, la segunda el valor de L y la tercera el valor de A)

In [117]:
matrix = apply_values(matrix, A, L) # Saltar si hay una matriz definida con valores 1 en ella, esto solo agrega 1's a una matriz de solo 0's.

In [118]:
tuples = extract_tuples_from_matrix(matrix, A, L) # Extraer los datos

In [119]:
tuples[:5]

[(0, 'hola', 11),
 (0, 'hola', 12),
 (0, 'hola', 15),
 (0, 'hola', 22),
 (0, 'hola', 35)]

Aplicacion del codigo de PySpark

In [120]:
spark3 = SparkSession.builder.getOrCreate()

sc3 = spark3.sparkContext
sc3

In [121]:
rdd_rectangle = sc3.parallelize(tuples)

# Parámetro b
b = 2

# Fase de Map
mapped_rdd_matrix = rdd_rectangle.flatMap(lambda edge: map_function2([edge], b))
mapped_rdd_matrix.collect()

[((1, 0, 1), (0, 'hola', 11)),
 ((0, 0, 1), (0, 'hola', 11)),
 ((0, 1, 1), (0, 'hola', 11)),
 ((0, 1, 0), (0, 'hola', 11)),
 ((0, 0, 1), (0, 'hola', 12)),
 ((0, 0, 0), (0, 'hola', 12)),
 ((1, 0, 0), (0, 'hola', 12)),
 ((0, 1, 0), (0, 'hola', 12)),
 ((0, 0, 1), (0, 'hola', 15)),
 ((0, 1, 1), (0, 'hola', 15)),
 ((0, 1, 0), (0, 'hola', 15)),
 ((1, 0, 1), (0, 'hola', 15)),
 ((0, 0, 1), (0, 'hola', 22)),
 ((0, 0, 0), (0, 'hola', 22)),
 ((1, 0, 0), (0, 'hola', 22)),
 ((0, 1, 0), (0, 'hola', 22)),
 ((0, 0, 1), (0, 'hola', 35)),
 ((0, 1, 1), (0, 'hola', 35)),
 ((0, 1, 0), (0, 'hola', 35)),
 ((1, 0, 1), (0, 'hola', 35)),
 ((1, 0, 0), (0, 'hola', 38)),
 ((0, 1, 0), (0, 'hola', 38)),
 ((0, 0, 1), (0, 'hola', 38)),
 ((0, 0, 0), (0, 'hola', 38)),
 ((0, 1, 0), (0, 'hola', 39)),
 ((1, 0, 1), (0, 'hola', 39)),
 ((0, 0, 1), (0, 'hola', 39)),
 ((0, 1, 1), (0, 'hola', 39)),
 ((1, 0, 0), (0, 'chao', 2)),
 ((0, 0, 1), (0, 'chao', 2)),
 ((0, 0, 0), (0, 'chao', 2)),
 ((0, 1, 0), (0, 'chao', 2)),
 ((1, 0, 1),

In [122]:
# Agrupar por clave
grouped_rdd_matrix = mapped_rdd_matrix.groupByKey()
grouped_rdd_matrix.collect()

[((0, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e7236e91090>),
 ((0, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e7236e92710>),
 ((1, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e7236e93610>),
 ((1, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e7236e90130>),
 ((1, 0, 1), <pyspark.resultiterable.ResultIterable at 0x7e7236e91900>),
 ((0, 1, 1), <pyspark.resultiterable.ResultIterable at 0x7e7236e90940>),
 ((0, 0, 0), <pyspark.resultiterable.ResultIterable at 0x7e7236e92050>),
 ((1, 1, 0), <pyspark.resultiterable.ResultIterable at 0x7e7236e93be0>)]

In [123]:
def reduce_function_rectangles(key, edges):
    rectangles = []  # Usar un conjunto para evitar duplicados

    for edge1 in edges:
        for edge2 in edges:
          if edge1 == edge2:
            continue
          else:
            if edge1[2] == edge2[0]:
                for edge3 in edges:
                  if edge2 == edge3 or edge1 == edge3:
                    continue
                  else:
                    if edge2[2] == edge3[0]:
                      for edge4 in edges:
                        if edge2 == edge4 or edge1 == edge4 or edge3 == edge4:
                          continue
                        else:
                          if edge3[2] == edge4[0] and edge4[2] == edge1[0]: # (x, R, y) (y, O ,z) (z, L, w) (w, K, x)
                          # Ordenar los vértices del triángulo para asegurar una única representación
                            rectangle = [edge1[0], edge2[0], edge3[0], edge4[0]]
                            rectangles.append(tuple(rectangle))

    return rectangles # Aplicacion del set para eliminar repetidos

In [124]:
rectangles_rdd_matrix = grouped_rdd_matrix.flatMap(lambda x: reduce_function_rectangles(x[0], list(x[1])))

In [125]:
list(set(rectangles_rdd_matrix.collect()))

[(8, 8, 16, 25),
 (16, 14, 7, 34),
 (12, 15, 6, 32),
 (22, 30, 35, 0),
 (38, 23, 25, 5),
 (36, 34, 20, 35),
 (10, 27, 15, 21),
 (15, 18, 27, 18),
 (11, 1, 1, 22),
 (10, 26, 16, 17),
 (5, 15, 14, 31),
 (0, 39, 25, 39),
 (25, 21, 15, 18),
 (38, 22, 39, 0),
 (0, 25, 26, 23),
 (39, 25, 5, 36),
 (28, 38, 6, 18),
 (38, 6, 20, 14),
 (24, 38, 6, 12),
 (31, 33, 27, 14),
 (34, 0, 35, 8),
 (8, 0, 22, 35),
 (12, 14, 14, 17),
 (3, 0, 39, 19),
 (39, 25, 35, 36),
 (26, 5, 36, 22),
 (19, 32, 39, 15),
 (16, 39, 27, 15),
 (0, 5, 13, 29),
 (2, 24, 36, 36),
 (10, 26, 0, 22),
 (29, 32, 39, 8),
 (31, 12, 15, 19),
 (17, 13, 11, 28),
 (23, 23, 39, 38),
 (23, 25, 31, 33),
 (31, 31, 1, 4),
 (2, 25, 28, 0),
 (24, 30, 0, 12),
 (12, 33, 30, 29),
 (32, 38, 22, 29),
 (31, 18, 13, 29),
 (29, 10, 34, 22),
 (38, 23, 10, 16),
 (23, 8, 19, 23),
 (5, 8, 1, 1),
 (25, 37, 19, 34),
 (29, 25, 25, 13),
 (33, 0, 36, 30),
 (33, 27, 35, 31),
 (34, 16, 32, 19),
 (17, 36, 9, 14),
 (37, 23, 23, 39),
 (9, 5, 37, 28),
 (24, 1, 12, 11)

## Implementa un programa en PySpark que reciba un patrón que tiene solo variables, y exactamente cuatro variables, y entregue todos los matches de ese patrón (como tuplas de 4 nodos) en el grafo usando b^4 reducers, donde b es un parámetro.

### Obtener rectangulos mediante *reduce_function_rectangles*, esta realiza el reduce pero para relaciones con forma rectangular.

### Aqui se aplica en la instancia previa del *grouped_rdd*, pues se utilizan las mismas llaves y con ello los mismos subgrafos

In [126]:
rectangles_rdd = grouped_rdd.flatMap(lambda x: reduce_function_rectangles(x[0], list(x[1])))

In [127]:
len(list(set(rectangles_rdd.collect())))

144

In [128]:
list(set(rectangles_rdd.collect()))

[(78552, 33818, 9586, 78511),
 (5086, 3187, 5086, 3191),
 (33818, 78552, 33818, 78557),
 (50381, 62274, 32260, 62274),
 (12350, 51180, 67415, 12210),
 (5086, 3191, 5086, 3187),
 (753265, 753047, 753070, 753047),
 (66986, 39210, 66982, 39210),
 (124224, 6898, 12631, 6898),
 (12195, 12350, 51180, 38722),
 (5064, 28026, 5064, 5069),
 (5869, 910, 5462, 910),
 (3192, 3191, 5086, 3191),
 (126927, 126920, 126927, 103543),
 (16819, 643221, 643239, 643221),
 (103543, 126927, 126920, 126927),
 (3191, 3192, 3191, 5086),
 (79809, 11342, 79809, 358866),
 (342802, 2695, 2698, 2695),
 (74698, 23070, 87915, 23070),
 (5462, 910, 5869, 910),
 (643239, 643221, 643485, 643221),
 (20924, 289885, 20924, 294030),
 (35061, 141342, 35, 210871),
 (210871, 35061, 141342, 35),
 (67415, 12210, 12350, 51180),
 (56112, 83725, 12576, 83725),
 (62274, 32260, 62274, 50381),
 (753070, 753047, 753264, 753047),
 (67246, 31769, 67245, 31769),
 (3112, 77826, 3112, 77829),
 (643221, 643485, 643221, 643239),
 (273152, 210871,

### Cerramos las sesiones de Spark y la conexion con el server

In [129]:
spark2.stop()
spark3.stop()
driver.close()
