In [3]:
import asyncio
from neo4j import AsyncGraphDatabase

NEO4J_USERNAME="neo4j"
NEO4J_PASSWORD="password"
NEO4J_URI="bolt://localhost:7687"

driver = AsyncGraphDatabase.driver(
    NEO4J_URI, 
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
)

def get_session():
    return driver.session()

async def close_driver():
    """Closes the global Neo4j driver."""
    await driver.close()

In [4]:
from typing import List, Dict, Any, Literal


In [5]:
import math
from llama_index.embeddings.fastembed import FastEmbedEmbedding

embed_model = FastEmbedEmbedding(model_name="BAAI/bge-small-en-v1.5")

def get_embedding(text: str) -> list[float]:
    if not text or not text.strip():
        return []

    embedding = embed_model.get_text_embedding(text)

    if not isinstance(embedding, list):
        try:
            embedding = embedding.tolist()
        except Exception as e:
            return []

    if not all(isinstance(x, (float, int)) and math.isfinite(x) for x in embedding):
        return []

    return embedding


  from .autonotebook import tqdm as notebook_tqdm
Fetching 5 files: 100%|██████████| 5/5 [01:05<00:00, 13.20s/it]


In [42]:
from collections import defaultdict
from typing import Any, List, Dict

def build_nested_tree(records: List[dict]) -> dict:
    """Build nested dict from path lists."""
    tree = lambda: defaultdict(tree)
    root = tree()
    for record in records:
        current = root
        for part in record["path_names"]:
            current = current[part]
        current["_meta"] = {
            "label": record["label"],
            "description": record.get("description"),
            "content": record.get("content")
        }
    return root

def format_nested_tree(tree: Dict[str, Any], prefix: str = "") -> str:
    """Convert nested tree to formatted text."""
    lines = []
    entries = sorted(tree.items())
    entries_meta = [(k, v) for k, v in entries if k != "_meta"]
    total_entries = len(entries_meta)

    for idx, (name, subtree) in enumerate(entries_meta):
        connector = "└── " if idx == total_entries - 1 else "├── "
        lines.append(f"{prefix}{connector}{name}")

        # Add description and code if present
        meta = subtree.get("_meta")
        if meta:
            desc = meta.get("description")
            content = meta.get("content")
            if desc:
                lines.append(f"{prefix}{'    ' if idx == total_entries -1 else '│   '}📌 {desc}")
            if content:
                content_lines = content.strip().split("\n")
                formatted_content = "\n".join(f"{prefix}{'    ' if idx == total_entries -1 else '│   '}    {line}" for line in content_lines)
                lines.append(f"{prefix}{'    ' if idx == total_entries -1 else '│   '}💻 Code:\n{prefix}{'    ' if idx == total_entries -1 else '│   '}    ```\n{formatted_content}\n{prefix}{'    ' if idx == total_entries -1 else '│   '}    ```")

        # Recursive call for children
        extension = "    " if idx == total_entries - 1 else "│   "
        lines.append(format_nested_tree(subtree, prefix + extension))

    return "\n".join(filter(None, lines))


def format_search_results(records: List) -> str:
    """Formats list of nodes for LLM input."""
    parts: List[str] = []
    for r in records:
        name = r["name"]
        desc = f": {r['description']}" if r.get("description") else ""
        content = r.get("content", "").rstrip()
        parts.append(
            f"\n\n**Name:** {name}\n"
            f"**Description**{desc}\n\n"
            f"**Code:**\n```\n{content}\n```\n"
        )
    return "\n".join(parts)

async def traverse_node(folder_name: str) -> str:
    """Recursively gather all contents under a folder."""
    cypher = """
    MATCH path = (f:Folder {name: $folder_name})-[:CONTAINS|HAS*]->(node)
    WHERE NOT node:Folder
    RETURN [n IN nodes(path) | n.name] AS path_names,
        labels(node)[0] AS label,
        node.description AS description,
        node.content AS content
    ORDER BY path_names

    """

    async with get_session() as session:
        result = await session.run(cypher, {"folder_name": folder_name})
        records = [r async for r in result]

    if not records:
        return "No matching node found."

    nested_tree = build_nested_tree(records)
    formatted_tree = f"{folder_name}\n" + format_nested_tree(nested_tree)
    return formatted_tree


