Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
venv/
.pytest_cache/
.ruff_cache/
.vscode/
.coverage
__pycache__/
File renamed without changes.
136 changes: 136 additions & 0 deletions prometheus/graph/file_graph_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from collections import deque
from pathlib import Path
from typing import Sequence, Tuple

from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter

from prometheus.graph.graph_types import (
ASTNode,
KnowledgeGraphEdge,
KnowledgeGraphEdgeType,
KnowledgeGraphNode,
TextNode,
)
from prometheus.parser import tree_sitter_parser


class FileGraphBuilder:
    """Build the knowledge-graph nodes and edges describing one file's contents.

    Files that ``tree_sitter_parser`` supports are turned into a tree of
    ``ASTNode``s; Markdown (``.md``) files are split on their headers and turned
    into a chain of ``TextNode``s.

    Attributes:
        max_ast_depth: Depth limit for AST expansion. The AST root has depth 1;
            a node whose depth is greater than this value is still added to the
            graph, but its children are not.
    """

    def __init__(self, max_ast_depth: int):
        self.max_ast_depth = max_ast_depth

    def supports_file(self, file: Path) -> bool:
        """Return whether ``build_file_graph`` can produce a graph for ``file``."""
        return tree_sitter_parser.supports_file(file) or file.suffix == ".md"

    def build_file_graph(
        self, parent_node: KnowledgeGraphNode, file: Path, next_node_id: int
    ) -> Tuple[int, Sequence[KnowledgeGraphNode], Sequence[KnowledgeGraphEdge]]:
        """Build the subgraph for ``file`` and attach it to ``parent_node``.

        Args:
            parent_node: Graph node that the new subgraph hangs off.
            file: File to convert into graph nodes.
            next_node_id: First node id that is free to use.

        Returns:
            A tuple ``(next_node_id, nodes, edges)`` where ``next_node_id`` is the
            first id still unused after building. For a file that is neither
            supported by tree-sitter nor Markdown, the id is returned unchanged
            together with empty node and edge lists.
        """
        if tree_sitter_parser.supports_file(file):
            return self._tree_sitter_file_graph(parent_node, file, next_node_id)

        if file.suffix == ".md":
            return self._markdown_file_graph(parent_node, file, next_node_id)

        # Never fall through to an implicit ``None``: callers unpack the result
        # into three values, so an unsupported file yields an empty graph.
        return next_node_id, [], []

    @staticmethod
    def _make_ast_node(tree_sitter_node, node_id: int) -> KnowledgeGraphNode:
        """Wrap a tree-sitter node in a ``KnowledgeGraphNode`` holding an ``ASTNode``."""
        ast_node = ASTNode(
            type=tree_sitter_node.type,
            # Index 0 of a tree-sitter point is the row.
            start_line=tree_sitter_node.start_point[0],
            end_line=tree_sitter_node.end_point[0],
            text=tree_sitter_node.text.decode("utf-8"),
        )
        return KnowledgeGraphNode(node_id, ast_node)

    def _tree_sitter_file_graph(
        self, parent_node: KnowledgeGraphNode, file: Path, next_node_id: int
    ) -> Tuple[int, Sequence[KnowledgeGraphNode], Sequence[KnowledgeGraphEdge]]:
        """Parse ``file`` with tree-sitter and convert its syntax tree to graph nodes.

        The AST root is linked to ``parent_node`` with a ``has_ast`` edge and every
        other AST node is linked to its parent with a ``parent_of`` edge. A tree
        whose root reports an error or has no children produces an empty graph.
        """
        tree_sitter_nodes = []
        tree_sitter_edges = []

        root = tree_sitter_parser.parse(file).root_node
        if root.has_error or root.child_count == 0:
            return next_node_id, tree_sitter_nodes, tree_sitter_edges

        kg_ast_root_node = self._make_ast_node(root, next_node_id)
        next_node_id += 1
        tree_sitter_nodes.append(kg_ast_root_node)
        tree_sitter_edges.append(
            KnowledgeGraphEdge(parent_node, kg_ast_root_node, KnowledgeGraphEdgeType.has_ast)
        )

        # Iterative depth-first walk; each entry is
        # (tree-sitter node, its graph node, depth of that node).
        node_stack = deque([(root, kg_ast_root_node, 1)])
        while node_stack:
            tree_sitter_node, kg_node, depth = node_stack.pop()

            # Nodes past the depth limit stay in the graph as leaves.
            if depth > self.max_ast_depth:
                continue

            for tree_sitter_child_node in tree_sitter_node.children:
                kg_child_ast_node = self._make_ast_node(tree_sitter_child_node, next_node_id)
                next_node_id += 1

                tree_sitter_nodes.append(kg_child_ast_node)
                tree_sitter_edges.append(
                    KnowledgeGraphEdge(
                        kg_node, kg_child_ast_node, KnowledgeGraphEdgeType.parent_of
                    )
                )

                node_stack.append((tree_sitter_child_node, kg_child_ast_node, depth + 1))
        return next_node_id, tree_sitter_nodes, tree_sitter_edges

    def _markdown_file_graph(
        self, parent_node: KnowledgeGraphNode, file: Path, next_node_id: int
    ) -> Tuple[int, Sequence[KnowledgeGraphNode], Sequence[KnowledgeGraphEdge]]:
        """Split a Markdown file on its level 1-3 headers and build a text-node graph."""
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        # read_text opens and closes the file itself, so no handle is leaked.
        text = file.read_text(encoding="utf-8")
        documents = markdown_splitter.split_text(text)
        return self._documents_to_file_graph(documents, parent_node, next_node_id)

    def _documents_to_file_graph(
        self,
        documents: Sequence[Document],
        parent_node: KnowledgeGraphNode,
        next_node_id: int,
    ) -> Tuple[int, Sequence[KnowledgeGraphNode], Sequence[KnowledgeGraphEdge]]:
        """Convert documents into ``TextNode``s linked to ``parent_node``.

        Every document becomes one node with a ``has_text`` edge from
        ``parent_node``; consecutive documents are additionally joined by a
        ``next_chunk`` edge so that the original order can be recovered.

        Returns:
            A tuple ``(next_node_id, nodes, edges)`` where ``next_node_id`` is the
            first id still unused after building.
        """
        document_nodes = []
        document_edges = []

        previous_node = None
        for document in documents:
            text_node = TextNode(
                text=document.page_content,
                # Metadata is stored as a string; empty metadata becomes "".
                metadata=str(document.metadata) if document.metadata else "",
            )
            kg_text_node = KnowledgeGraphNode(next_node_id, text_node)
            next_node_id += 1
            document_nodes.append(kg_text_node)
            document_edges.append(
                KnowledgeGraphEdge(parent_node, kg_text_node, KnowledgeGraphEdgeType.has_text)
            )

            if previous_node is not None:
                document_edges.append(
                    KnowledgeGraphEdge(
                        previous_node, kg_text_node, KnowledgeGraphEdgeType.next_chunk
                    )
                )

            previous_node = kg_text_node
        return next_node_id, document_nodes, document_edges
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Type definition for nodes and edges in the knowledge graph."""


import dataclasses
import enum
from typing import TypedDict, Union
Expand Down Expand Up @@ -42,9 +41,11 @@ class TextNode:

Attributes:
text: A string.
metadata: The metadata about the string.
"""

