In [1]:
import json 
from typing import List, Dict, Optional, Tuple, Set 
from collections import deque, defaultdict
from pymongo import MongoClient

In [2]:
client = MongoClient("localhost", 27017)
print(client.list_database_names())


['admin', 'config', 'graph_db', 'local', 'prerequisites_graph', 'test_graph_retriever', 'testing-mcts', 'vectorstore']


## Graph DB
Creación de una base de datos de grafos usando Python y MongoDB, esta incluye la estructura de datos, las operaciones básicas y la función para recuperar entidades adyacentes.

La clase `GraphDB` maneja toda la lógica de la base de datos de grafos. Utiliza dos colecciones:
- `nodes`: para almacenar las entidades del grafo
- `edges`: para almacenar las relaciones entre entidades

Dicha clase trae las siguientes funcionalidades:
- `get_adjacent_entities`: recupera las entidades adyacentes a un nodo dado, con opciones para:
  - Filtrar por tipo de relación
  - Especificar dirección (grafo dirigido)
  - Obtener información completa de cada entidad adyacente
- `add_node`: añade un nodo (entidad) al grafo definiendo el identificador único del nodo, el tipo de nodo y las propiedades adicionales del nodo
- `del_node`: elimina un nodo del grafo y elimina automáticamente todas las relaciones donde el nodo participa (`cascade=True`)
- `add_edge`: añade una arista (relación) entre dos nodos, definiendo:
  - Nodo origen
  - Nodo destino
  - Tipo de relación 
  - Propiedades de la relación
  - Una variable booleana que determina si la relación es dirigida o no
- `get_relationship_types`: devuelve todos los tipos de relaciones únicas en el grafo
- `get_node_types`: devuelve todos los tipos de nodos únicos en el grafo 
- `get_graph_stats`: devuelve las estadísticas básicas del grafo, incluyendo:
  - Número de nodos 
  - Número de aristas
  - Todos los tipos de nodos únicos (`get_node_types`)
  - Todos los tipos de relaciones únicas (`get_relationship_types`)
- `_create_indexes`: crea índices en MongoDB para optimizar las consultas de la base de datos de grafos. Para cada colección:
  - `nodes`:
    - `node_id`: permite búsquedas por un identificador y garantiza unicidad
    - `type`: optimiza filtros por tipo de nodo
  - `edges`:
    - `[source, target]`: índice compuesto para consultas con ambos campos
    - `source`: búsquedas rápidas por nodo origen
    - `target`: búsquedas rápidas por nodo destino
    - `relation_type`: filtros por tipo de relación

> Esta clase soporta grafos dirigidos y no dirigidos

