Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 56 additions & 23 deletions src/capiscio/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,28 @@ def get_binary_path(version: str) -> Path:
# For now, let's put it in a versioned folder
return get_cache_dir() / version / filename

def _fetch_expected_checksum(version: str, filename: str) -> Optional[str]:
"""Fetch the expected SHA-256 checksum from the release checksums.txt."""
def _fetch_expected_checksum(version: str, filename: str) -> Tuple[Optional[str], str]:
"""Fetch the expected SHA-256 checksum from the release checksums.txt.

Returns:
A tuple of (checksum, status) where status is one of:
- "ok" — checksum found and returned
- "fetch_failed" — could not download checksums.txt
- "entry_missing" — checksums.txt downloaded but no entry for filename
"""
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{version}/checksums.txt"
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
for line in resp.text.strip().splitlines():
parts = line.split()
if len(parts) == 2 and parts[1] == filename:
return parts[0]
logger.warning(f"Binary {filename} not found in checksums.txt")
return None
with requests.get(url, timeout=30) as resp:
resp.raise_for_status()
for line in resp.text.strip().splitlines():
parts = line.split()
if len(parts) == 2 and parts[1] == filename:
return parts[0], "ok"
logger.warning(f"Binary {filename} not found in checksums.txt")
return None, "entry_missing"
except requests.exceptions.RequestException as e:
logger.warning(f"Could not fetch checksums.txt: {e}")
return None
return None, "fetch_failed"

def _verify_checksum(file_path: Path, expected_hash: str) -> bool:
"""Verify SHA-256 checksum of a downloaded file."""
Expand Down Expand Up @@ -135,26 +142,52 @@ def download_binary(version: str) -> Path:
f.write(chunk)
progress.update(task, advance=len(chunk))

# Make executable
st = os.stat(target_path)
os.chmod(target_path, st.st_mode | stat.S_IEXEC)

# Verify checksum integrity
expected_hash = _fetch_expected_checksum(version, filename)
# Verify checksum BEFORE making executable (security: validate before trust)
#
# CAPISCIO_REQUIRE_CHECKSUM env var controls strict verification:
# When set to "1", "true", or "yes", the download will fail if
# checksums.txt cannot be fetched OR the binary entry is missing.
# When unset/false, a warning is logged but the binary is still used.
require_checksum = os.environ.get("CAPISCIO_REQUIRE_CHECKSUM", "").lower() in ("1", "true", "yes")
expected_hash, checksum_status = _fetch_expected_checksum(version, target_path.name)
if expected_hash is not None:
if not _verify_checksum(target_path, expected_hash):
target_path.unlink()
raise RuntimeError(
f"Binary integrity check failed for {filename}. "
f"Binary integrity check failed for {target_path.name}. "
"The downloaded file does not match the published checksum. "
"This may indicate a tampered or corrupted download."
)
logger.info(f"Checksum verified for {filename}")
logger.info(f"Checksum verified for {target_path.name}")
elif require_checksum:
target_path.unlink()
if checksum_status == "fetch_failed":
raise RuntimeError(
f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) "
f"but checksums.txt could not be fetched for v{version}. "
"Cannot verify binary integrity."
)
else:
raise RuntimeError(
f"Checksum verification required (CAPISCIO_REQUIRE_CHECKSUM=true) "
f"but no checksum entry found for {target_path.name} in v{version} checksums.txt. "
"Cannot verify binary integrity."
)
else:
logger.warning(
"Could not verify binary integrity (checksums.txt not available). "
"Consider upgrading capiscio-core to a version that publishes checksums."
)
if checksum_status == "fetch_failed":
logger.warning(
"Could not verify binary integrity (checksums.txt not available). "
"Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification."
)
else:
logger.warning(
f"Could not verify binary integrity (no entry for {target_path.name} in checksums.txt). "
"Set CAPISCIO_REQUIRE_CHECKSUM=true to enforce verification."
)