text: str
metadata: str


@dataclasses.dataclass
Expand Down Expand Up @@ -77,7 +78,9 @@ def to_neo4j_node(self) -> Union["Neo4jFileNode", "Neo4jASTNode", "Neo4jTextNode
text=self.node.text,
)
case TextNode():
return Neo4jTextNode(node_id=self.node_id, text=self.node.text)
return Neo4jTextNode(
node_id=self.node_id, text=self.node.text, metadata=self.node.metadata
)
case _:
raise ValueError("Unknown KnowledgeGraphNode.node type")

Expand Down Expand Up @@ -163,6 +166,7 @@ class Neo4jASTNode(TypedDict):
class Neo4jTextNode(TypedDict):
    """Dictionary form of a text node, as built by ``to_neo4j_node`` for a ``TextNode``."""

    # Id of the knowledge-graph node this dictionary was created from.
    node_id: int
    # The node's text content.
    text: str
    # The metadata string stored alongside the text.
    metadata: str


class Neo4jHasFileEdge(TypedDict):
Expand Down
66 changes: 66 additions & 0 deletions prometheus/graph/knowledge_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from collections import deque
import logging
from pathlib import Path

from prometheus.graph.file_graph_builder import FileGraphBuilder
from prometheus.graph.graph_types import (
FileNode,
KnowledgeGraphEdge,
KnowledgeGraphEdgeType,
KnowledgeGraphNode,
)