In [3]:
class GraphDB:
  
  def __init__(self, db_name:str = "graph_db", host:str = "localhost", port:int = 27017) -> None:
    """Inicializa la conexión a MongoDB y crea las colecciones necesarias

    Args:
        db_name (str, optional): nombre de la base de datos. Defaults to "graph_db".
        host (str, optional): host de mongodb. Defaults to "localhost".
        port (int, optional): puerto de mongodb. Defaults to 27017.
    """
    
    self.client:MongoClient = MongoClient(host,port)
    self.db = self.client[db_name]
    
    # colecciones
    self.nodes = self.db.nodes # entidades/nodos del grafo
    self.edges = self.db.edges # relaciones/aristas del grafo
    
    # crear indices para mejorar el rendimiento
    self._create_indexes()
  
  def _create_indexes(self):
    "Crea índices para optimizar las consultas"
    # índices para nodos
    self.nodes.create_index("node_id", unique=True)
    self.nodes.create_index("type")
    
    # índices para aristas
    self.edges.create_index([("source", 1), ("target", 1)])
    self.edges.create_index("source")
    self.edges.create_index("target")
    self.edges.create_index("relation_type")
  
  def add_node(self, node_id:str, node_type:str, properties:Dict = None) -> bool:
    """Añade un nodo al grafo
  
    Args:
        node_id (str): identificador único del nodo
        node_type (str): tipo de nodo
        properties (Dict, optional): propiedades adicionales del nodo. Defaults to None.

    Returns:
        bool: True si se añadió correctamente, False en caso contrario
    """
    if properties is None: properties = { }
    
    node_doc = {
      "node_id": node_id,
      "type": node_type,
      "properties": properties
    }
    
    try:
      self.nodes.insert_one(node_doc)
      return True
    except Exception as e:
      print(f"Error al insertar nodo: {e}")
      return False
  
  def del_node(self, node_id:str, cascade: bool = True) -> Dict:
    """Elimina un nodo del grafo

    Args:
        node_id (str): ID del nodo a eliminar
        cascade (bool, optional): Si True, elimina también todas las relaciones del nodo. Si False, solo elimina el nodo si no tiene relaciones. Defaults to True.

    Returns:
        Dict: Resultado de la operación de eliminación
    """
    if not self._node_exists(node_id=node_id):
      return {
        "success": False,
        "message": f"El nodo {node_id} no existe",
        "nodes_deleted": 0,
        "edges_deleted": 0,
        "existing_edges": 0,
      }
    
    # contar relaciones existentes
    outgoing_edges = self.edges.count_documents({"source": node_id})
    incoming_edges = self.edges.count_documents({"target": node_id})
    total_edges = outgoing_edges + incoming_edges
    
    # si cascade es False y tiene relaciones, no eliminar
    if not cascade and total_edges > 0:
      return {
        "success": False,
        "message": f"El nodo {node_id} tiene {total_edges} relaciones. Para eliminar el nodo con las conexiones se debe usar cascade=True",
        "nodes_deleted": 0,
        "edges_deleted": 0,
        "existing_edges": total_edges
      }
    
    try:
      # eliminar todas las relaciones donde el nodo es origen o destino
      edges_deleted = 0
      if cascade:
        # eliminar aristas donde es origen (source)
        result_outgoing = self.edges.delete_many({"source":node_id})
        # eliminar aristas donde es destino (target)
        result_incoming = self.edges.delete_many({"target":node_id})
        edges_deleted = result_outgoing.deleted_count + result_incoming.deleted_count
      
      # eliminar el nodo
      result_node = self.nodes.delete_one({"node_id": node_id})
      nodes_deleted = result_node.deleted_count
      
      return {
        "success": True,
        "message": f"Node {node_id} eliminado con exito",
        "nodes_deleted": nodes_deleted,
        "edges_deleted": edges_deleted,
        "existing_edges": 0
      }
    except Exception as e:
      return {
        "success": False,
        "message": f"Error al eliminar nodo: {str(e)}",
        "nodes_deleted": 0,
        "edges_deleted": 0,
        "existing_edges": 0
      }
  
  def add_edge(self, source_id:str, target_id:str, relation_type:str,
              properties: Dict = None, directed: bool = True) -> Dict:
    """Añade una arista (relación) entre dos nodos. Previene la creación de aristas exactamente iguales

    Args:
        source_id (str): ID del nodo origen
        target_id (str): ID del nodo destino
        relation_type (str): Tipo de relación (ej: 'knows', 'works_at', etc)
        properties (Dict, optional): Propiedades adicionales de la relación. Defaults to None.
        directed (bool, optional): Si la relación es dirigida o no. Defaults to True.

    Returns:
        Dict: Resultado de la operación
    """
    if properties is None:
      properties = { }
    
    # verificar que ambos nodos existen
    if not self._node_exists(source_id) or not self._node_exists(target_id):
      return {
        "success": False,
        "message": "Error: Uno o ambos nodos no existen",
        "edge_created": False,
        "duplicate_found": False
      }
    
    # verificar si la arista ya existe
    existing_edge = self._edge_exists(source_id, target_id, relation_type, properties, directed)
    if existing_edge:
      return {
        "success": False,
        "message": f"La arista ya existe: {source_id} -> {target_id} ({relation_type})",
        "edge_created": False,
        "duplicate_found": True,
        "existing_edge_id": str(existing_edge["_id"])
      }
    
    edge_doc = {
      "source": source_id,
      "target": target_id,
      "relation_type": relation_type,
      "properties": properties,
      "directed": directed
    }
    
    try:
      # insertar relación source_id -> target_id
      result = self.edges.insert_one(edge_doc)
      edges_created = 1
      
      # si no es dirigido, crear la relación inversa (target_id -> source_id)
      reverse_created = False 
      if not directed:
        # verificar si la relación inversa ya existe
        reverse_existing = self._edge_exists(target_id, source_id, relation_type, properties, directed)
        if not reverse_existing:
          reverse_edge = {
            "source": target_id,
            "target": source_id,
            "relation_type": relation_type,
            "properties": properties,
            "directed": False
          } 
          self.edges.insert_one(reverse_edge)
          edges_created += 1
          reverse_created = True
      
      return {
        "success": True,
        "message": f"Arista creada exitosamente: {source_id} -> {target_id} ({relation_type})",
        "edge_created": True,
        "duplicate_found": False,
        "edge_id": str(result.inserted_id),
        "edges_created": edges_created,
        "reverse_created": reverse_created
      }  
      
    except Exception as e:
      return {
        "success": False,
        "message": f"Error al insertar arista: {str(e)}",
        "edge_created": False,
        "duplicate_found": False
      }
  
  def _node_exists(self, node_id:str) -> bool:
    "Verifica si un nodo existe"
    return self.nodes.find_one({"node_id":node_id}) is not None
  
  def _edge_exists(self, source_id:str, target_id:str, relation_type:str, properties:Dict, directed:bool) -> Optional[Dict]:
    """Verifica si una arista exactamente igual ya existe

    Args:
        source_id (str): ID del nodo origen
        target_id (str): ID del nodo destino
        relation_type (str): tipo de relación
        properties (Dict): propiedades de la relación
        directed (bool): si la relación es dirigida

    Returns:
        Dict: Información de la arista existente o None si no existe
    """
    query = {
      "source": source_id,
      "target": target_id,
      "relation_type": relation_type,
      "properties": properties,
      "directed": directed
    }
    return self.edges.find_one(query)
  
  def get_adjacent_entities(self, node_id:str, relation_filter:str = None, direction:str = "both") -> List[Dict]:
    """Recupera las entidades adyacentes a un nodo dado

    Args:
        node_id (str): ID del nodo del cual se quiere obtener las entidades adyacentes
        relation_filter (str, optional): Filtro opcional por tipo de relación . Defaults to None.
        direction (str, optional): Dirección de las relacciones ('outgoing', 'incoming', defaults='both')

    Returns:
        List[Dict]: Lista de diccionarios con información de entidades adyacentes
    """
    if not self._node_exists(node_id):
      print(f"Error: El nodo {node_id} no existe")
      return []
    
    query_conditions = []
    
    # Relaciones salientes (este nodo es el origen)
    if direction in ["outgoing", "both"]:
      outgoing_query = {"source":node_id}
      if relation_filter: outgoing_query["relation_type"] = relation_filter
      query_conditions.append(outgoing_query)
      
    # Relaciones entrantes (este nodo es el destino)
    if direction in ["incoming", "both"]:
      incoming_query = {"target":node_id}
      if relation_filter: incoming_query["relation_type"] = relation_filter 
      query_conditions.append(incoming_query)
    
    adjacent_entities:List = []
    if query_conditions:
      if len(query_conditions) == 1:
        edges_cursor = self.edges.find(query_conditions[0])
      else:
        edges_cursor = self.edges.find( {"$or": query_conditions} )
      
      for edge in edges_cursor:
        # determinar cuál es la entidad adyacente
        if edge["source"] == node_id:
          adjacent_id = edge["target"]
          relationship_direction = "outgoing"
        else: 
          adjacent_id = edge["source"]
          relationship_direction = "incoming"
        
        # obtener información de la entidad adyacente
        adjacent_node = self.nodes.find_one( {"node_id": adjacent_id} )
        
        if adjacent_node:
          adjacent_info = {
            "node_id": adjacent_node["node_id"],
            "type": adjacent_node["type"],
            "properties": adjacent_node["properties"],
            "relationship": {
              "type": edge["relation_type"],
              "direction": relationship_direction,
              "properties": edge["properties"],
              "directed": edge["directed"]
            }
          }
          adjacent_entities.append(adjacent_info)
    
    return adjacent_entities  

  def get_node_info(self, node_id:str) -> Optional[Dict]:
    """Obtiene información completa del nodo
    
    Args:
        node_id (str): ID del nodo

    Returns:
        Optional[Dict]: Información del nodo o None si no existe
    """
    node = self.nodes.find_one({"node_id":node_id})
    if node:
      # remover el _id de MongoDB para una salida más limpia
      del node['_id']
      return node 
    return None
  
  def get_relationship_types(self) -> List[str]:
    "Obtiene todos los tipos de relaciones únicas en el grafo"
    return self.edges.distinct("relation_type")
  
  def get_node_types(self) -> List[str]:
    "Obtiene todos los tipos de nodos únicos en el grafo"
    return self.nodes.distinct("type")
  
  def get_graph_stats(self) -> Dict:
    "Obtiene estadísticas básicas del grafo"
    return {
      "total_nodes": self.nodes.count_documents({}),
      "total_edges": self.edges.count_documents({}),
      "node_types": self.get_node_types(),
      "relationship_types": self.get_relationship_types()
    }
  
  def retriever(self) -> ... :
    pass 
  
  def clear_database(self, confirm:bool = False) -> Dict:
    """Elimina toda la información de la base de datos (todos los nodos y aristas) 
    
    Args:
        confirm (bool, optional): Debe ser True para confirmar la eliminación total. Esto previene eliminaciones accidentales. Defaults to False.

    Returns:
        Dict: Resultado de la operación con las estadísticas de la eliminación
    """
    if not confirm:
      return {
        "success": False,
        "message": "Eliminación cancelada. Use confirm=True para confirmar la eliminación total de la base de datos",
        "nodes_deleted": 0,
        "edges_deleted": 0,
        "warning": "Esta operación eliminará TODOS los datos permanentemente"
      }
    
    try: 
      # Obtener estadísticas antes de eliminar
      initial_stats = self.get_graph_stats()
      
      # Eliminar todas las aristas
      edges_result = self.edges.delete_many({})
      edges_deleted = edges_result.deleted_count
      
      # Eliminar todos los nodos
      nodes_result = self.nodes.delete_many({})
      nodes_deleted = nodes_result.deleted_count
      
      # Verificar que las colecciones estén vacías
      remaining_nodes = self.nodes.count_documents({})
      remaining_edges = self.edges.count_documents({})
      
      return {
        "success": True,
        "message": "Base de datos limpiada con exito",
        "nodes_deleted": nodes_deleted,
        "edges_deleted": edges_deleted,
        "initial_stats": initial_stats,
        "remaining_nodes": remaining_nodes,
        "remaining_edges": remaining_edges,
        "database_empty": remaining_nodes == 0 and remaining_edges == 0
      }
    except Exception as e:
      return {
        "success": False,
        "message": f"Error al limpiar la base de datos: {str(e)}",
        "nodes_deleted": 0,
        "edges_deleted": 0
      }
  
  def reset_database(self, confirm:bool = False) -> Dict:
    """Reinicia completamente la base de datos: elimina todo y recrea los índices

    Args:
        confirm (bool): Debe ser True para confirmar el reinicio total. Defaults to False.

    Returns:
        Dict: Resultado de la operación
    """
    if not confirm:
      return {
        "success": False,
        "message": "Reinicio cancelado. Use confirm=True para confirmar el reinicio total de la base de datos",
        "warning": "Esta operación eliminará TODOS los datos y recreará los índices"
      }
    try:
      # Limpiar todos los datos
      clear_result = self.clear_database(confirm=True)
      
      if not clear_result["success"]:
        return clear_result
      
      # Eliminar todos los índices existentes
      self.nodes.drop_indexes()
      self.edges.drop_indexes()
      
      # Recrear los índices
      self._create_indexes()
      
      return {
        "success": True,
        "message": "Base de datos reiniciada exitosamente",
        "nodes_deleted": clear_result["nodes_deleted"],
        "edges_deleted": clear_result["edges_deleted"],
        "indexes_recreated": True,
        "database_reset": True
      }
    except Exception as e:
      return {
        "success": False,
        "message": f"Error al reiniciar la base de datos: {str(e)}",
        "nodes_deleted": 0,
        "edges_deleted": 0,
        "indexes_recreated": False
      }
  
  def close_connection(self) -> None:
    "Cierra la conexión a MongoDB"
    self.client.close()

