From 3003113f7cf1873602f08d2f3b4fed6b4e11683d Mon Sep 17 00:00:00 2001 From: Anton Agestam Date: Thu, 11 May 2023 18:43:28 +0200 Subject: [PATCH] feature: Implement backup verification command Adds a CLI command to allow verifying checksum integrity of data files in a V3 backup. The checksum verification is implemented as a simple read loop over the contents of each data file in the given metadata. --- karapace/backup/api.py | 42 +++++- karapace/backup/backends/v3/backend.py | 19 ++- karapace/backup/cli.py | 5 +- requirements/requirements-dev.txt | 14 +- requirements/requirements.in | 1 + requirements/requirements.txt | 10 +- tests/integration/backup/test_v3_backup.py | 139 +++++++++++++++++- .../backup_v3_corrupt/2db42756.metadata | Bin 0 -> 89 bytes .../backup_v3_corrupt/2db42756:0.data | Bin 0 -> 112 bytes 9 files changed, 220 insertions(+), 10 deletions(-) create mode 100644 tests/integration/test_data/backup_v3_corrupt/2db42756.metadata create mode 100644 tests/integration/test_data/backup_v3_corrupt/2db42756:0.data diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 626537f36..e27cc2eee 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -30,6 +30,7 @@ from karapace.schema_reader import new_schema_topic_from_config from karapace.utils import assert_never from pathlib import Path +from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, NoReturn, TypeVar @@ -481,9 +482,9 @@ def inspect(backup_location: Path | StdOut) -> None: metadata = backend.read_metadata(backup_location) if metadata.checksum_algorithm is ChecksumAlgorithm.unknown: - print( + console = Console(stderr=True, style="red") + console.print( "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of Karapace.", - file=sys.stderr, ) metadata_json = { @@ -508,3 +509,40 @@ def inspect(backup_location: Path | StdOut) -> None: } print(json.dumps(metadata_json, indent=2)) + + +def verify(backup_location: Path | StdOut) -> None: + if isinstance(backup_location, str): + raise NotImplementedError("Cannot verify backup via stdin") + if not backup_location.exists(): + raise BackupError("Backup location doesn't exist") + backup_version = BackupVersion.identify(backup_location) + + if backup_version is not BackupVersion.V3: + print( + f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}.", + file=sys.stderr, + ) + raise SystemExit(1) + + backend = backup_version.reader() + assert isinstance(backend, SchemaBackupV3Reader) + + console = Console() + success = True + verified_files = 0 + + for matches, data_file in backend.verify_integrity(backup_location): + console.print(f"Integrity of [blue]{data_file.filename}[/blue] is ", end="") + if matches: + console.print("[green]intact.[/green]") + verified_files += 1 + else: + success = False + console.print("[red]not intact![/red]") + + if not success: + console.print("💥 [red]Found checksum mismatches.[/red]") + raise SystemExit(1) + + console.print(f"✅ [green]Verified {verified_files} data files in backup OK.[/green]") diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index 1540fcca0..c9581a33c 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -17,9 +17,10 @@ from karapace.utils import assert_never from karapace.version import __version__ from pathlib import Path -from typing import Callable, ContextManager, Final, IO, Iterator, Sequence, TypeVar +from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar import datetime +import io import uuid import xxhash @@ -88,6 +89,22 @@ def read(self, path: Path, topic_name: str) -> Iterator[Instruction]: data_file=data_file, ) + # Not part of common interface, because it exposes DataFile which is + # specific to V3. + def verify_integrity(self, path: Path) -> Generator[tuple[bool, DataFile], None, None]: + metadata = self.read_metadata(path) + + for data_file in metadata.data_files: + checksum = _get_checksum_implementation(metadata.checksum_algorithm)() + with (path.parent / data_file.filename).open("rb") as buffer: + while True: + chunk = buffer.read(io.DEFAULT_BUFFER_SIZE) + if not chunk: + break + checksum.update(chunk) + matches = checksum.digest() == data_file.checksum + yield matches, data_file + # Note: Because the underlying checksum API is mutable, it doesn't make sense to attempt # to make this immutable. diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index 14b1a5e1e..10b07329d 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -22,11 +22,12 @@ def parse_args() -> argparse.Namespace: parser_get = subparsers.add_parser("get", help="Store the schema backup into a file") parser_restore = subparsers.add_parser("restore", help="Restore the schema backup from a file") parser_inspect = subparsers.add_parser("inspect", help="Parse and dump metadata from a backup file.") + parser_verify = subparsers.add_parser("verify", help="Parse metadata, and verify all checksums of a backup.") parser_export_anonymized_avro_schemas = subparsers.add_parser( "export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file" ) - for p in (parser_get, parser_restore, parser_inspect, parser_export_anonymized_avro_schemas): + for p in (parser_get, parser_restore, parser_inspect, parser_verify, parser_export_anonymized_avro_schemas): p.add_argument("--location", default="", help="File path for the backup file") for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas): @@ -62,6 +63,8 @@ def dispatch(args: argparse.Namespace) -> None: ) elif args.command == "inspect": api.inspect(location) + elif args.command == "verify": + api.verify(location) elif args.command == "restore": config = get_config(args) api.restore_backup( diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 69cbbeaad..e17c85f80 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -47,6 +47,10 @@ charset-normalizer==3.1.0 # requests click==8.1.3 # via flask +commonmark==0.9.1 + # via + # -r requirements.txt + # rich configargparse==1.5.3 # via locust exceptiongroup==1.1.1 @@ -81,7 +85,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.75.3 +hypothesis==6.75.4 # via -r requirements-dev.in idna==3.4 # via @@ -137,7 +141,10 @@ psutil==5.9.5 # locust # pytest-xdist pygments==2.15.1 - # via pdbpp + # via + # -r requirements.txt + # pdbpp + # rich pyrepl==0.9.0 # via fancycompleter pyrsistent==0.19.3 @@ -161,6 +168,8 @@ requests==2.31.0 # via # -r requirements-dev.in # locust +rich==12.5.1 + # via -r requirements.txt roundrobin==0.0.4 # via locust sentry-sdk==1.24.0 @@ -187,6 +196,7 @@ typing-extensions==4.6.2 # via # -r requirements.txt # locust + # rich ujson==5.7.0 # via -r requirements.txt urllib3==1.26.16 diff --git a/requirements/requirements.in b/requirements/requirements.in index e9a17227e..59ae8b39a 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -12,6 +12,7 @@ typing-extensions ujson<6 watchfiles<1 xxhash~=3.0 +rich~=12.5.0 # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index a0f31b6e8..1f83054b7 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -26,6 +26,8 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via -r requirements.in charset-normalizer==3.1.0 # via aiohttp +commonmark==0.9.1 + # via rich frozenlist==1.3.3 # via # aiohttp @@ -52,10 +54,14 @@ packaging==23.1 # via aiokafka protobuf==3.20.3 # via -r requirements.in +pygments==2.15.1 + # via rich pyrsistent==0.19.3 # via jsonschema python-dateutil==2.8.2 # via -r requirements.in +rich==12.5.1 + # via -r requirements.in six==1.16.0 # via # isodate @@ -66,7 +72,9 @@ sniffio==1.3.0 tenacity==8.2.2 # via -r requirements.in typing-extensions==4.6.2 - # via -r requirements.in + # via + # -r requirements.in + # rich ujson==5.7.0 # via -r requirements.in watchfiles==0.19.0 diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 58e652609..a83b5c107 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -365,9 +365,8 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None: ) assert cp.returncode == 0 - assert ( - cp.stderr.decode() - == "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of Karapace.\n" + assert cp.stderr.decode() == ( + "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of \nKarapace.\n" ) assert json.loads(cp.stdout) == { "version": 3, @@ -426,3 +425,137 @@ def test_can_inspect_v1(self) -> None: assert cp.returncode == 0 assert cp.stderr == b"" assert json.loads(cp.stdout) == {"version": 1} + + +class TestVerify: + def test_can_verify_backup_integrity(self) -> None: + metadata_path = ( + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "2db42756.metadata" + ) + + cp = subprocess.run( + [ + "karapace_schema_backup", + "verify", + "--location", + str(metadata_path), + ], + capture_output=True, + check=False, + env=no_color_env(), + ) + + assert cp.returncode == 0 + assert cp.stderr == b"" + assert b"Integrity of 2db42756:0.data is intact." in cp.stdout + assert b"Verified 1 data files in backup OK." in cp.stdout + + def test_can_verify_backup_integrity_from_large_topic( + self, + tmp_path: Path, + new_topic: NewTopic, + producer: KafkaProducer, + config_file: Path, + admin_client: KafkaAdminClient, + karapace_config: Config, + ) -> None: + # Populate the test topic. + for _ in range(100): + producer.send( + new_topic.name, + key=1000 * b"a", + value=1000 * b"b", + partition=0, + ).add_errback(_raise) + producer.flush() + + # Execute backup creation. + subprocess.run( + [ + "karapace_schema_backup", + "get", + "--use-format-v3", + "--config", + str(config_file), + "--topic", + new_topic.name, + "--location", + str(tmp_path), + ], + capture_output=True, + check=True, + ) + metadata_path = tmp_path / f"topic-{new_topic.name}" / f"{new_topic.name}.metadata" + + cp = subprocess.run( + [ + "karapace_schema_backup", + "verify", + "--location", + str(metadata_path), + ], + capture_output=True, + check=False, + env=no_color_env(), + ) + + assert cp.returncode == 0 + assert cp.stderr == b"" + assert f"Integrity of {new_topic.name}:0.data is intact.".encode() in cp.stdout + assert b"Verified 1 data files in backup OK." in cp.stdout + + def test_can_refute_backup_integrity(self) -> None: + metadata_path = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_corrupt" / "2db42756.metadata" + + cp = subprocess.run( + [ + "karapace_schema_backup", + "verify", + "--location", + str(metadata_path), + ], + capture_output=True, + check=False, + env=no_color_env(), + ) + + assert cp.returncode == 1 + assert cp.stderr == b"" + assert b"Integrity of 2db42756:0.data is not intact!" in cp.stdout + assert b"Found checksum mismatches." in cp.stdout + + @pytest.mark.parametrize( + ("test_file", "error_message"), + ( + ( + "test_restore_v1.log", + "Only backups using format V3 can be verified, found V1.\n", + ), + ( + "test_restore_v2.log", + "Only backups using format V3 can be verified, found V2.\n", + ), + ), + ) + def test_gives_non_successful_exit_code_for_legacy_backup_format( + self, + test_file: str, + error_message: str, + ) -> None: + backup_path = Path(__file__).parent.parent.resolve() / "test_data" / test_file + + cp = subprocess.run( + [ + "karapace_schema_backup", + "verify", + "--location", + str(backup_path), + ], + capture_output=True, + check=False, + env=no_color_env(), + ) + + assert cp.returncode == 1 + assert cp.stderr.decode() == error_message + assert cp.stdout == b"" diff --git a/tests/integration/test_data/backup_v3_corrupt/2db42756.metadata b/tests/integration/test_data/backup_v3_corrupt/2db42756.metadata new file mode 100644 index 0000000000000000000000000000000000000000..cc9f969377cd6fca8f9d4ca89e00ef4c66004542 GIT binary patch literal 89 zcmdN7Gv;DoU