Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: true
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12", "3.13"]

steps:
Expand Down
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from pathlib import Path

from setuptools import find_packages, setup

this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text(encoding="utf-8")

setup(
name="gitingest",
version="0.1.2",
Expand All @@ -19,7 +24,7 @@
author="Romain Courtois",
author_email="romain@coderamp.io",
description="CLI tool to analyze and create text dumps of codebases for LLMs",
long_description=open("README.md").read(),
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/cyclotruc/gitingest",
classifiers=[
Expand Down
4 changes: 3 additions & 1 deletion src/gitingest/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" Configuration file for the project. """

import tempfile
from pathlib import Path

MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
Expand All @@ -8,4 +9,5 @@
MAX_TOTAL_SIZE_BYTES = 500 * 1024 * 1024 # 500 MB

OUTPUT_FILE_PATH = "digest.txt"
TMP_BASE_PATH = Path("/tmp/gitingest")

TMP_BASE_PATH = Path(tempfile.gettempdir()) / "gitingest"
101 changes: 89 additions & 12 deletions src/gitingest/query_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
""" Functions to ingest and analyze a codebase directory or single file. """

import locale
import os
import platform
from fnmatch import fnmatch
from pathlib import Path
from typing import Any
Expand All @@ -16,6 +19,61 @@
from gitingest.notebook_utils import process_notebook
from gitingest.query_parser import ParsedQuery

# Adopt the user's environment locale at import time: passing an empty string
# asks setlocale() for the user's default setting. If that locale is not
# supported on this system, fall back to the portable "C" locale.
# NOTE(review): presumably this exists so that locale.getpreferredencoding()
# in _get_encoding_list reflects the user's settings — confirm.
try:
    locale.setlocale(locale.LC_ALL, "")
except locale.Error:
    locale.setlocale(locale.LC_ALL, "C")


def _normalize_path(path: Path) -> Path:
"""
Normalize path for cross-platform compatibility.

Parameters
----------
path : Path
The Path object to normalize.

Returns
-------
Path
The normalized path with platform-specific separators and resolved components.
"""
return Path(os.path.normpath(str(path)))


def _normalize_path_str(path: str | Path) -> str:
"""
Convert path to string with forward slashes for consistent output.

Parameters
----------
path : str | Path
The path to convert, can be string or Path object.

Returns
-------
str
The normalized path string with forward slashes as separators.
"""
return str(path).replace(os.sep, "/")


def _get_encoding_list() -> list[str]:
"""
Get list of encodings to try, prioritized for the current platform.

Returns
-------
list[str]
List of encoding names to try in priority order, starting with the
platform's default encoding followed by common fallback encodings.
"""
encodings = ["utf-8", "utf-8-sig"]
if platform.system() == "Windows":
encodings.extend(["cp1252", "iso-8859-1"])
return encodings + [locale.getpreferredencoding()]


def _should_include(path: Path, base_path: Path, include_patterns: set[str]) -> bool:
"""
Expand Down Expand Up @@ -107,9 +165,13 @@ def _is_safe_symlink(symlink_path: Path, base_path: Path) -> bool:
`True` if the symlink points within the base directory, `False` otherwise.
"""
try:
target_path = symlink_path.resolve()
base_resolved = base_path.resolve()
# It's "safe" if target_path == base_resolved or is inside base_resolved
if platform.system() == "Windows":
if not os.path.islink(str(symlink_path)):
return False

target_path = _normalize_path(symlink_path.resolve())
base_resolved = _normalize_path(base_path.resolve())

return base_resolved in target_path.parents or target_path == base_resolved
except (OSError, ValueError):
# If there's any error resolving the paths, consider it unsafe
Expand Down Expand Up @@ -162,10 +224,22 @@ def _read_file_content(file_path: Path) -> str:
"""
try:
if file_path.suffix == ".ipynb":
return process_notebook(file_path)
try:
return process_notebook(file_path)
except Exception as e:
return f"Error processing notebook: {e}"

for encoding in _get_encoding_list():
try:
with open(file_path, encoding=encoding) as f:
return f.read()
except UnicodeDecodeError:
continue
except OSError as e:
return f"Error reading file: {e}"

return "Error: Unable to decode file with available encodings"

with open(file_path, encoding="utf-8", errors="ignore") as f:
return f.read()
except (OSError, InvalidNotebookError) as e:
return f"Error reading file: {e}"

Expand Down Expand Up @@ -531,10 +605,10 @@ def _extract_files_content(
content = node["content"]

relative_path = Path(node["path"]).relative_to(query.local_path)

# Store paths with forward slashes
files.append(
{
"path": str(relative_path),
"path": _normalize_path_str(relative_path),
"content": content,
"size": node["size"],
},
Expand Down Expand Up @@ -572,7 +646,8 @@ def _create_file_content_string(files: list[dict[str, Any]]) -> str:
continue

output += separator
output += f"File: {file['path']}\n"
# Use forward slashes in output paths
output += f"File: {_normalize_path_str(file['path'])}\n"
output += separator
output += f"{file['content']}\n\n"

Expand Down Expand Up @@ -815,11 +890,13 @@ def run_ingest_query(query: ParsedQuery) -> tuple[str, str, str]:
ValueError
If the specified path cannot be found or if the file is not a text file.
"""
path = query.local_path / query.subpath.lstrip("/")
subpath = _normalize_path(Path(query.subpath.strip("/"))).as_posix()
path = _normalize_path(query.local_path / subpath)

if not path.exists():
raise ValueError(f"{query.slug} cannot be found")

if query.type and query.type == "blob":
return _ingest_single_file(path, query)
return _ingest_single_file(_normalize_path(path.resolve()), query)

return _ingest_directory(path, query)
return _ingest_directory(_normalize_path(path.resolve()), query)
29 changes: 28 additions & 1 deletion src/gitingest/repository_clone.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
""" This module contains functions for cloning a Git repository to a local path. """

import asyncio
import os
from dataclasses import dataclass
from pathlib import Path

from gitingest.utils import async_timeout

Expand Down Expand Up @@ -61,6 +63,8 @@ async def clone_repo(config: CloneConfig) -> tuple[bytes, bytes]:
------
ValueError
If the 'url' or 'local_path' parameters are missing, or if the repository is not found.
OSError
If there is an error creating the parent directory structure.
"""
# Extract and validate query parameters
url: str = config.url
Expand All @@ -74,6 +78,13 @@ async def clone_repo(config: CloneConfig) -> tuple[bytes, bytes]:
if not local_path:
raise ValueError("The 'local_path' parameter is required.")

# Create parent directory if it doesn't exist
parent_dir = Path(local_path).parent
try:
os.makedirs(parent_dir, exist_ok=True)
except OSError as e:
raise OSError(f"Failed to create parent directory {parent_dir}: {e}") from e

# Check if the repository exists
if not await _check_repo_exists(url):
raise ValueError("Repository not found, make sure it is public")
Expand Down Expand Up @@ -182,8 +193,24 @@ async def _run_git_command(*args: str) -> tuple[bytes, bytes]:
Raises
------
RuntimeError
If the Git command exits with a non-zero status.
If Git is not installed or if the Git command exits with a non-zero status.
"""
# Check if Git is installed
try:
version_proc = await asyncio.create_subprocess_exec(
"git",
"--version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await version_proc.communicate()
if version_proc.returncode != 0:
error_message = stderr.decode().strip() if stderr else "Git command not found"
raise RuntimeError(f"Git is not installed or not accessible: {error_message}")
except FileNotFoundError as exc:
raise RuntimeError("Git is not installed. Please install Git before proceeding.") from exc

# Execute the requested Git command
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
Expand Down
34 changes: 34 additions & 0 deletions tests/test_repository_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import asyncio
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch

import pytest
Expand Down Expand Up @@ -362,3 +363,36 @@ async def test_clone_branch_with_slashes(tmp_path):
clone_config.url,
clone_config.local_path,
)


@pytest.mark.asyncio
async def test_clone_repo_creates_parent_directory(tmp_path: Path) -> None:
    """
    Check that `clone_repo` builds missing parent directories before cloning.

    Given a destination whose parent directories do not exist yet:
    When `clone_repo` is called with the existence check and Git runner patched,
    Then the parent directory exists afterwards and a single shallow
    `git clone` is issued for the configured URL and destination.
    """
    nested_path = tmp_path / "deep" / "nested" / "path" / "repo"
    clone_config = CloneConfig(url="https://github.com/user/repo", local_path=str(nested_path))
    expected_git_args = (
        "git",
        "clone",
        "--depth=1",
        "--single-branch",
        clone_config.url,
        str(nested_path),
    )

    with (
        patch("gitingest.repository_clone._check_repo_exists", return_value=True),
        patch("gitingest.repository_clone._run_git_command", new_callable=AsyncMock) as mock_exec,
    ):
        await clone_repo(clone_config)

        # The parent of the clone target must have been created on disk.
        assert nested_path.parent.exists()

        # Exactly one Git invocation, with the shallow single-branch clone arguments.
        mock_exec.assert_called_once_with(*expected_git_args)