### Init GraphDB

In [4]:
graph_db = GraphDB('graph_db')

In [11]:
reset_result = graph_db.reset_database(confirm=True)
print(json.dumps(reset_result, indent=2))

{
  "success": true,
  "message": "Base de datos reiniciada exitosamente",
  "nodes_deleted": 9,
  "edges_deleted": 8,
  "indexes_recreated": true,
  "database_reset": true
}


### Add Nodes (Entities) / Add Edges (Relationships)

In [12]:
# Nodos de álgebra
graph_db.add_node("linear_equations", "algebra", {"name": "Linear Equations", "description": "Equations of the first degree"})
graph_db.add_node("quadratic_equations", "algebra", {"name": "Quadratic Equations", "description": "Equations of the second degree"})
graph_db.add_node("polynomials", "algebra", {"name": "Polynomials", "description": "Expressions involving variables and coefficients"})
graph_db.add_node("matrices", "algebra", {"name": "Matrices", "description": "Rectangular arrays of numbers"})
graph_db.add_node("vector_spaces", "algebra", {"name": "Vector Spaces", "description": "Collections of vectors"})

# Nodos de análisis
graph_db.add_node("limits", "analysis", {"name": "Limits", "description": "Behavior of functions as inputs approach a value"})
graph_db.add_node("derivatives", "analysis", {"name": "Derivatives", "description": "Rates of change of functions"})
graph_db.add_node("integrals", "analysis", {"name": "Integrals", "description": "Area under curves"})
graph_db.add_node("sequences", "analysis", {"name": "Sequences", "description": "Ordered lists of numbers"})
graph_db.add_node("series", "analysis", {"name": "Series", "description": "Sum of sequences"})