class KnowledgeGraph:
    """Graph of a directory tree together with the contents of its supported files.

    The whole graph is built while the object is constructed: every directory
    entry becomes a ``FileNode``, and every file accepted by the
    ``FileGraphBuilder`` additionally contributes that builder's nodes and edges.
    """

    def __init__(self, root_dir: Path, max_ast_depth: int):
        """Create the graph for everything found under ``root_dir``.

        Args:
            root_dir: Directory whose tree is turned into the graph.
            max_ast_depth: Depth limit handed to the ``FileGraphBuilder``.
        """
        self.max_ast_depth = max_ast_depth
        self._file_graph_builder = FileGraphBuilder(max_ast_depth)
        self._logger = logging.getLogger("prometheus.graph.knowledge_graph")

        self._next_node_id = 0
        self._root_node = None
        self._knowledge_graph_nodes = []
        self._knowledge_graph_edges = []

        self._build_graph(root_dir)

    def _build_graph(self, root_dir: Path):
        """Walk ``root_dir`` depth-first and fill in the node and edge lists."""
        root = KnowledgeGraphNode(
            self._next_node_id, FileNode(basename=root_dir.name, relative_path=".")
        )
        self._next_node_id += 1
        self._knowledge_graph_nodes.append(root)
        self._root_node = root

        # Each pending entry pairs a path with the graph node that represents it.
        pending = deque([(root_dir, root)])
        while pending:
            path, path_node = pending.pop()

            if path.is_dir():
                # Sorting the entries keeps node ids stable between runs.
                for entry in sorted(path.iterdir()):
                    entry_node = KnowledgeGraphNode(
                        self._next_node_id,
                        FileNode(
                            basename=entry.name,
                            relative_path=str(entry.relative_to(root_dir)),
                        ),
                    )
                    self._next_node_id += 1
                    self._knowledge_graph_nodes.append(entry_node)
                    self._knowledge_graph_edges.append(
                        KnowledgeGraphEdge(path_node, entry_node, KnowledgeGraphEdgeType.has_file)
                    )
                    pending.append((entry, entry_node))
            elif self._file_graph_builder.supports_file(path):
                following_id, file_nodes, file_edges = self._file_graph_builder.build_file_graph(
                    path_node, path, self._next_node_id
                )
                self._next_node_id = following_id
                self._knowledge_graph_nodes.extend(file_nodes)
                self._knowledge_graph_edges.extend(file_edges)
            else:
                self._logger.info(f"Skip parsing {path} because it is not supported.")
File renamed without changes.
50 changes: 50 additions & 0 deletions prometheus/parser/file_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import enum
from pathlib import Path


class FileType(enum.StrEnum):
"""Enum of all tree-sitter supported file types"""

BASH = "bash"
C = "c"
CSHARP = "csharp"
CPP = "cpp"
GO = "go"
JAVA = "java"
JAVASCRIPT = "javascript"
KOTLIN = "kotlin"
PHP = "php"
PYTHON = "python"
SQL = "sql"
YAML = "yaml"
UNKNOWN = "UNKNOWN"

@classmethod
def from_path(cls, path: Path):
match path.suffix:
case ".sh":
return cls.BASH
case ".c":
return cls.C
case ".cs":
return cls.CSHARP
case ".cpp" | ".cc" | ".cxx":
return cls.CPP
case ".go":
return cls.GO
case ".java":
return cls.JAVA
case ".js":
return cls.JAVASCRIPT
case ".kt":
return cls.KOTLIN
case ".php":
return cls.PHP
case ".py":
return cls.PYTHON
case ".sql":
return cls.SQL
case ".yaml" | ".yml":
return cls.YAML
case _:
return cls.UNKNOWN
44 changes: 44 additions & 0 deletions prometheus/parser/tree_sitter_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pathlib import Path

from tree_sitter._binding import Tree
from tree_sitter_languages import get_parser

from prometheus.parser.file_types import FileType


class FileNotSupportedError(Exception):
    """Raised by ``parse`` when a file's type has no tree-sitter language mapping."""


# Maps each parseable FileType to the language name passed to get_parser().
# The name is not always the enum value (CSHARP is "c_sharp"), and
# FileType.UNKNOWN is deliberately absent: membership in this table is what
# supports_file() checks.
FILE_TYPE_TO_LANG = {
    FileType.BASH: "bash",
    FileType.C: "c",
    FileType.CSHARP: "c_sharp",
    FileType.CPP: "cpp",
    FileType.GO: "go",
    FileType.JAVA: "java",
    FileType.JAVASCRIPT: "javascript",
    FileType.KOTLIN: "kotlin",
    FileType.PHP: "php",
    FileType.PYTHON: "python",
    FileType.SQL: "sql",
    FileType.YAML: "yaml",
}


def supports_file(file: Path) -> bool:
    """Return whether the type of ``file`` has a tree-sitter language mapping."""
    return FileType.from_path(file) in FILE_TYPE_TO_LANG


def parse(file: Path) -> Tree:
    """Parse ``file`` with the tree-sitter parser that matches its file type.

    Args:
        file: Path of the source file to parse.

    Returns:
        The tree produced by the tree-sitter parser for the file's raw bytes.

    Raises:
        FileNotSupportedError: If the file's type has no entry in
            ``FILE_TYPE_TO_LANG``.
    """
    detected_type = FileType.from_path(file)
    if detected_type not in FILE_TYPE_TO_LANG:
        raise FileNotSupportedError(
            f"{detected_type.value} is not supported by tree_sitter_parser"
        )

    # The parser is obtained before the file is touched, so an unavailable
    # grammar fails without any file I/O.
    parser = get_parser(FILE_TYPE_TO_LANG[detected_type])
    return parser.parse(file.read_bytes())
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ build-backend = "hatchling.build"
[project]
name = "Prometheus"
version = "0.0.1"
dependencies = [
"langchain==0.3.3",
"tree-sitter==0.21.3",
"tree-sitter-languages==1.10.2",
]
requires-python = ">= 3.11"

[project.optional-dependencies]
Expand Down
2 changes: 0 additions & 2 deletions requirements.txt

This file was deleted.

Empty file added tests/graph/__init__.py
Empty file.
Loading
Loading