# Make executable only after checksum verification passes
st = os.stat(target_path)
os.chmod(target_path, st.st_mode | stat.S_IEXEC)

console.print(f"[green]Successfully installed CapiscIO Core v{version}[/green]")
return target_path
Expand Down
163 changes: 162 additions & 1 deletion tests/unit/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
get_binary_path,
download_binary,
run_core,
_fetch_expected_checksum,
_verify_checksum,
CORE_VERSION,
GITHUB_REPO,
)
Expand Down Expand Up @@ -145,15 +147,17 @@ def test_returns_existing_binary(self, mock_get_path):
result = download_binary("1.0.0")
assert result == mock_path

@patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "fetch_failed"))
@patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64'))
@patch('capiscio.manager.get_binary_path')
@patch('capiscio.manager.requests.get')
@patch('capiscio.manager.console')
def test_downloads_binary_on_missing(self, mock_console, mock_requests, mock_get_path, mock_platform):
def test_downloads_binary_on_missing(self, mock_console, mock_requests, mock_get_path, mock_platform, mock_fetch_checksum):
"""Test that binary is downloaded when missing."""
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
mock_get_path.return_value = mock_path

# Mock the response
Expand Down Expand Up @@ -194,6 +198,163 @@ def test_cleans_up_on_download_error(self, mock_console, mock_requests, mock_get
mock_path.unlink.assert_called_once()


class TestFetchExpectedChecksum:
"""Tests for _fetch_expected_checksum function."""

@patch('capiscio.manager.requests.get')
def test_returns_checksum_on_match(self, mock_get):
"""Test successful checksum lookup."""
mock_resp = MagicMock()
mock_resp.text = "abc123 capiscio-linux-amd64\ndef456 capiscio-darwin-arm64\n"
mock_resp.raise_for_status = MagicMock()
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
mock_resp.__exit__ = MagicMock(return_value=False)
mock_get.return_value = mock_resp

checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-linux-amd64")
assert checksum == "abc123"
assert status == "ok"

@patch('capiscio.manager.requests.get')
def test_returns_entry_missing_when_not_found(self, mock_get):
"""Test that missing entry returns entry_missing status."""
mock_resp = MagicMock()
mock_resp.text = "abc123 capiscio-linux-amd64\n"
mock_resp.raise_for_status = MagicMock()
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
mock_resp.__exit__ = MagicMock(return_value=False)
mock_get.return_value = mock_resp

checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-darwin-arm64")
assert checksum is None
assert status == "entry_missing"

@patch('capiscio.manager.requests.get')
def test_returns_fetch_failed_on_network_error(self, mock_get):
"""Test that network errors return fetch_failed status."""
import requests.exceptions
mock_get.side_effect = requests.exceptions.ConnectionError("timeout")

checksum, status = _fetch_expected_checksum("1.0.0", "capiscio-linux-amd64")
assert checksum is None
assert status == "fetch_failed"


class TestChecksumVerificationIntegration:
"""Tests for checksum verification during download."""

def _make_download_mocks(self):
"""Helper: set up common mocks for download_binary tests."""
mock_path = MagicMock(spec=Path)
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_path.name = "capiscio-linux-amd64"
return mock_path

@patch('capiscio.manager._fetch_expected_checksum', return_value=("abc123", "ok"))
@patch('capiscio.manager._verify_checksum', return_value=True)
@patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64'))
@patch('capiscio.manager.get_binary_path')
@patch('capiscio.manager.requests.get')
@patch('capiscio.manager.console')
def test_checksum_verified_match(self, mock_console, mock_requests, mock_get_path,
mock_platform, mock_verify, mock_fetch):
"""Test that download succeeds when checksum matches."""
mock_path = self._make_download_mocks()
mock_get_path.return_value = mock_path

mock_response = MagicMock()
mock_response.headers = {'content-length': '1024'}
mock_response.iter_content.return_value = [b'x' * 1024]
mock_response.__enter__ = MagicMock(return_value=mock_response)
mock_response.__exit__ = MagicMock(return_value=False)
mock_requests.return_value = mock_response

with patch('builtins.open', mock_open()):
with patch.object(os, 'stat') as mock_stat:
with patch.object(os, 'chmod'):
mock_stat.return_value = MagicMock(st_mode=0o644)
result = download_binary("1.0.0")

assert result == mock_path
mock_verify.assert_called_once_with(mock_path, "abc123")

@patch('capiscio.manager._fetch_expected_checksum', return_value=("abc123", "ok"))
@patch('capiscio.manager._verify_checksum', return_value=False)
@patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64'))
@patch('capiscio.manager.get_binary_path')
@patch('capiscio.manager.requests.get')
@patch('capiscio.manager.console')
def test_checksum_mismatch_cleans_up(self, mock_console, mock_requests, mock_get_path,
mock_platform, mock_verify, mock_fetch):
"""Test that a checksum mismatch deletes the binary and raises."""
mock_path = self._make_download_mocks()
mock_get_path.return_value = mock_path

mock_response = MagicMock()
mock_response.headers = {'content-length': '1024'}
mock_response.iter_content.return_value = [b'x' * 1024]
mock_response.__enter__ = MagicMock(return_value=mock_response)
mock_response.__exit__ = MagicMock(return_value=False)
mock_requests.return_value = mock_response

with patch('builtins.open', mock_open()):
with pytest.raises(RuntimeError, match="integrity check failed"):
download_binary("1.0.0")

mock_path.unlink.assert_called()

@patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"})
@patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "fetch_failed"))
@patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64'))
@patch('capiscio.manager.get_binary_path')
@patch('capiscio.manager.requests.get')
@patch('capiscio.manager.console')
def test_require_checksum_fails_on_fetch_failed(self, mock_console, mock_requests,
mock_get_path, mock_platform, mock_fetch):
"""Test fail-closed when CAPISCIO_REQUIRE_CHECKSUM=true and fetch fails."""
mock_path = self._make_download_mocks()
mock_get_path.return_value = mock_path