# Relaciones entre contenidos
graph_db.add_edge("linear_equations", "matrices", "solved_by", {"method": "Gaussian elimination"}, True)
graph_db.add_edge("quadratic_equations", "polynomials", "is_a_case_of", {}, True)
graph_db.add_edge("polynomials", "limits", "analyzed_with", {}, True)
graph_db.add_edge("limits", "derivatives", "foundation_for", {}, True)
graph_db.add_edge("derivatives", "integrals", "inverse_of", {}, True)
graph_db.add_edge("sequences", "series", "forms", {}, True)
graph_db.add_edge("series", "limits", "converges_to", {}, True)
graph_db.add_edge("vector_spaces", "matrices", "represented_by", {}, True)
graph_db.add_edge("matrices", "vector_spaces", "act_on", {}, True)
graph_db.add_edge("polynomials", "vector_spaces", "form", {"type": "polynomial vector space"}, True)

{'success': True,
 'message': 'Arista creada exitosamente: polynomials -> vector_spaces (form)',
 'edge_created': True,
 'duplicate_found': False,
 'edge_id': '6872b2eba12e36523b3ee204',
 'edges_created': 1,
 'reverse_created': False}

In [7]:
adjacent = graph_db.get_adjacent_entities("linear_equations")
print("=== Adjacent Entities ===")
for entity in adjacent:
  print(f"Entity: {entity['node_id']} ({entity['type']})")
  print(f"Relationship: {entity['relationship']['type']} ({entity['relationship']['direction']})")
  print(f"Properties: {entity['properties']}")
  print("-------------------------")

=== Adjacent Entities ===
Entity: matrices (algebra)
Relationship: solved_by (outgoing)
Properties: {'name': 'Matrices', 'description': 'Rectangular arrays of numbers'}
-------------------------


In [8]:
work_relations = graph_db.get_adjacent_entities("matrices", relation_filter="solved_by")
for entity in work_relations:
  print(f"{entity['node_id']}: {entity['relationship']['type']}")

linear_equations: solved_by


In [13]:
print("=== Graph Stats ===")
print(json.dumps(graph_db.get_graph_stats(), indent=2))

=== Graph Stats ===
{
  "total_nodes": 10,
  "total_edges": 10,
  "node_types": [
    "algebra",
    "analysis"
  ],
  "relationship_types": [
    "act_on",
    "analyzed_with",
    "converges_to",
    "form",
    "forms",
    "foundation_for",
    "inverse_of",
    "is_a_case_of",
    "represented_by",
    "solved_by"
  ]
}


In [10]:
delete_result = graph_db.del_node("series")
print(json.dumps(delete_result, indent=2))

{
  "success": true,
  "message": "Node series eliminado con exito",
  "nodes_deleted": 1,
  "edges_deleted": 2,
  "existing_edges": 0
}


In [14]:
# always remember: close connection
graph_db.close_connection()

### Entity Relationship Retriever

#### Auxiliar Functions

In [3]:
def _find_paths_between_entities(self:GraphDB,
      source:str,
      target:str,
      max_depth:int,
      max_paths:int ) -> Dict:
  """Encuentra todos los caminos posibles entre dos entidades usando BFS con múltiples caminos 

  Args:
      source (str): entidad origen
      target (str): entidad destino
      max_depth (int): profundidad máxima de búsqueda
      max_paths (int): número máximo de caminos a retornar

  Returns:
      Dict: información sobre los caminos encontrados
  """
  # validar que ambas entidades existen
  if not self._node_exists(target):
    return {
      "success": False,
      "message": f"La entidad '{target}' no existe en el grafo",
      "query_type": "path_finding"
    }
  
  # verificar si ya existe una relación directa 
  direct_relations = self.get_adjacent_entities(source)
  direct_target = next( (rel for rel in direct_relations if rel["node_id"] == target), None )
  
  if direct_target:
    return {
      "success": True,
      "query_type": "direct_relation",
      "source": source,
      "target": target,
      "direct_relation": direct_target,
      "message": f"Existe una relación directa entre `{source}` y `target`"
    }
  
  # BFS para encontrar múltiples caminos
  paths_found = []
  queue = deque([(source, [source], 0)])  # nodo_actual, camino, profundidad
  visited_paths = set()                   # para evitar caminos duplicados
  
  while queue and len(paths_found) < max_paths:
    current_node, path, depth = queue.popleft()
    if depth >= max_depth: continue
    
    # obtener entidades adyacentes
    adjacent = self.get_adjacent_entities(current_node)
    
    for adj_entity in adjacent:
      next_node = adj_entity["node_id"]
      
      # si encontramos el objetivo
      if next_node == target:
        complete_path = path + [next_node]
        path_key = "->".join(complete_path)
        
        if path_key not in visited_paths:
          visited_paths.add(path_key)

          # construir información detallada del camino
          path_details = self._build_path_details(complete_path)
          
          paths_found.append({
            "path": complete_path,
            "length": len(complete_path) - 1,
            "details": path_details
          })
      
      # si no se ha visitado este nodo en este camino, continuar explorando
      elif next_node not in path:
        new_path = path + [next_node]
        queue.append((next_node,new_path,depth + 1))
    
  # ordenar caminos por longitud 
  paths_found.sort(key=lambda x:x["length"])
  
  return {
    "success": True,
    "query_type": "path_finding",
    "source": source,
    "target": target,
    "paths_found": len(paths_found),
    "paths": paths_found[:max_paths],
    "message": f"Se encontraron {len(paths_found)} caminos entre `{source}` y `{target}`"
  }