async def search_graph(node_label: Literal["File", "Folder", "Class", "Method"], node_name: str ) -> str :
    """Usefull to search for spacific node in Graph databse"""
    top_k: int = 5
    name_embedding = get_embedding(node_name)  
    

    cypher = f"""
    CALL db.index.vector.queryNodes('{node_label.lower()}_embedding_name_index', $top_k, $embedding)
    YIELD node, score
    {f"WHERE '{node_label}' IN labels(node)" if node_label else ""}
    RETURN node.name AS name,node.description AS description,node.content AS content, score
    ORDER BY score DESC
    """

    async with get_session() as session:  # Assumes get_session can be used as context manager
        result = await session.run(cypher, {
            "embedding": name_embedding,
            "top_k": top_k
        })
        records = [r async for r in result]

    if node_label == "Folder" and records:
            print(records)
            print("#"*10)
            matched_folder_name = records[0]["name"]
            return await traverse_node(matched_folder_name)
    elif records:
        print(records)
        print("#"*10)
        return format_search_results(records)
    else:
        return "No matching node found."


In [44]:
await search_graph("Folder", "api")



[<Record name='api' description=None content=None score=0.9996776580810547>, <Record name='docs' description=None content=None score=0.8556280136108398>, <Record name='core' description=None content=None score=0.8210372924804688>, <Record name='service' description=None content=None score=0.8147873878479004>, <Record name='main_dir' description=None content=None score=0.8103017807006836>]
##########


'api\n└── api\n    ├── Dockerfile\n    │   💻 Code:\n    │       ```\n    │       # Use an official Python runtime as a base image\n    │       FROM python:3.12-slim\n    │       \n    │       # Install curl and necessary packages, including libgit2-dev for pygit2\n    │       RUN apt-get update && \\\n    │           apt-get install -y curl build-essential libpq-dev libgit2-dev\n    │       \n    │       ## install minio client (mc)\n    │       RUN curl https://dl.min.io/client/mc/release/linux-amd64/mc \\\n    │           --create-dirs \\\n    │           -o /usr/local/bin/mc && \\\n    │           chmod +x /usr/local/bin/mc\n    │       \n    │       # Install Poetry using the official installer and add it to PATH\n    │       RUN curl -sSL https://install.python-poetry.org | python3 -\n    │           ENV PATH="/root/.local/bin:${PATH}"\n    │       \n    │       ## Create and set the working directory\n    │       WORKDIR /app\n    │       \n    │       # Copy Poetry configuration

In [43]:
await search_graph("File", "main.py")

[<Record name='main.py' description='This FastAPI application extracts delivery information from images using OCR. It features CORS enabled for all origins, redirection from the root URL to the Swagger UI for API documentation, and includes a router for OCR delivery extraction under the `/extract` endpoint. The application utilizes a logger for debugging and monitoring.\n' content='import logging\n\nfrom fastapi import FastAPI\nfrom fastapi.middleware.cors import CORSMiddleware\nfrom starlette.responses import RedirectResponse\nfrom src.service.ocr_delivery import router as ocr_extractor_router\nfrom src.core.logger_config import setup_logging\n# from src.service.url_extractor import router as ocr_router\n# ----------------------------------------\n# Logging setup\n# ----------------------------------------\nsetup_logging()\nlogger = logging.getLogger(__name__)\n\n# ----------------------------------------\n# App metadata\n# ----------------------------------------\napp = FastAPI(\n   

'\n\n**Name:** main.py\n**Description**: This FastAPI application extracts delivery information from images using OCR. It features CORS enabled for all origins, redirection from the root URL to the Swagger UI for API documentation, and includes a router for OCR delivery extraction under the `/extract` endpoint. The application utilizes a logger for debugging and monitoring.\n\n\n**Code:**\n```\nimport logging\n\nfrom fastapi import FastAPI\nfrom fastapi.middleware.cors import CORSMiddleware\nfrom starlette.responses import RedirectResponse\nfrom src.service.ocr_delivery import router as ocr_extractor_router\nfrom src.core.logger_config import setup_logging\n# from src.service.url_extractor import router as ocr_router\n# ----------------------------------------\n# Logging setup\n# ----------------------------------------\nsetup_logging()\nlogger = logging.getLogger(__name__)\n\n# ----------------------------------------\n# App metadata\n# ----------------------------------------\napp = 