In [1]:
!pip install pyspark neo4j



In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

# Pregunta 1: Conexión a BBDD

In [3]:
import neo4j
from neo4j import GraphDatabase
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

In [4]:
### Conexión a la BD neo4j aura
URI = "neo4j+s://e52029e3.databases.neo4j.io"
AUTH = ("neo4j","ZckCRWcvmVzIiHrHDRd0e5ebQEJJL7iwAzjsn5uC4O0")

driver = GraphDatabase.driver(URI, auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")

Connection to Neo4j established successfully!


In [5]:
# queries que utilizamos para probar neo4j

"""
limpiar_bd
————————————————————
session: sesión de conexión a bbdd
————————————————————
Funcion que limpia la base de datos cloud de neo4j. (actualmente tiene los mismos datos de prueba del github de la Tarea)
return vacio.
"""
def limpiar_bd(session):
    query = """
    MATCH (n) DETACH DELETE n
    """
    session.run(query)

"""
cargar_vertices
————————————————————
session: sesión de conexión a bbdd
vertices:  pandas dataframe con los vertices del grafo
————————————————————
Funcion que carga nodos bajo cierto formato a la base de datos cloud de neo4j.
return vacio.
"""
def cargar_vertices(session, vertices):
    query = """
    UNWIND $nodes AS row
    MERGE (p:Variable {id: row})
    """
    session.run(query, nodes=vertices)

"""
cargar_aristas
————————————————————
session: sesión de conexión a bbdd
aristas:  pandas dataframe con las aristas del grafo
————————————————————
Funcion que las carga aristas bajo cierto formato a la base de datos cloud de neo4j.
return vacio.
"""
def cargar_aristas(session, aristas):
    query = """
    UNWIND $edges AS row
    MATCH (p1:Variable {id: row.tail})
    MATCH (p2:Variable {id: row.head})
    MERGE (p1)-[:apunta {relationship_type: row.label}]->(p2);
    """

    # se ajusta el df aristas para que se pueda procesar por neo4j
    session.run(query, edges=aristas.to_dict(orient='records')
)

In [6]:
# # Considerando una flecha, la dirección es desde tail hasta head.

"""
get_all_relationships_bd
————————————————————
session: sesión de conexión a bbdd
sc: contexto de spark
————————————————————
Funcion que busca aristas de neo4j cloud. Notar que los datos cargados se
procesan considerando el formato de las functiones de carga definidas previamente
return todas las aristas del grafo como un rdd.
"""
def csv_grafo(session, csv):
    aristas = pd.read_csv(csv, header=None,names=['tail','label','head'])
    nodos = pd.concat([aristas['tail'],aristas['head']]).unique()
    cargar_vertices(session,nodos)
    cargar_aristas(session,aristas)
    query = "MATCH (n) RETURN n LIMIT 5"
    grafo = session.run(query)
    return grafo

# with driver.session() as session:
#     grafo = csv_grafo(session,'data.csv')

In [7]:
"""
get_all_relationships_bd
————————————————————
session: sesión de conexión a bbdd
sc: contexto de spark
————————————————————
Funcion que busca aristas de neo4j cloud. Notar que los datos cargados se
procesan considerando el formato de las functiones de carga definidas previamente
return todas las aristas del grafo como un rdd.
"""
def get_all_relationships_bd(session, sc):
  query = """
  MATCH (p1:Variable)-[r]->(p2:Variable)
  RETURN p1.id AS tail, p2.id AS head, r.relationship_type AS label
  """
  # notar que result tiene un grafo de neo4j
  result = session.run(query)
  records = [(record["tail"], record["label"], record["head"]) for record in result]
  return sc.parallelize(records)


"""
get_all_relationships_graph
————————————————————
graph: grafo de neo4j
sc: contexto de spark
————————————————————
Funcion para transformar directamente un grafo.
return grafo del formato esperado como un rdd de pyspark.
"""
def get_all_relationships_graph(graph, sc):
    records = [(record["tail"], record["label"], record["head"]) for record in graph]
    return sc.parallelize(records)

# Pregunta 2: Búsqueda de triángulos dirigidos con una etiqueta fija

In [8]:
"""
hashFunction
————————————————————
x: datos a hashear
b: parametro b
————————————————————
Funcion de hash con x modulo b
return numero hasheado
"""
def hashFunction(x, b):
  return x % b

"""
keyGenerator
————————————————————
x: datos a generar las llaves
b: parametro b
————————————————————
Generacion de cada una de las combinaciones posibles de
(b1,b2,0) ... (b1,b2,b-1), (0,b1,b2) ... (b-1,b1,b2), (b1,0,b2) ... (b1,b-1,b2)
return todas las llaves generadas para x
"""
def keyGenerator(x, b):
  keys = []
  keys += [((x[0][0], x[0][1], i), x[1]) for i in range(b)]
  keys += [((i, x[0][0], x[0][1]), x[1]) for i in range(b)]
  keys += [((x[0][1], i, x[0][0]), x[1]) for i in range(b)]
  return keys

In [9]:
"""
triangleFinder
————————————————————
data: rdd ya paralelizada con todas las aristas
b: parametro b
label: etiqueta fija (ejemplo es 11)
————————————————————
Funcion que implementa el algoritmo de la pregunta 2
return todas las combinaciones de triangulos dirigidos dentro del grafo
"""
def triangleFinder(data, b, label):
  dataFiltered = data.filter(lambda x: x[1]  == label)
  hashedData = dataFiltered.map(lambda x: ((hashFunction(x[0], b), hashFunction(x[2], b)), x))
  expandedData = hashedData.flatMap(lambda x: keyGenerator(x, b))
  groupedData = expandedData.groupByKey().mapValues(list).collect()

  output_set = set()

  for _, i in groupedData:
      for x_u, _,y_u in i:
          for x_v,_, y_v in i:
              for x_w,_, y_w in i:
                  if (y_u == x_v) and (y_v == x_w) and (y_w == x_u):
                      triplet = (x_u, x_v, x_w)
                      output_set.add(triplet)

  return list(output_set)

# Pregunta 3: Búsqueda de cuadrados dirigidos con etiqueta variable

In [10]:
"""
inputArista
————————————————————
A: arreglo de nodos unicos
L: arraglo de etiquetas unicas
M: matriz de adyacencia de la aristas
————————————————————
Convierte la matriz de adyacencia en una lista de aristas
return lista de aristas
"""
def inputArista(A, L, M):
    return [(A[i], L[j], A[k]) for i in range(len(A)) for j in range(len(L)) for k in range(len(A)) if M[i][j][k] == 1]

"""
keyGenerator
————————————————————
x: datos a generar las llaves
b: parametro b
consulta: lista de aristas
————————————————————
Generacion de cada una de las combinaciones posibles de (b1,b2,0,0) -> (b1,b2,b-1,b-1), etc..
Las diferencias con keyGenerator son:
   - la limitación de las etiquetas
   - la cantidad de combinaciones posibles
   - el tamaño de las llaves
return todas las llaves generadas para x
"""
def keyGenerator4(x, b, consulta):
    keys = []
    if x[1][1] == consulta[0][1]:
        keys += [((x[0][0], x[0][1], i, j), x[1]) for i in range(b) for j in range(b)]
    if x[1][1] == consulta[1][1]:
        keys += [((j, x[0][0], x[0][1], i), x[1]) for i in range(b) for j in range(b)]
    if x[1][1] == consulta[2][1]:
        keys += [((i, j, x[0][0], x[0][1]), x[1]) for i in range(b) for j in range(b)]
    if x[1][1] == consulta[3][1]:
        keys += [((x[0][1], i, j, x[0][0]), x[1]) for i in range(b) for j in range(b)]
    return keys

"""
dataTransform
————————————————————
item: llave y valor
————————————————————
genera un diccionario para cada una de las etiquetas de una llave
return llave y diccionario de etiquetas
"""
def dataTransform(item):
    key, values = item
    result = {}
    for (x, label, y) in values:
        if label not in result:
            result[label] = []
        result[label].append((x, y))
    return key, list(result.items())

In [11]:
"""
squareFinder
————————————————————
data: rdd ya paralelizada con todas las aristas
b: parametro b
A: arreglo de nodos unicos
L: arraglo de etiquetas unicas
M: matriz de adyacencia de la aristas
————————————————————
Funcion que implementa el algoritmo de la pregunta 3
return todas las combinaciones de cuadrados dirigidos dentro del grafo que cumplen la consulta
"""
def squareFinder(data, b, A, L, M):
  query = inputArista(A, L, M)
  hashedData4 = data.map(lambda x: ((hashFunction(x[0], b), hashFunction(x[2], b)), x))
  expandedData4 = hashedData4.flatMap(lambda x: keyGenerator4(x, b, query))
  groupedData4 = expandedData4.groupByKey().mapValues(list)

  transformed_rdd4 = groupedData4.map(dataTransform)

  last_filter4 = transformed_rdd4.filter(lambda x: len(x[1]) == len(L)).collect()

  results = []

  for _, i in last_filter4:
    edges = dict(i)
    for x_u, y_u in edges[query[0][1]]:
      for x_v, y_v in edges[query[1][1]]:
        for x_w, y_w in edges[query[2][1]]:
          for x_t, y_t in edges[query[3][1]]:
            if (y_u == x_v) and (y_v == x_w) and (y_w == x_t) and (y_t == x_u):
              results.append((x_u, x_v, x_w, x_t))

  return results

In [12]:
def aux_find_squares(item, query):
    key, edges_list = item
    edges = dict(edges_list)
    results = []
    for x_u, y_u in edges.get(query[0][1], []):
        for x_v, y_v in edges.get(query[1][1], []):
            for x_w, y_w in edges.get(query[2][1], []):
                for x_t, y_t in edges.get(query[3][1], []):
                    if (y_u == x_v) and (y_v == x_w) and (y_w == x_t) and (y_t == x_u):
                        results.append((x_u, x_v, x_w, x_t))
    return results

def squareFinder3000(data, b, A, L, M):
    query = inputArista(A, L, M)
    broadcast_query = sc.broadcast(query)

    hashedData4 = data.map(lambda x: ((hashFunction(x[0], b), hashFunction(x[2], b)), x))
    expandedData4 = hashedData4.flatMap(lambda x: keyGenerator4(x, b, broadcast_query.value))

    groupedData4 = expandedData4.groupByKey().mapValues(list)
    transformed_rdd4 = groupedData4.map(dataTransform)

    last_filter4 = transformed_rdd4.filter(lambda x: len(x[1]) == len(L))

    results = last_filter4.flatMap(lambda item: aux_find_squares(item, query)).collect()

    return results

# Pruebas

Para realizar las pruebas generamos archivos txt (para la pregunta 2 y 3) en las que se formatean los datos como se especifican en cada sub-sección.

## Pregunta 1

Haz tu esta parte Vicho (y las funciones de la 1, no cacho nada xd)
Idea: correr ```triangleFinder``` con los datos de Neo4J.

In [13]:
spark = SparkSession.builder \
    .getOrCreate()

sc = spark.sparkContext
sc

with driver.session() as session:
    data = get_all_relationships_bd(session, sc)

print(triangleFinder(data, 10, 11))

[(1, 3, 4), (3, 4, 2), (2, 3, 4), (3, 4, 1), (4, 1, 3), (4, 2, 3)]


## Pregunta 2

Los archivos contienen 3 lineas que representan cada uno de los parámetros de la función ```triangleFinder```. \
```data: [(x1,a1,y1),(x2,a2,y2),...,(xn-1,am-1,yn-1),(xn,am,yn)]``` \
```b: int``` \
```label: int```

In [14]:
def readFile2(filename):
    spark = SparkSession.builder \
        .appName("TriangleCounting") \
        .getOrCreate()

    sc = spark.sparkContext
    with open(filename, 'r') as file:
        lines = file.readlines()

    data = eval(lines[0].strip())
    b = int(lines[1].strip())
    label = int(lines[2].strip())

    return sc.parallelize(data), b, label

In [15]:
data, b, label = readFile2('datos_p2.txt')
print(triangleFinder(data, b, label))

[(1, 3, 4), (3, 4, 2), (2, 3, 4), (3, 4, 1), (4, 1, 3), (4, 2, 3)]


## Pregunta 3

Los archivos contienen 5 lineas que representan cada uno de los parámetros de la función ```squareFinder```. \
```data: [(x1,a1,y1),(x2,a2,y2),...,(xn-1,am-1,yn-1),(xn,am,yn)]``` \
```b: int``` \
```A: [a1,a2,...,ap-1,ap]``` \
```L: [l1,l2,...,lq-1,lq]``` \
```M: [[[0,0,...,1,0],...,[0,0,...,0,1]]]```


In [16]:
def readFile3(filename):
    spark = SparkSession.builder \
        .appName("TriangleCounting") \
        .getOrCreate()

    sc = spark.sparkContext
    with open(filename, 'r') as file:
        lines = file.readlines()

    data = eval(lines[0].strip())
    b = int(lines[1].strip())
    A = eval(lines[2].strip())
    L = eval(lines[3].strip())
    M = eval(lines[4].strip())

    return sc.parallelize(data), b, A, L, M

In [17]:
from time import time

In [18]:
start_time = time()
data, b, A, L, M = readFile3('datos_p3.txt')
print("--- %s seconds ---" % (time() - start_time))
print(squareFinder(data, b, A, L, M))

--- 0.02127218246459961 seconds ---
[(1, 5, 10, 17)]


In [19]:
start_time = time()
data, b, A, L, M = readFile3('datos_p3.txt')
print("--- %s seconds ---" % (time() - start_time))
print(squareFinder3000(data, b, A, L, M))

--- 0.02088475227355957 seconds ---
[(1, 5, 10, 17)]


In [20]:
print(data.getNumPartitions())

2


In [21]:
start_time = time()
data, b, A, L, M = readFile3('datos_p3_2.txt')
print("--- %s seconds ---" % (time() - start_time))
print(squareFinder(data, b, A, L, M))

--- 0.021589040756225586 seconds ---
[(2, 3, 4, 5), (1, 3, 4, 5)]


In [22]:
start_time = time()
data, b, A, L, M = readFile3('datos_p3_2.txt')
print("--- %s seconds ---" % (time() - start_time))
print(squareFinder3000(data, b, A, L, M))

--- 0.031281232833862305 seconds ---
[(2, 3, 4, 5), (1, 3, 4, 5)]


In [23]:
print(data.getNumPartitions())

2