def _calculate_relation_probabilities(self:GraphDB,
      entity:str,
      relation_type:str,
      max_depth:int ) -> Dict:
  """Calcula la probabilidad de que una entidad tenga una relación específica con otras entidades basándose en caminos indirectos y patrones en el grafo

  Args:
      entity (str): entidad base
      relation_type (str): tipo de relación a analizar
      max_depth (int): profundidad máxima para análisis

  Returns:
      Dict: Probabilidades calculadas para cada entidad
  """
  
  # verificar si el tipo de relación existe en el grafo
  existing_relations = self.get_relationship_types()
  if relation_type not in existing_relations:
    return {
      "success": False,
      "message": f"El tipo de relación `{relation_type}` no existe en el grafo",
      "available_relations": existing_relations,
      "query_type": "relation_probability"
    }
  
  # obtener todas las relaciones directas de la entidad 
  direct_relations = self.get_adjacent_entities(entity)
  direct_relation_targets = {rel["node_id"] for rel in direct_relations if rel["relationship"]["type"] == relation_type}
  
  # obtener todos los nodos del grafo
  all_nodes = list(self.nodes.find({}, {"node_id": 1, "type": 1}))
  
  # calcular probabilidades para cada nodo
  probabilities = []
  
  for node_doc in all_nodes:
    target_node = node_doc["node_id"]
    if target_node == entity: continue
    
    # si ya existe una relación directa, entonces probabilidad = 1.0
    if target_node in direct_relation_targets:
      probabilities.append({
        "node_id": target_node,
        "node_type": node_doc["type"],
        "probability": 1.0,
        "message": "Relación directa encontrada",
        "evidence": []
      })
      continue
    
    # calcular probabilidad basada en algunos factores 
    probability, evidence = self._calculate_indirect_probability(entity, target_node, relation_type, max_depth)
    if probability > 0:
      probabilities.append({
        "node_id": target_node,
        "node_type": node_doc["type"],
        "probability": probability,
        "message": "Relación indirecta encontrada basada en patrones indirectos",
        "evidence": evidence
      })
  
  # ordenar por probabilidad (mayor primero)
  probabilities.sort(key=lambda x:x["probability"], reverse=True)
  
  return {
    "success": True,
    "query_type": "relation_probability",
    "entity": entity,
    "relation_type": relation_type,
    "total_candidates": len(probabilities),
    "direct_relations": len(direct_relation_targets),
    "probabilities": probabilities,  
    "message": f"Calculadas probabilidades de relación '{relation_type}' para '{entity}'"
  }

def _calculate_indirect_probability(self:GraphDB,
      source:str,
      target:str,
      relation_type:str,
      max_depth:int ) -> Tuple[float, List[Dict]]:
  """Calcula la probabilidad indirecta de relación basada en:
  1. Entidades comunes conectadas con el mismo tipo de relación
  2. Caminos cortos entre las entidades

  Args:
      source (str): entidad origen
      target (str): entidad objetivo
      relation_type (str): tipo de relación
      max_depth (int): profundidad máxima

  Returns:
      Tuple[float, List[Dict]]: probabilidad y evidencia
  """
  evidence = []
  probability_factors = []
  
  # factor 1: entidades comunes conectadas con el mismo tipo de relación
  source_relations = self.get_adjacent_entities(source, relation_filter=relation_type)
  target_relations = self.get_adjacent_entities(target, relation_filter=relation_type)
  
  source_connected = {rel["node_id"] for rel in source_relations}
  target_connected = {rel["node_id"] for rel in target_relations}
  
  common_entities = source_connected.intersection(target_connected)
  
  if common_entities:
    # m: entidades conectadas a source
    m = len(source_connected)
    # n: entidades conectadas a target
    n = len(target_connected)
    # p: entidades comunes entre source y target
    p = len(common_entities)
    
    # máximo de conexiones comunes posibles 
    common_max = min(m,n)
    
    common_factor = 0.0 # evitar división por cero
    if common_max != 0: common_factor = min(p/common_max, 1.0) # P(A) = conexiones-reales / máximo-posible
    
    probability_factors.append(common_factor)
    evidence.append({
      "type": "common_entities",
      "count": len(common_entities),
      "entities": list(common_entities),
      "contribution": common_factor
    })
  
  # factor 2: caminos cortos entre entidades => P(B) = 1/d(i,j)
  paths_result = self._find_paths_between_entities(source,target,min(max_depth, 3), 5)
  if paths_result["success"] and "paths" in paths_result:
    shortest_path_length = min(path["length"] for path in paths_result["paths"]) if paths_result["paths"] else float('inf')
    if shortest_path_length != float('inf'): 
      path_factor = 1/shortest_path_length
      
      probability_factors.append(path_factor)
      evidence.append({
        "type": "shortest_path", 
        "shortest_length": shortest_path_length,
        "total_paths": len(paths_result["paths"]),
        "contribution": path_factor
      })
  
  # unión de probabilidades independientes: P(A u B) = P(A) + P(B) - P(A)*P(B)
  if probability_factors:
    combined_probability = probability_factors[0]
    for factor in probability_factors[1:]:
      combined_probability = combined_probability + factor - (combined_probability * factor)
    return combined_probability, evidence
  
  return 0.0, evidence

