feature: Implement backup verification command
Adds a CLI command to allow verifying checksum integrity of data files
in a V3 backup. The checksum verification is implemented as a simple
read loop over the contents of each data file in the given metadata.
aiven-anton committed May 26, 2023
1 parent 071226f commit 3003113
Showing 9 changed files with 220 additions and 10 deletions.
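
For reference, the new subcommand is invoked like the existing backup subcommands. The integration tests below exercise it as follows (the metadata path here is illustrative):

    karapace_schema_backup verify --location /path/to/topic-name.metadata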
42 changes: 40 additions & 2 deletions karapace/backup/api.py
@@ -30,6 +30,7 @@
 from karapace.schema_reader import new_schema_topic_from_config
 from karapace.utils import assert_never
 from pathlib import Path
+from rich.console import Console
 from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
 from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, NoReturn, TypeVar

@@ -481,9 +482,9 @@ def inspect(backup_location: Path | StdOut) -> None:
     metadata = backend.read_metadata(backup_location)

     if metadata.checksum_algorithm is ChecksumAlgorithm.unknown:
-        print(
+        console = Console(stderr=True, style="red")
+        console.print(
             "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of Karapace.",
-            file=sys.stderr,
         )

     metadata_json = {
@@ -508,3 +509,40 @@ def inspect(backup_location: Path | StdOut) -> None:
     }

     print(json.dumps(metadata_json, indent=2))
+
+
+def verify(backup_location: Path | StdOut) -> None:
+    if isinstance(backup_location, str):
+        raise NotImplementedError("Cannot verify backup via stdin")
+    if not backup_location.exists():
+        raise BackupError("Backup location doesn't exist")
+    backup_version = BackupVersion.identify(backup_location)
+
+    if backup_version is not BackupVersion.V3:
+        print(
+            f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}.",
+            file=sys.stderr,
+        )
+        raise SystemExit(1)
+
+    backend = backup_version.reader()
+    assert isinstance(backend, SchemaBackupV3Reader)
+
+    console = Console()
+    success = True
+    verified_files = 0
+
+    for matches, data_file in backend.verify_integrity(backup_location):
+        console.print(f"Integrity of [blue]{data_file.filename}[/blue] is ", end="")
+        if matches:
+            console.print("[green]intact.[/green]")
+            verified_files += 1
+        else:
+            success = False
+            console.print("[red]not intact![/red]")
+
+    if not success:
+        console.print("💥 [red]Found checksum mismatches.[/red]")
+        raise SystemExit(1)
+
+    console.print(f"✅ [green]Verified {verified_files} data files in backup OK.[/green]")
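
A sample run against the single-partition test fixture behaves as follows (lines reconstructed from the integration tests below, with colors disabled). On a checksum mismatch the command instead reports "not intact!", prints "💥 Found checksum mismatches.", and exits with status 1:

    $ karapace_schema_backup verify --location backup_v3_single_partition/2db42756.metadata
    Integrity of 2db42756:0.data is intact.
    ✅ Verified 1 data files in backup OK.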
19 changes: 18 additions & 1 deletion karapace/backup/backends/v3/backend.py
@@ -17,9 +17,10 @@
 from karapace.utils import assert_never
 from karapace.version import __version__
 from pathlib import Path
-from typing import Callable, ContextManager, Final, IO, Iterator, Sequence, TypeVar
+from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar

 import datetime
+import io
 import uuid
 import xxhash

@@ -88,6 +89,22 @@ def read(self, path: Path, topic_name: str) -> Iterator[Instruction]:
                 data_file=data_file,
             )

+    # Not part of common interface, because it exposes DataFile which is
+    # specific to V3.
+    def verify_integrity(self, path: Path) -> Generator[tuple[bool, DataFile], None, None]:
+        metadata = self.read_metadata(path)
+
+        for data_file in metadata.data_files:
+            checksum = _get_checksum_implementation(metadata.checksum_algorithm)()
+            with (path.parent / data_file.filename).open("rb") as buffer:
+                while True:
+                    chunk = buffer.read(io.DEFAULT_BUFFER_SIZE)
+                    if not chunk:
+                        break
+                    checksum.update(chunk)
+            matches = checksum.digest() == data_file.checksum
+            yield matches, data_file
+

 # Note: Because the underlying checksum API is mutable, it doesn't make sense to attempt
 # to make this immutable.
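
The generator can also be driven directly, without the CLI wrapper. A minimal sketch; the import path for BackupVersion and the metadata path are assumptions, everything else mirrors api.verify above:

    # Minimal sketch: consume verify_integrity directly instead of via the CLI.
    # Assumption: BackupVersion is importable from karapace.backup.api; the
    # metadata path is illustrative. V3 data files live next to the metadata.
    from pathlib import Path

    from karapace.backup.api import BackupVersion

    backend = BackupVersion.V3.reader()  # yields the SchemaBackupV3Reader
    for matches, data_file in backend.verify_integrity(Path("backup/topic.metadata")):
        # Each yielded pair is one data file and whether its checksum matched.
        print(f"{data_file.filename}: {'intact' if matches else 'corrupt'}")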
5 changes: 4 additions & 1 deletion karapace/backup/cli.py
@@ -22,11 +22,12 @@ def parse_args() -> argparse.Namespace:
     parser_get = subparsers.add_parser("get", help="Store the schema backup into a file")
     parser_restore = subparsers.add_parser("restore", help="Restore the schema backup from a file")
     parser_inspect = subparsers.add_parser("inspect", help="Parse and dump metadata from a backup file.")
+    parser_verify = subparsers.add_parser("verify", help="Parse metadata, and verify all checksums of a backup.")
     parser_export_anonymized_avro_schemas = subparsers.add_parser(
         "export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file"
     )

-    for p in (parser_get, parser_restore, parser_inspect, parser_export_anonymized_avro_schemas):
+    for p in (parser_get, parser_restore, parser_inspect, parser_verify, parser_export_anonymized_avro_schemas):
         p.add_argument("--location", default="", help="File path for the backup file")

     for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas):
@@ -62,6 +63,8 @@ def dispatch(args: argparse.Namespace) -> None:
         )
     elif args.command == "inspect":
         api.inspect(location)
+    elif args.command == "verify":
+        api.verify(location)
     elif args.command == "restore":
         config = get_config(args)
         api.restore_backup(
14 changes: 12 additions & 2 deletions requirements/requirements-dev.txt
@@ -47,6 +47,10 @@ charset-normalizer==3.1.0
     #   requests
 click==8.1.3
     # via flask
+commonmark==0.9.1
+    # via
+    #   -r requirements.txt
+    #   rich
 configargparse==1.5.3
     # via locust
 exceptiongroup==1.1.1
@@ -81,7 +85,7 @@ geventhttpclient==2.0.9
     # via locust
 greenlet==2.0.2
     # via gevent
-hypothesis==6.75.3
+hypothesis==6.75.4
     # via -r requirements-dev.in
 idna==3.4
     # via
@@ -137,7 +141,10 @@ psutil==5.9.5
     #   locust
     #   pytest-xdist
 pygments==2.15.1
-    # via pdbpp
+    # via
+    #   -r requirements.txt
+    #   pdbpp
+    #   rich
 pyrepl==0.9.0
     # via fancycompleter
 pyrsistent==0.19.3
@@ -161,6 +168,8 @@ requests==2.31.0
     # via
     #   -r requirements-dev.in
     #   locust
+rich==12.5.1
+    # via -r requirements.txt
 roundrobin==0.0.4
     # via locust
 sentry-sdk==1.24.0
@@ -187,6 +196,7 @@ typing-extensions==4.6.2
     # via
     #   -r requirements.txt
     #   locust
+    #   rich
 ujson==5.7.0
     # via -r requirements.txt
 urllib3==1.26.16
1 change: 1 addition & 0 deletions requirements/requirements.in
@@ -12,6 +12,7 @@ typing-extensions
 ujson<6
 watchfiles<1
 xxhash~=3.0
+rich~=12.5.0

 # Patched dependencies
 #
10 changes: 9 additions & 1 deletion requirements/requirements.txt
@@ -26,6 +26,8 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c
     # via -r requirements.in
 charset-normalizer==3.1.0
     # via aiohttp
+commonmark==0.9.1
+    # via rich
 frozenlist==1.3.3
     # via
     #   aiohttp
@@ -52,10 +54,14 @@ packaging==23.1
     # via aiokafka
 protobuf==3.20.3
     # via -r requirements.in
+pygments==2.15.1
+    # via rich
 pyrsistent==0.19.3
     # via jsonschema
 python-dateutil==2.8.2
     # via -r requirements.in
+rich==12.5.1
+    # via -r requirements.in
 six==1.16.0
     # via
     #   isodate
@@ -66,7 +72,9 @@ sniffio==1.3.0
 tenacity==8.2.2
     # via -r requirements.in
 typing-extensions==4.6.2
-    # via -r requirements.in
+    # via
+    #   -r requirements.in
+    #   rich
 ujson==5.7.0
     # via -r requirements.in
 watchfiles==0.19.0
139 changes: 136 additions & 3 deletions tests/integration/backup/test_v3_backup.py
@@ -365,9 +365,8 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None:
         )

         assert cp.returncode == 0
-        assert (
-            cp.stderr.decode()
-            == "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of Karapace.\n"
+        assert cp.stderr.decode() == (
+            "Warning! This file has an unknown checksum algorithm and cannot be restored with this version of \nKarapace.\n"
         )
         assert json.loads(cp.stdout) == {
             "version": 3,
@@ -426,3 +425,137 @@ def test_can_inspect_v1(self) -> None:
         assert cp.returncode == 0
         assert cp.stderr == b""
         assert json.loads(cp.stdout) == {"version": 1}
+
+
+class TestVerify:
+    def test_can_verify_backup_integrity(self) -> None:
+        metadata_path = (
+            Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "2db42756.metadata"
+        )
+
+        cp = subprocess.run(
+            [
+                "karapace_schema_backup",
+                "verify",
+                "--location",
+                str(metadata_path),
+            ],
+            capture_output=True,
+            check=False,
+            env=no_color_env(),
+        )
+
+        assert cp.returncode == 0
+        assert cp.stderr == b""
+        assert b"Integrity of 2db42756:0.data is intact." in cp.stdout
+        assert b"Verified 1 data files in backup OK." in cp.stdout
+
+    def test_can_verify_backup_integrity_from_large_topic(
+        self,
+        tmp_path: Path,
+        new_topic: NewTopic,
+        producer: KafkaProducer,
+        config_file: Path,
+        admin_client: KafkaAdminClient,
+        karapace_config: Config,
+    ) -> None:
+        # Populate the test topic.
+        for _ in range(100):
+            producer.send(
+                new_topic.name,
+                key=1000 * b"a",
+                value=1000 * b"b",
+                partition=0,
+            ).add_errback(_raise)
+        producer.flush()
+
+        # Execute backup creation.
+        subprocess.run(
+            [
+                "karapace_schema_backup",
+                "get",
+                "--use-format-v3",
+                "--config",
+                str(config_file),
+                "--topic",
+                new_topic.name,
+                "--location",
+                str(tmp_path),
+            ],
+            capture_output=True,
+            check=True,
+        )
+        metadata_path = tmp_path / f"topic-{new_topic.name}" / f"{new_topic.name}.metadata"
+
+        cp = subprocess.run(
+            [
+                "karapace_schema_backup",
+                "verify",
+                "--location",
+                str(metadata_path),
+            ],
+            capture_output=True,
+            check=False,
+            env=no_color_env(),
+        )
+
+        assert cp.returncode == 0
+        assert cp.stderr == b""
+        assert f"Integrity of {new_topic.name}:0.data is intact.".encode() in cp.stdout
+        assert b"Verified 1 data files in backup OK." in cp.stdout
+
+    def test_can_refute_backup_integrity(self) -> None:
+        metadata_path = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_corrupt" / "2db42756.metadata"
+
+        cp = subprocess.run(
+            [
+                "karapace_schema_backup",
+                "verify",
+                "--location",
+                str(metadata_path),
+            ],
+            capture_output=True,
+            check=False,
+            env=no_color_env(),
+        )
+
+        assert cp.returncode == 1
+        assert cp.stderr == b""
+        assert b"Integrity of 2db42756:0.data is not intact!" in cp.stdout
+        assert b"Found checksum mismatches." in cp.stdout
+
+    @pytest.mark.parametrize(
+        ("test_file", "error_message"),
+        (
+            (
+                "test_restore_v1.log",
+                "Only backups using format V3 can be verified, found V1.\n",
+            ),
+            (
+                "test_restore_v2.log",
+                "Only backups using format V3 can be verified, found V2.\n",
+            ),
+        ),
+    )
+    def test_gives_non_successful_exit_code_for_legacy_backup_format(
+        self,
+        test_file: str,
+        error_message: str,
+    ) -> None:
+        backup_path = Path(__file__).parent.parent.resolve() / "test_data" / test_file
+
+        cp = subprocess.run(
+            [
+                "karapace_schema_backup",
+                "verify",
+                "--location",
+                str(backup_path),
+            ],
+            capture_output=True,
+            check=False,
+            env=no_color_env(),
+        )
+
+        assert cp.returncode == 1
+        assert cp.stderr.decode() == error_message
+        assert cp.stdout == b""
Binary file not shown.
Binary file not shown.
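
The two binary files above (not rendered in the diff view) are the new test-data fixtures. A hypothetical sketch of how a corrupt fixture like backup_v3_corrupt could be produced from an intact one, assuming any modified byte invalidates the checksum recorded in the metadata:

    # Hypothetical helper: copy a V3 data file and flip one byte so its
    # checksum no longer matches the value recorded in the backup metadata.
    from pathlib import Path

    def corrupt_copy(src: Path, dst: Path) -> None:
        data = bytearray(src.read_bytes())
        data[0] ^= 0xFF  # any byte flip breaks the checksum
        dst.write_bytes(bytes(data))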