mock_response = MagicMock()
mock_response.headers = {'content-length': '1024'}
mock_response.iter_content.return_value = [b'x' * 1024]
mock_response.__enter__ = MagicMock(return_value=mock_response)
mock_response.__exit__ = MagicMock(return_value=False)
mock_requests.return_value = mock_response

with patch('builtins.open', mock_open()):
with pytest.raises(RuntimeError, match="could not be fetched"):
download_binary("1.0.0")

mock_path.unlink.assert_called()

@patch.dict(os.environ, {"CAPISCIO_REQUIRE_CHECKSUM": "true"})
@patch('capiscio.manager._fetch_expected_checksum', return_value=(None, "entry_missing"))
@patch('capiscio.manager.get_platform_info', return_value=('linux', 'amd64'))
@patch('capiscio.manager.get_binary_path')
@patch('capiscio.manager.requests.get')
@patch('capiscio.manager.console')
def test_require_checksum_fails_on_entry_missing(self, mock_console, mock_requests,
mock_get_path, mock_platform, mock_fetch):
"""Test fail-closed when CAPISCIO_REQUIRE_CHECKSUM=true and entry is missing."""
mock_path = self._make_download_mocks()
mock_get_path.return_value = mock_path

mock_response = MagicMock()
mock_response.headers = {'content-length': '1024'}
mock_response.iter_content.return_value = [b'x' * 1024]
mock_response.__enter__ = MagicMock(return_value=mock_response)
mock_response.__exit__ = MagicMock(return_value=False)
mock_requests.return_value = mock_response

with patch('builtins.open', mock_open()):
with pytest.raises(RuntimeError, match="no checksum entry found"):
download_binary("1.0.0")

mock_path.unlink.assert_called()


class TestRunCore:
"""Tests for run_core function."""

Expand Down
Loading