def _build_path_details(self:GraphDB, path:List[str]) -> List[Dict]:
  """Construye información detallada para un camino específico

  Args:
      path (List[str]): lista de nodos en el camino

  Returns:
      List[Dict]: detalles de cada paso en el camino
  """
  details = []
  for i in range(len(path) - 1):
    current_node = path[i]
    next_node = path[i+1]

    # encontrar la relación entre estos dos nodos
    current_adjacent = self.get_adjacent_entities(current_node)
    relation_info = next((rel for rel in current_adjacent if rel["node_id"] == next_node), None)

    if relation_info:
      details.append({
        "from": current_node,
        "to": next_node,
        "relation_type": relation_info["relationship"]["type"],
        "direction": relation_info["relationship"]["direction"],
        "properties": relation_info["relationship"]["properties"]
      })
  
  return details

In [4]:
GraphDB._find_paths_between_entities = _find_paths_between_entities
GraphDB._calculate_relation_probabilities = _calculate_relation_probabilities
GraphDB._calculate_indirect_probability = _calculate_indirect_probability
GraphDB._build_path_details = _build_path_details

#### Retriever Function

In [5]:
def retriever(self:GraphDB, 
      query_entity_1:str, 
      query_entity_2:str = None,
      relation_type:str = None,
      max_depth:int = 4, 
      max_paths:int = 10 ) -> Dict:
  """Recuperador avanzado para el grafo que maneja dos casos principales:
  1. Encontrar caminos entre dos entidades no adyacentes
  2. Calcular probabilidades de la relación entre una entidad y todas las demás para un tipo de relación específico

  Args:
      query_entity_1 (str): primera entidad (siempre requerida)
      query_entity_2 (str, optional): segunda entidad para encontrar caminos. Defaults to None.
      relation_type (str, optional): tipo de relación para calcular probabilidades. Defaults to None.
      max_depth (int, optional): profundidad máxima de búsqueda. Defaults to 4.
      max_paths (int, optional): número máximo de caminos a retornar. Defaults to 10.

  Returns:
      Dict: Resultado con caminos encontrados o probabilidades calculadas
  """
  # validar que la primera entidad existe
  if not self._node_exists(query_entity_1):
    return {
      "success": False,
      "message": f"La entidad '{query_entity_1}' no existe en el grafo",
      "query_type": "error"
    }
  
  # caso 1: encontrar caminos entre dos entidades específicas
  if query_entity_2 is not None:
    return self._find_paths_between_entities(query_entity_1,query_entity_2,max_depth,max_paths)
  
  # caso 2: calcular probabilidades de relación para un tipo específico 
  elif relation_type is not None:
    return self._calculate_relation_probabilities(query_entity_1, relation_type, max_depth)
  
  # error: parámetros insuficientes
  else:
    return {
      "success": False,
      "message": "Debe proporcionar `query_entity_2` (para encontrar caminos) o `relation_type` (para calcular probabilidades)",
      "query_type": "error"
    }

In [6]:
GraphDB.retriever = retriever

#### Subgraph Extraction Function

La siguiente función permite: 
- Buscar todas las aristas de un tipo de relación especificado
- Identificar los nodos involucrados en esas relaciones 
- Extrae la información de esos nodos 

In [None]:
def extract_subgraph_by_relation(self:GraphDB, relation_type:str) -> Dict:
  """Extrae un subgrafo formado por todas las relaciones de un tipo específico 
  y los nodos que participan en dichas relaciones. 

  Args:
    relation_type (str): tipo de relación para filtrar el subgrafo

  Returns:
    Dict: Diccionario con la información del subgrafo que contiene:
    - `success`: bool indicando si la operación fue exitosa
    - `message`: str con mensaje descriptivo
    - `nodes`: List[Dict] con información de los nodos del subgrafo
    - `edges`: List[Dict] con información de las aristas del subgrafo
    - `stats`: Dict con estadísticas del subgrafo
  """
  try:
    # buscar todas las aristas del tipo de relación especificado
    edges_cursor = self.edges.find({"relation_type": relation_type})
    subgraph_edges = []
    involved_node_ids = set()
    
    # procesar cada arista encontrada
    for edge in edges_cursor:
      involved_node_ids.add(edge['source'])
      involved_node_ids.add(edge['target'])
      
      edge_info = {
        "source": edge["source"],
        "target": edge['target'],
        "relation_type": edge["relation_type"],
        "properties": edge["properties"],
        "directed": edge["directed"]
      }
      subgraph_edges.append(edge_info)
    
    # si no se encontraron aristas del tipo específico 
    if not subgraph_edges:
      return {
        "success": True,
        "message": f"No se encontraron relaciones del tipo: {relation_type}",
        "nodes": [],
        "edges": [],
        "stats": {
          "total_nodes": 0,
          "total_edges": 0,
          "relation_type": relation_type
        }
      }
    
    # obtener información de todos los nodos involucrados
    subgraph_nodes = []
    nodes_cursor = self.nodes.find({"node_id": {"$in": list(involved_node_ids)}})
    
    for node in nodes_cursor: 
      node_info = {
        "node_id": node["node_id"],
        "type": node["type"],
        "properties": node["properties"]
      }
      subgraph_nodes.append(node_info)
    
    # generar estadísticas del subgrafo
    node_types = list(set(node["type"] for node in subgraph_nodes))
    
    stats = {
      "total_nodes": len(subgraph_nodes),
      "total_edges": len(subgraph_edges),
      "relation_type": relation_type,
      "node_types_in_subgraph": node_types,
      "unique_nodes": len(involved_node_ids),
      "directed_edges": sum(1 for edge in subgraph_edges if edge["directed"]),
      "undirected_edges": sum(1 for edge in subgraph_edges if not edge["directed"])
    }
    
    return {
      "success": True,
      "message": f"Subgrafo extraído exitosamente para relación: {relation_type}",
      "nodes": subgraph_nodes,
      "edges": subgraph_edges,
      "stats": stats 
    }
  except Exception as e:
    return {
      "success": False,
      "message": f"Error al extraer subgrafo: {str(e)}",
      "nodes": [],
      "edges": [],
      "stats": {
        "total_nodes": 0,
        "total_edges": 0,
        "relation_type": relation_type
      }
    }

