From e6d0dc2d3ce097dee23c1437244267d616400f8d Mon Sep 17 00:00:00 2001 From: Filip Christiansen <22807962+filipchristiansen@users.noreply.github.com> Date: Fri, 7 Mar 2025 16:42:08 +0100 Subject: [PATCH 1/2] Refactor ingestion logic to unify single-file and directory output, remove unused exceptions, and fix partial clone subpath handling. - Consolidate `format_directory` and `format_single_file` into a single `format_node` function - Remove unused exceptions (`MaxFilesReachedError`, `MaxFileSizeReachedError`, `AlreadyVisitedError`) - Update partial clone logic to correctly handle single-file paths by stripping the filename from subpath when `blob` is True - Improve docstrings and clean up code for better readability --- src/gitingest/cloning.py | 9 +- src/gitingest/exceptions.py | 21 ---- src/gitingest/filesystem_schema.py | 125 ++++++++++---------- src/gitingest/ingestion.py | 12 +- src/gitingest/output_formatters.py | 182 +++++++++++++---------------- 5 files changed, 151 insertions(+), 198 deletions(-) diff --git a/src/gitingest/cloning.py b/src/gitingest/cloning.py index ffd933c1..e24d5230 100644 --- a/src/gitingest/cloning.py +++ b/src/gitingest/cloning.py @@ -100,11 +100,12 @@ async def clone_repo(config: CloneConfig) -> None: checkout_cmd = ["git", "-C", local_path] if partial_clone: + subpath = config.subpath.lstrip("/") if config.blob: - # When ingesting from a file url (blob/branch/path/file.txt), we need to remove the file name - checkout_cmd += ["sparse-checkout", "set", Path(config.subpath.lstrip("/")).parent] - else: - checkout_cmd += ["sparse-checkout", "set", config.subpath.lstrip("/")] + # When ingesting from a file url (blob/branch/path/file.txt), we need to remove the file name. + subpath = str(Path(subpath).parent.as_posix()) + + checkout_cmd += ["sparse-checkout", "set", subpath] if commit: checkout_cmd += ["checkout", commit] diff --git a/src/gitingest/exceptions.py b/src/gitingest/exceptions.py index 3b01018d..241baf00 100644 --- a/src/gitingest/exceptions.py +++ b/src/gitingest/exceptions.py @@ -30,27 +30,6 @@ class AsyncTimeoutError(Exception): """ -class MaxFilesReachedError(Exception): - """Exception raised when the maximum number of files is reached.""" - - def __init__(self, max_files: int) -> None: - super().__init__(f"Maximum number of files ({max_files}) reached.") - - -class MaxFileSizeReachedError(Exception): - """Exception raised when the maximum file size is reached.""" - - def __init__(self, max_size: int): - super().__init__(f"Maximum file size limit ({max_size/1024/1024:.1f}MB) reached.") - - -class AlreadyVisitedError(Exception): - """Exception raised when a symlink target has already been visited.""" - - def __init__(self, path: str) -> None: - super().__init__(f"Symlink target already visited: {path}") - - class InvalidNotebookError(Exception): """Exception raised when a Jupyter notebook is invalid or cannot be processed.""" diff --git a/src/gitingest/filesystem_schema.py b/src/gitingest/filesystem_schema.py index 169830ba..77d0e464 100644 --- a/src/gitingest/filesystem_schema.py +++ b/src/gitingest/filesystem_schema.py @@ -7,12 +7,11 @@ from enum import Enum, auto from pathlib import Path -from gitingest.exceptions import InvalidNotebookError from gitingest.utils.ingestion_utils import _get_encoding_list from gitingest.utils.notebook_utils import process_notebook from gitingest.utils.textfile_checker_utils import is_textfile -SEPARATOR = "=" * 48 + "\n" +SEPARATOR = "=" * 48 class FileSystemNodeType(Enum): @@ -36,108 +35,104 @@ class 
FileSystemNode: # pylint: disable=too-many-instance-attributes """ Class representing a node in the file system (either a file or directory). - This class has more than the recommended number of attributes because it needs to - track various properties of files and directories for comprehensive analysis. + Tracks properties of files/directories for comprehensive analysis. """ name: str - type: FileSystemNodeType # e.g., "directory" or "file" + type: FileSystemNodeType path_str: str path: Path size: int = 0 file_count: int = 0 dir_count: int = 0 depth: int = 0 - children: list[FileSystemNode] = field(default_factory=list) # Using default_factory instead of empty list + children: list[FileSystemNode] = field(default_factory=list) def sort_children(self) -> None: """ Sort the children nodes of a directory according to a specific order. Order of sorting: - 1. README.md first - 2. Regular files (not starting with dot) - 3. Hidden files (starting with dot) - 4. Regular directories (not starting with dot) - 5. Hidden directories (starting with dot) - All groups are sorted alphanumerically within themselves. - """ - # Separate files and directories - files = [child for child in self.children if child.type == FileSystemNodeType.FILE] - directories = [child for child in self.children if child.type == FileSystemNodeType.DIRECTORY] + 2. Regular files (not starting with dot) + 3. Hidden files (starting with dot) + 4. Regular directories (not starting with dot) + 5. Hidden directories (starting with dot) - # Find README.md - readme_files = [f for f in files if f.name.lower() == "readme.md"] - other_files = [f for f in files if f.name.lower() != "readme.md"] + All groups are sorted alphanumerically within themselves. - # Separate hidden and regular files/directories - regular_files = [f for f in other_files if not f.name.startswith(".")] - hidden_files = [f for f in other_files if f.name.startswith(".")] - regular_dirs = [d for d in directories if not d.name.startswith(".")] - hidden_dirs = [d for d in directories if d.name.startswith(".")] + Raises + ------ + ValueError + If the node is not a directory. + """ + if self.type != FileSystemNodeType.DIRECTORY: + raise ValueError("Cannot sort children of a non-directory node") - # Sort each group alphanumerically - regular_files.sort(key=lambda x: x.name) - hidden_files.sort(key=lambda x: x.name) - regular_dirs.sort(key=lambda x: x.name) - hidden_dirs.sort(key=lambda x: x.name) + def _sort_key(child: FileSystemNode) -> tuple[int, str]: + # Groups: 0=README, 1=regular file, 2=hidden file, 3=regular dir, 4=hidden dir + name = child.name.lower() + if child.type == FileSystemNodeType.FILE: + if name == "readme.md": + return (0, name) + return (1 if not name.startswith(".") else 2, name) + return (3 if not name.startswith(".") else 4, name) - self.children = readme_files + regular_files + hidden_files + regular_dirs + hidden_dirs + self.children.sort(key=_sort_key) @property def content_string(self) -> str: """ - Return the content of the node as a string. - - This property returns the content of the node as a string, including the path and content. + Return the content of the node as a string, including path and content. Returns ------- str A string representation of the node's content. 
""" - content_repr = SEPARATOR + parts = [ + SEPARATOR, + f"File: {str(self.path_str).replace(os.sep, '/')}", + SEPARATOR, + f"{self.content}", + ] - # Use forward slashes in output paths - content_repr += f"File: {str(self.path_str).replace(os.sep, '/')}\n" - content_repr += SEPARATOR - content_repr += f"{self.content}\n\n" - return content_repr + return "\n".join(parts) + "\n\n" @property def content(self) -> str: # pylint: disable=too-many-return-statements """ - Read the content of a file. - - This function attempts to open a file and read its contents using UTF-8 encoding. - If an error occurs during reading (e.g., file is not found or permission error), - it returns an error message. + Read the content of a file if it's text (or a notebook). Return an error message otherwise. Returns ------- str The content of the file, or an error message if the file could not be read. + + Raises + ------ + ValueError + If the node is a directory. """ - if self.type == FileSystemNodeType.FILE and not is_textfile(self.path): + if self.type == FileSystemNodeType.DIRECTORY: + raise ValueError("Cannot read content of a directory node") + + if not is_textfile(self.path): return "[Non-text file]" - try: - if self.path.suffix == ".ipynb": - try: - return process_notebook(self.path) - except Exception as exc: - return f"Error processing notebook: {exc}" - - for encoding in _get_encoding_list(): - try: - with self.path.open(encoding=encoding) as f: - return f.read() - except UnicodeDecodeError: - continue - except OSError as exc: - return f"Error reading file: {exc}" - - return "Error: Unable to decode file with available encodings" - - except (OSError, InvalidNotebookError) as exc: - return f"Error reading file: {exc}" + if self.path.suffix == ".ipynb": + try: + return process_notebook(self.path) + except Exception as exc: + return f"Error processing notebook: {exc}" + + # Try multiple encodings + for encoding in _get_encoding_list(): + try: + with self.path.open(encoding=encoding) as f: + return f.read() + except UnicodeDecodeError: + continue + except OSError as exc: + return f"Error reading file: {exc}" + + return "Error: Unable to decode file with available encodings" diff --git a/src/gitingest/ingestion.py b/src/gitingest/ingestion.py index 24b65b39..bdfbdbf6 100644 --- a/src/gitingest/ingestion.py +++ b/src/gitingest/ingestion.py @@ -6,7 +6,7 @@ from gitingest.config import MAX_DIRECTORY_DEPTH, MAX_FILES, MAX_TOTAL_SIZE_BYTES from gitingest.filesystem_schema import FileSystemNode, FileSystemNodeType, FileSystemStats -from gitingest.output_formatters import format_directory, format_single_file +from gitingest.output_formatters import format_node from gitingest.query_parsing import ParsedQuery from gitingest.utils.ingestion_utils import _should_exclude, _should_include from gitingest.utils.path_utils import _is_safe_symlink @@ -38,7 +38,7 @@ def ingest_query(query: ParsedQuery) -> Tuple[str, str, str]: Raises ------ ValueError - If the specified path cannot be found or if the file is not a text file. + If the path cannot be found, is not a file, or the file has no content. 
""" subpath = Path(query.subpath.strip("/")).as_posix() path = query.local_path / subpath @@ -63,7 +63,11 @@ def ingest_query(query: ParsedQuery) -> Tuple[str, str, str]: path_str=str(relative_path), path=path, ) - return format_single_file(file_node, query) + + if not file_node.content: + raise ValueError(f"File {file_node.name} has no content") + + return format_node(file_node, query) root_node = FileSystemNode( name=path.name, @@ -80,7 +84,7 @@ def ingest_query(query: ParsedQuery) -> Tuple[str, str, str]: stats=stats, ) - return format_directory(root_node, query) + return format_node(root_node, query) def apply_gitingest_file(path: Path, query: ParsedQuery) -> None: diff --git a/src/gitingest/output_formatters.py b/src/gitingest/output_formatters.py index c9228361..8d5a278c 100644 --- a/src/gitingest/output_formatters.py +++ b/src/gitingest/output_formatters.py @@ -1,4 +1,4 @@ -""" Functions to ingest and analyze a codebase directory or single file. """ +"""Functions to ingest and analyze a codebase directory or single file.""" from typing import Optional, Tuple @@ -8,105 +8,109 @@ from gitingest.query_parsing import ParsedQuery -def _create_summary_string(query: ParsedQuery, node: FileSystemNode) -> str: +def format_node(node: FileSystemNode, query: ParsedQuery) -> Tuple[str, str, str]: """ - Create a summary string with file counts and content size. + Generate a summary, directory structure, and file contents for a given file system node. - This function generates a summary of the repository's contents, including the number - of files analyzed, the total content size, and other relevant details based on the query parameters. + If the node represents a directory, the function will recursively process its contents. Parameters ---------- + node : FileSystemNode + The file system node to be summarized. query : ParsedQuery The parsed query object containing information about the repository and query parameters. - node : FileSystemNode - The root node representing the directory structure, including file and directory counts. Returns ------- - str - Summary string containing details such as repository name, file count, and other query-specific information. + Tuple[str, str, str] + A tuple containing the summary, directory structure, and file contents. """ - if query.user_name: - summary = f"Repository: {query.user_name}/{query.repo_name}\n" + is_single_file = node.type == FileSystemNodeType.FILE + summary = _create_summary_prefix(query, single_file=is_single_file) + + if node.type == FileSystemNodeType.DIRECTORY: + summary += f"Files analyzed: {node.file_count}\n" else: - # Local scenario - summary = f"Directory: {query.slug}\n" + summary += f"File: {node.name}\n" + summary += f"Lines: {len(node.content.splitlines()):,}\n" - if query.commit: - summary += f"Commit: {query.commit}\n" - elif query.branch and query.branch not in ("main", "master"): - summary += f"Branch: {query.branch}\n" + tree = "Directory structure:\n" + _create_tree_structure(query, node) + _create_tree_structure(query, node) - if query.subpath != "/": - summary += f"Subpath: {query.subpath}\n" + content = _gather_file_contents(node) - summary += f"Files analyzed: {node.file_count}\n" - # TODO: Do we want to add the total number of lines? 
+ token_estimate = _format_token_count(tree + content) + if token_estimate: + summary += f"\nEstimated tokens: {token_estimate}" - return summary + return summary, tree, content -def format_single_file(file_node: FileSystemNode, query: ParsedQuery) -> Tuple[str, str, str]: +def _create_summary_prefix(query: ParsedQuery, single_file: bool = False) -> str: """ - Format a single file for display. + Create a prefix string for summarizing a repository or local directory. - This function generates a summary, tree structure, and content for a single file. - It includes information such as the repository name, commit/branch, file name, - line count, and estimated token count. + Includes repository name (if provided), commit/branch details, and subpath if relevant. Parameters ---------- - file_node : FileSystemNode - The node representing the file to format. query : ParsedQuery The parsed query object containing information about the repository and query parameters. + single_file : bool + A flag indicating whether the summary is for a single file, by default False. Returns ------- - Tuple[str, str, str] - A tuple containing the summary, tree structure, and file content. - - Raises - ------ - ValueError - If the file has no content. + str + A summary prefix string containing repository, commit, branch, and subpath details. """ - if not file_node.content: - raise ValueError(f"File {file_node.name} has no content") + parts = [] - summary = f"Repository: {query.user_name}/{query.repo_name}\n" + if query.user_name: + parts.append(f"Repository: {query.user_name}/{query.repo_name}") + else: + # Local scenario + parts.append(f"Directory: {query.slug}") if query.commit: - summary += f"Commit: {query.commit}\n" + parts.append(f"Commit: {query.commit}") elif query.branch and query.branch not in ("main", "master"): - summary += f"Branch: {query.branch}\n" + parts.append(f"Branch: {query.branch}") - summary += f"File: {file_node.name}\n" - summary += f"Lines: {len(file_node.content.splitlines()):,}\n" + if query.subpath != "/" and not single_file: + parts.append(f"Subpath: {query.subpath}") - files_content = file_node.content_string + return "\n".join(parts) + "\n" - tree = "Directory structure:\n└── " + file_node.name - formatted_tokens = _generate_token_string(files_content) - if formatted_tokens: - summary += f"\nEstimated tokens: {formatted_tokens}" +def _gather_file_contents(node: FileSystemNode) -> str: + """ + Recursively gather contents of all files under the given node. - return summary, tree, files_content + This function recursively processes a directory node and gathers the contents of all files + under that node. It returns the concatenated content of all files as a single string. + Parameters + ---------- + node : FileSystemNode + The current directory or file node being processed. -def _get_files_content(node: FileSystemNode) -> str: + Returns + ------- + str + The concatenated content of all files under the given node. + """ if node.type == FileSystemNodeType.FILE: return node.content_string - if node.type == FileSystemNodeType.DIRECTORY: - return "\n".join(_get_files_content(child) for child in node.children) - return "" + + # Recursively gather contents of all files under the current directory + return "\n".join(_gather_file_contents(child) for child in node.children) def _create_tree_structure(query: ParsedQuery, node: FileSystemNode, prefix: str = "", is_last: bool = True) -> str: """ - Create a tree-like string representation of the file structure. 
+ Generate a tree-like string representation of the file structure. This function generates a string representation of the directory structure, formatted as a tree with appropriate indentation for nested directories and files. @@ -127,36 +131,36 @@ def _create_tree_structure(query: ParsedQuery, node: FileSystemNode, prefix: str str A string representing the directory structure formatted as a tree. """ - tree = "" - if not node.name: + # If no name is present, use the slug as the top-level directory name node.name = query.slug - if node.name: - current_prefix = "└── " if is_last else "├── " - name = node.name + "/" if node.type == FileSystemNodeType.DIRECTORY else node.name - tree += prefix + current_prefix + name + "\n" + tree_str = "" + current_prefix = "└── " if is_last else "├── " + # Indicate directories with a trailing slash + display_name = node.name if node.type == FileSystemNodeType.DIRECTORY: - # Adjust prefix only if we added a node name - new_prefix = prefix + (" " if is_last else "│ ") if node.name else prefix - children = node.children - for i, child in enumerate(children): - tree += _create_tree_structure(query, node=child, prefix=new_prefix, is_last=i == len(children) - 1) + display_name += "/" + + tree_str += f"{prefix}{current_prefix}{display_name}\n" - return tree + if node.type == FileSystemNodeType.DIRECTORY and node.children: + prefix += " " if is_last else "│ " + for i, child in enumerate(node.children): + tree_str += _create_tree_structure(query, node=child, prefix=prefix, is_last=i == len(node.children) - 1) + return tree_str -def _generate_token_string(context_string: str) -> Optional[str]: +def _format_token_count(text: str) -> Optional[str]: """ - Return the number of tokens in a text string. + Return a human-readable string representing the token count of the given text. - This function estimates the number of tokens in a given text string using the `tiktoken` - library. It returns the number of tokens in a human-readable format (e.g., '1.2k', '1.2M'). + E.g., '120' -> '120', '1200' -> '1.2k', '1200000' -> '1.2M'. Parameters ---------- - context_string : str + text : str The text string for which the token count is to be estimated. Returns @@ -166,45 +170,15 @@ def _generate_token_string(context_string: str) -> Optional[str]: """ try: encoding = tiktoken.get_encoding("cl100k_base") - total_tokens = len(encoding.encode(context_string, disallowed_special=())) + total_tokens = len(encoding.encode(text, disallowed_special=())) except (ValueError, UnicodeEncodeError) as exc: print(exc) return None - if total_tokens > 1_000_000: + if total_tokens >= 1_000_000: return f"{total_tokens / 1_000_000:.1f}M" - if total_tokens > 1_000: + if total_tokens >= 1_000: return f"{total_tokens / 1_000:.1f}k" return str(total_tokens) - - -def format_directory(root_node: FileSystemNode, query: ParsedQuery) -> Tuple[str, str, str]: - """ - Ingest an entire directory and return its summary, directory structure, and file contents. - - This function processes a directory, extracts its contents, and generates a summary, - directory structure, and file content. It recursively processes subdirectories as well. - - Parameters - ---------- - root_node : FileSystemNode - The root node representing the directory to process. - query : ParsedQuery - The parsed query object containing information about the repository and query parameters. - - Returns - ------- - Tuple[str, str, str] - A tuple containing the summary, directory structure, and file contents. 
- """ - summary = _create_summary_string(query, node=root_node) - tree = "Directory structure:\n" + _create_tree_structure(query, root_node) - files_content = _get_files_content(root_node) - - formatted_tokens = _generate_token_string(tree + files_content) - if formatted_tokens: - summary += f"\nEstimated tokens: {formatted_tokens}" - - return summary, tree, files_content From 2c593bf8d14155b8c86e6c17ab2653e57610302a Mon Sep 17 00:00:00 2001 From: cyclotruc Date: Fri, 7 Mar 2025 20:33:57 +0000 Subject: [PATCH 2/2] add comments --- src/gitingest/filesystem_schema.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/gitingest/filesystem_schema.py b/src/gitingest/filesystem_schema.py index 77d0e464..61f60a95 100644 --- a/src/gitingest/filesystem_schema.py +++ b/src/gitingest/filesystem_schema.py @@ -11,7 +11,7 @@ from gitingest.utils.notebook_utils import process_notebook from gitingest.utils.textfile_checker_utils import is_textfile -SEPARATOR = "=" * 48 +SEPARATOR = "=" * 48 # Tiktoken, the tokenizer openai uses, counts 2 tokens if we have more than 48 class FileSystemNodeType(Enum): @@ -69,6 +69,7 @@ def sort_children(self) -> None: raise ValueError("Cannot sort children of a non-directory node") def _sort_key(child: FileSystemNode) -> tuple[int, str]: + # returns the priority order for the sort function, 0 is first # Groups: 0=README, 1=regular file, 2=hidden file, 3=regular dir, 4=hidden dir name = child.name.lower() if child.type == FileSystemNodeType.FILE:
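For illustration (not part of the patch itself), a minimal standalone sketch of the sparse-checkout change in patch 1, assuming a trimmed-down CloneConfig with only `subpath` and `blob` fields; the real class carries more state. It shows how the filename is stripped from the subpath when the query targets a blob URL, so `git sparse-checkout set` always receives a directory:

from dataclasses import dataclass
from pathlib import Path


@dataclass
class CloneConfig:
    # Hypothetical, trimmed-down stand-in for the real CloneConfig.
    subpath: str
    blob: bool = False


def build_sparse_checkout_cmd(config: CloneConfig, local_path: str) -> list:
    """Build the sparse-checkout command for a partial clone."""
    subpath = config.subpath.lstrip("/")
    if config.blob:
        # A blob URL (blob/branch/path/file.txt) points at a file, but
        # sparse-checkout expects a directory, so keep only the parent.
        subpath = Path(subpath).parent.as_posix()
    return ["git", "-C", local_path, "sparse-checkout", "set", subpath]


print(build_sparse_checkout_cmd(CloneConfig("/src/pkg/file.txt", blob=True), "/tmp/repo"))
# ['git', '-C', '/tmp/repo', 'sparse-checkout', 'set', 'src/pkg']
print(build_sparse_checkout_cmd(CloneConfig("/src/pkg", blob=False), "/tmp/repo"))
# ['git', '-C', '/tmp/repo', 'sparse-checkout', 'set', 'src/pkg']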
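Similarly, a small self-contained sketch of the `_sort_key` grouping used by `sort_children`, with a hypothetical `Node` stand-in for `FileSystemNode`, to make the resulting order concrete (README.md first, then regular files, hidden files, regular directories, hidden directories, each group alphabetical):

from dataclasses import dataclass


@dataclass
class Node:
    # Hypothetical stand-in for FileSystemNode: only the fields the sort needs.
    name: str
    is_dir: bool = False


def sort_key(child: Node) -> tuple:
    # Groups: 0=README, 1=regular file, 2=hidden file, 3=regular dir, 4=hidden dir
    name = child.name.lower()
    if not child.is_dir:
        if name == "readme.md":
            return (0, name)
        return (1 if not name.startswith(".") else 2, name)
    return (3 if not name.startswith(".") else 4, name)


children = [
    Node(".github", is_dir=True),
    Node("src", is_dir=True),
    Node(".gitignore"),
    Node("setup.py"),
    Node("README.md"),
]
print([c.name for c in sorted(children, key=sort_key)])
# ['README.md', 'setup.py', '.gitignore', 'src', '.github']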
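Finally, the renamed `_format_token_count` switches from `>` to `>=` thresholds, so exactly 1,000 or 1,000,000 tokens now format as '1.0k' and '1.0M' instead of falling through to the raw integer. A sketch of just that rounding logic, with the tiktoken call omitted and hypothetical sample values:

def format_token_count(total_tokens: int) -> str:
    # Mirrors the threshold logic in the patch: >= keeps the exact values
    # 1_000 and 1_000_000 from slipping through as raw integers.
    if total_tokens >= 1_000_000:
        return f"{total_tokens / 1_000_000:.1f}M"
    if total_tokens >= 1_000:
        return f"{total_tokens / 1_000:.1f}k"
    return str(total_tokens)


for n in (120, 1_000, 1_200, 999_999, 1_200_000):
    print(n, "->", format_token_count(n))
# 120 -> 120
# 1000 -> 1.0k
# 1200 -> 1.2k
# 999999 -> 1000.0k
# 1200000 -> 1.2M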