GraphDB.extract_subgraph_by_relation = extract_subgraph_by_relation

In [8]:
def is_dag(self:GraphDB) -> Dict:
  """Verifica si el grafo es un DAG (Directed Acyclic Graph)

  Un grafo es DAG si: 
  1. Todas las aristas son dirigidas
  2. No contiene ciclos

  Utiliza el algoritmo de Kahn (orden topológico) para detectar ciclos

  Returns:
    Dict: resultado de la verificación:
      - `is_dag`: bool indicando si es DAG
      - `message`: str con mensaje descriptivo
      - `has_undirected_edges`: bool si tiene aristas no dirigidas
      - `has_cycles`: bool si tiene ciclos
      - `cycle_info`: Dict con información del ciclo encontrado (si existe)
      - `stats`: Dict con estadísticas del análisis
  """
  try:
    # obtener todas las aristas
    edges_cursor = self.edges.find({})
    edges_list = list(edges_cursor)
    
    # verificar si hay aristas no dirigdas
    undirected_edges = [edge for edge in edges_list if not edge.get("directed", True)]
    has_undirected = len(undirected_edges) > 0
    
    if has_undirected:
      return {
        "is_dag": False,
        "message": f"El grafo NO es DAG: contiene {len(undirected_edges)} aristas no dirigidas",
        "has_undirected_edges": True,
        "has_cycles": None,   # no se puede determinar sin verificar ciclos
        "cycle_info": None,
        "stats": {
          "total_edges": len(edges_list),
          "directed_edges": len(edges_list) - len(undirected_edges),
          "undirected_edges": len(undirected_edges),
          "total_nodes": self.nodes.count_documents({})
        }
      }
      
    # solo considerar aristas dirigidas para el análisis de ciclos
    directed_edges = [edge for edge in edges_list if edge.get("directed", True)]
    cycle_result = detect_cycles_kahn(self, directed_edges)
    
    if cycle_result["has_cycles"]:
      return {
        "is_dag": False, 
        "message": f"El grafo NO es DAG: se detectaron ciclos",
        "has_undirected_edges": False,
        "has_cycles": True,
        "cycle_info": cycle_result["cycle_info"],
        "stats": {
          "total_edges": len(edges_list),
          "directed_edges": len(directed_edges),
          "undirected_edges": len(undirected_edges),
          "total_nodes": self.nodes.count_documents({}),
          "nodes_in_cycles": len(cycle_result["cycle_info"]["remaining_nodes"])
        }
      }
    return {
      "is_dag": True,
      "message": "El grafo es DAG: todas las aristas son dirigidas y no hay ciclos",
      "has_undirected_edges": False,
      "has_cycles": False,
      "cycle_info": None,
      "stats": {
        "total_edges": len(edges_list),
        "directed_edges": len(directed_edges),
        "undirected_edges": len(undirected_edges),
        "total_nodes": self.nodes.count_documents({}),
        "topological_order": cycle_result["topological_order"]
      }
    }
  except Exception as e:
    return {
      "is_dag": False,
      "message": f"Error al verificar DAG: {str(e)}",
      "has_undirected_edges": None,
      "has_cycles": None,
      "stats": {}
    }



def detect_cycles_kahn(self:GraphDB, directed_edges:List) -> Dict:
  """Detecta ciclos usando el algoritmo de Kahn (orden topológico)
  
  Args:
    directed_edges (List): lista de aristas dirigidas

  Returns:
    Dict: resultado de análisis de ciclos
  """
  try: 
    # Construir grafo de adyacencia y contar grados de entrada
    graph = {}
    in_degree = {}
    all_nodes = set()
    
    # Inicializar estructuras
    for edge in directed_edges:
      source = edge["source"]
      target = edge["target"]
      all_nodes.add(source)
      all_nodes.add(target)
      
      if source not in graph:
        graph[source] = []
      if target not in graph:
        graph[target] = []
      if source not in in_degree:
        in_degree[source] = 0
      if target not in in_degree:
        in_degree[target] = 0
    
    # Construir lista de adyacencia y contar grados de entrada
    for edge in directed_edges:
            source = edge["source"]
            target = edge["target"]
            graph[source].append(target)
            in_degree[target] += 1
    
    # Encontrar nodos con grado de entrada 0
    queue = [node for node in all_nodes if in_degree[node] == 0]
    topological_order = []
    
    while queue:
      current = queue.pop(0)
      topological_order.append(current)
      
      # Reducir grado de entrada de nodos adyacentes
      for neighbor in graph[current]:
        in_degree[neighbor] -= 1
        if in_degree[neighbor] == 0:
          queue.append(neighbor)
    
    # Si no todos los nodos están en el orden topológico, hay ciclos
    if len(topological_order) != len(all_nodes):
      remaining_nodes = [node for node in all_nodes if node not in topological_order]
      
      # Encontrar un ciclo específico
      cycle_path = self._find_cycle_path(graph, remaining_nodes)
      
      return {
        "has_cycles": True,
        "cycle_info": {
          "remaining_nodes": remaining_nodes,
          "nodes_in_cycles": len(remaining_nodes),
          "example_cycle": cycle_path
        },
        "topological_order": topological_order
      }
    else:
      return {
        "has_cycles": False,
        "cycle_info": None,
        "topological_order": topological_order
      }
  except Exception as e:
    return {
      "has_cycles": None,
      "cycle_info": {"error": str(e)},
      "topological_order": []
    }

GraphDB.detect_cycles_kahn = detect_cycles_kahn
GraphDB.is_dag = is_dag

In [20]:
def add_prerequisites_from_map(graph:GraphDB, prerequisite_map:Dict[str,List[str]]) -> None:
  """Inserta aristas de tipo 'prerequisito' basadas en eun diccionario donde:
  - key: subtema prerrequisito
  - value: lista de subtemas que lo requieren

  Args:
    graph (GraphDB): instancia de GraphDB
    prerequisite_map (Dict): diccionario con subtemas y listas de dependencias  
  """
  total = 0
  for prerequisite, topics in prerequisite_map.items():
    for topic in topics:
      result = graph.add_edge(
        source_id=prerequisite,
        target_id=topic,
        relation_type="prerequisito",
        properties={"explicación": f"{prerequisite} es prerequisito de {topic}"},
        directed=True
      )
      total += 1 if result["success"] else 0 
      status = "OK" if result["success"] else "ERROR"
      print(f"{status}: {prerequisite} -> {topic} : {result['message']}")

# instanciar y resetar para prueba
graph = GraphDB("prerequisites_graph")
graph.clear_database(True)

# definición de los tipos de tópicos y su clasificación
type_map = {
  # Fundamentos matemáticos
  "discrete_mathematics": [
    "discrete_math", "computer_logic", "graph_theory", "combinatorics",
    "set_theory", "boolean_algebra", "number_theory", "formal_methods"
  ],
  
  "continuous_mathematics": [
    "calculus", "linear_algebra", "statistics", "numerical_methods",
    "optimization", "probability_theory", "differential_equations"
  ],
  
  # Fundamentos de programación
  "programming_fundamentals": [
    "programming_basics", "data_structures", "algorithms", "object_oriented_programming",
    "functional_programming", "programming_paradigms", "code_design"
  ],
  
  # Inteligencia artificial
  "artificial_intelligence": [
    "artificial_intelligence", "expert_systems", "automated_reasoning",
    "knowledge_representation", "search_algorithms", "planning"
  ],
  
  "machine_learning": [
    "machine_learning", "deep_learning", "neural_networks",
    "reinforcement_learning", "supervised_learning", "unsupervised_learning"
  ],
}

for key, topics in type_map.items():
  for topic in topics:
    graph.add_node(topic, key) 

# definición de los prerequisitos
prerequisite_map = {
  # Matemáticas básicas - Base fundamental
  "basic_math": ["discrete_math", "linear_algebra", "calculus", "statistics"],
  
  # Programación fundamental
  "programming_basics": ["data_structures", "algorithms", "object_oriented_programming"],
  
  # Matemáticas avanzadas
  "discrete_math": ["algorithms", "computer_logic", "cryptography", "graph_theory"],
  "linear_algebra": ["machine_learning", "computer_graphics", "data_science"],
  "calculus": ["numerical_methods", "optimization", "machine_learning"],
  "statistics": ["data_science", "machine_learning", "artificial_intelligence"],
  
  # Estructuras de datos y algoritmos
  "data_structures": ["advanced_algorithms", "database_design", "compilers"],
  "algorithms": ["advanced_algorithms", "complexity_theory", "parallel_computing"],
  
  # Paradigmas de programación
  "object_oriented_programming": ["software_engineering", "design_patterns", "web_development"],
  
  # Lógica y matemáticas computacionales
  "computer_logic": ["formal_methods", "automated_reasoning", "compiler_design"],
  "graph_theory": ["network_algorithms", "distributed_systems", "social_networks"],
  
  # Inteligencia artificial y aprendizaje automático
  "artificial_intelligence": ["expert_systems", "natural_language_processing", "computer_vision"],
  "machine_learning": ["deep_learning", "reinforcement_learning", "neural_networks"],
  "deep_learning": ["computer_vision", "natural_language_processing", "generative_ai"],
}

tmp = []
for k,v in prerequisite_map.items():
  tmp.append(k)
  tmp.extend(v)

for i in tmp:
  graph.add_node(i, node_type="computer science")

def validate_topological_order(prereq_map:Dict):
  "Valida que el diccionario de prerequisitos no tenga ciclos (orden topológico válido)"
  visited = set()
  rec_stack = set()
  
  def has_cycle(node):
    if node in rec_stack:
      return True
    if node in visited:
      return False
    
    visited.add(node)
    rec_stack.add(node)
    
    # Verificar todos los cursos que dependen de este prerequisito
    for dependent in prereq_map.get(node, []):
      if has_cycle(dependent):
        return True
    
    rec_stack.remove(node)
    return False
  
  # Verificar todos los nodos
  for node in prereq_map:
    if node not in visited:
      if has_cycle(node):
        return False
  
  return True



assert validate_topological_order(prerequisite_map)

add_prerequisites_from_map(graph=graph, prerequisite_map=prerequisite_map)


Error al insertar nodo: E11000 duplicate key error collection: prerequisites_graph.nodes index: node_id_1 dup key: { node_id: "discrete_math" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: prerequisites_graph.nodes index: node_id_1 dup key: { node_id: "discrete_math" }', 'keyPattern': {'node_id': 1}, 'keyValue': {'node_id': 'discrete_math'}}
Error al insertar nodo: E11000 duplicate key error collection: prerequisites_graph.nodes index: node_id_1 dup key: { node_id: "linear_algebra" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: prerequisites_graph.nodes index: node_id_1 dup key: { node_id: "linear_algebra" }', 'keyPattern': {'node_id': 1}, 'keyValue': {'node_id': 'linear_algebra'}}
Error al insertar nodo: E11000 duplicate key error collection: prerequisites_graph.nodes index: node_id_1 dup key: { node_id: "calculus" }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error