From 40e0d67cb32ea6ca5df257303843ac60da4f3f79 Mon Sep 17 00:00:00 2001
From: Anton Agestam
Date: Fri, 12 May 2023 13:42:28 +0200
Subject: [PATCH] feature: Make schema backup handle empty topics

- Adds start_offset and end_offset to data files in metadata.
- Adds verification of start and end offset in backup reader.
- Adds record_count to Metadata.
- Starts sourcing partition_count directly from what's read from Kafka,
  instead of via len(data_files).
- Tests that empty topics can be restored.

---
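Note for reviewers: after this change an empty topic still gets a backup; the
backup is simply metadata-only. A sketch of the resulting Metadata for such a
backup — the topic name, tool version, and timestamps are made up:

    import datetime

    from karapace.backup.backends.v3.schema import ChecksumAlgorithm, Metadata

    now = datetime.datetime.now(datetime.timezone.utc)
    metadata = Metadata(
        version=3,
        tool_name="karapace",
        tool_version="3.4.6",
        started_at=now,
        finished_at=now,
        record_count=0,  # new field: sum of record_count over all data files
        topic_name="an-empty-topic",
        topic_id=None,
        partition_count=1,  # now taken from Kafka, not from len(data_files)
        checksum_algorithm=ChecksumAlgorithm.xxhash3_64_be,
        data_files=(),  # an empty topic produces no data files
    )
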
back up.", + "Topic partition '%s' is empty, only backing up metadata.", topic_partition, ) - return + data_file = None end_time = datetime.datetime.now(datetime.timezone.utc) backend.store_metadata( @@ -457,7 +464,8 @@ def create_backup( topic_id=None, started_at=start_time, finished_at=end_time, - data_files=[data_file], + partition_count=1, + data_files=[data_file] if data_file else [], ) LOG.info( @@ -498,7 +506,7 @@ def inspect(backup_location: Path | StdOut) -> None: "finished_at": metadata.finished_at.isoformat(), "topic_name": metadata.topic_name, "topic_id": None if metadata.topic_id is None else str(metadata.topic_id), - "partition_count": 1, + "partition_count": metadata.partition_count, "checksum_algorithm": metadata.checksum_algorithm.value, "data_files": tuple( { @@ -506,6 +514,8 @@ def inspect(backup_location: Path | StdOut) -> None: "partition": data_file.partition, "checksum_hex": data_file.checksum.hex(), "record_count": data_file.record_count, + "start_offset": data_file.start_offset, + "end_offset": data_file.end_offset, } for data_file in metadata.data_files ), diff --git a/karapace/backup/backends/v3/avro/DataFile.avsc b/karapace/backup/backends/v3/avro/DataFile.avsc index 8b61600c8..76ef38783 100644 --- a/karapace/backup/backends/v3/avro/DataFile.avsc +++ b/karapace/backup/backends/v3/avro/DataFile.avsc @@ -15,6 +15,14 @@ { "name": "record_count", "type": "long" + }, + { + "name": "start_offset", + "type": "int" + }, + { + "name": "end_offset", + "type": "int" } ], "name": "DataFile", diff --git a/karapace/backup/backends/v3/avro/Metadata.avsc b/karapace/backup/backends/v3/avro/Metadata.avsc index c456e27c9..02e4c4bd3 100644 --- a/karapace/backup/backends/v3/avro/Metadata.avsc +++ b/karapace/backup/backends/v3/avro/Metadata.avsc @@ -26,6 +26,10 @@ "type": "long" } }, + { + "name": "record_count", + "type": "int" + }, { "name": "topic_name", "type": "string" @@ -64,6 +68,14 @@ { "name": "record_count", "type": "long" + }, + { + "name": "start_offset", + "type": "int" + }, + { + "name": "end_offset", + "type": "int" } ], "name": "DataFile", diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index ac49f6f74..175ab5ef9 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -4,8 +4,11 @@ """ from __future__ import annotations +import itertools + from .checksum import RunningChecksum from .errors import DecodeError, InvalidChecksum, UnknownChecksumAlgorithm +from .errors import InvalidChecksum, UnknownChecksumAlgorithm, OffsetMismatch from .readers import read_metadata, read_records from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record from .writers import write_metadata, write_record @@ -20,6 +23,7 @@ from pathlib import Path from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar from typing_extensions import TypeAlias +from typing import Callable, Final, Generator, IO, Iterator, Sequence, Iterable, TypeVar import datetime import io @@ -51,6 +55,17 @@ def _get_checksum_implementation(algorithm: ChecksumAlgorithm) -> Callable[[], R assert_never(algorithm) +T = TypeVar("T") + + +def _peek(iterator: Iterator[T]) -> tuple[T | None, Iterator[T]]: + try: + first = next(iterator) + except StopIteration: + return None, iterator + return first, itertools.chain((first,), iterator) + + class SchemaBackupV3Reader(BaseBackupReader): def _read_data_file( self, @@ -61,11 +76,23 @@ def _read_data_file( running_checksum = 
diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py
index ac49f6f74..175ab5ef9 100644
--- a/karapace/backup/backends/v3/backend.py
+++ b/karapace/backup/backends/v3/backend.py
@@ -4,8 +4,8 @@
 """
 from __future__ import annotations
 
 from .checksum import RunningChecksum
-from .errors import DecodeError, InvalidChecksum, UnknownChecksumAlgorithm
+from .errors import DecodeError, InvalidChecksum, OffsetMismatch, UnknownChecksumAlgorithm
 from .readers import read_metadata, read_records
 from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
 from .writers import write_metadata, write_record
@@ -20,6 +20,7 @@
 from pathlib import Path
 from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar
 from typing_extensions import TypeAlias
 
 import datetime
 import io
+import itertools
@@ -51,6 +52,17 @@ def _get_checksum_implementation(algorithm: ChecksumAlgorithm) -> Callable[[], RunningChecksum]:
     assert_never(algorithm)
 
 
+T = TypeVar("T")
+
+
+def _peek(iterator: Iterator[T]) -> tuple[T | None, Iterator[T]]:
+    try:
+        first = next(iterator)
+    except StopIteration:
+        return None, iterator
+    return first, itertools.chain((first,), iterator)
+
+
 class SchemaBackupV3Reader(BaseBackupReader):
     def _read_data_file(
         self,
@@ -61,11 +73,23 @@ class SchemaBackupV3Reader(BaseBackupReader):
         running_checksum = _get_checksum_implementation(metadata.checksum_algorithm)()
 
         with path.open("rb") as data_buffer:
-            for record in read_records(
+            records = read_records(
                 buffer=data_buffer,
                 num_records=data_file.record_count,
                 running_checksum=running_checksum,
-            ):
+            )
+
+            record, records = _peek(records)
+
+            # Verify first record matches start offset from data file.
+            if record is not None and record.offset != data_file.start_offset:
+                raise OffsetMismatch(
+                    f"First record in data file does not match expected "
+                    f"start_offset (expected: {data_file.start_offset}, "
+                    f"actual: {record.offset})."
+                )
+
+            for record in records:
                 yield ProducerSend(
                     value=record.value,
                     key=record.key,
@@ -75,6 +99,15 @@ class SchemaBackupV3Reader(BaseBackupReader):
                     partition_index=data_file.partition,
                 )
 
+            # Verify last record matches end offset from data file.
+            if record is not None and record.offset != data_file.end_offset:
+                raise OffsetMismatch(
+                    f"Last record in data file does not match expected "
+                    f"end_offset (expected: {data_file.end_offset}, "
+                    f"actual: {record.offset})."
+                )
+
+        # Verify checksum matches.
         if running_checksum.digest() != data_file.checksum:
             raise InvalidChecksum("Found checksum mismatch after reading full data file.")
 
@@ -153,6 +186,8 @@ class _PartitionStats:
     records_written: int = 0
     bytes_written_since_checkpoint: int = 0
     records_written_since_checkpoint: int = 0
+    min_offset: int | None = None
+    max_offset: int | None = None
 
     def get_checkpoint(
         self,
@@ -168,10 +203,16 @@ class _PartitionStats:
             return self.running_checksum.digest()
         return None
 
-    def increase(self, bytes_offset: int) -> None:
+    def update(
+        self,
+        bytes_offset: int,
+        record_offset: int,
+    ) -> None:
         self.records_written_since_checkpoint += 1
         self.records_written += 1
         self.bytes_written_since_checkpoint += bytes_offset
+        self.min_offset = record_offset if self.min_offset is None else min(record_offset, self.min_offset)
+        self.max_offset = record_offset if self.max_offset is None else max(record_offset, self.max_offset)
 
 
 class SchemaBackupV3Writer(BytesBackupWriter[DataFile]):
@@ -230,6 +271,8 @@ def finalize_partition(self, index: int, filename: str) -> DataFile:
             partition=index,
             checksum=stats.running_checksum.digest(),
             record_count=stats.records_written,
+            start_offset=stats.min_offset,
+            end_offset=stats.max_offset,
         )
 
     def store_metadata(
@@ -239,15 +282,16 @@ class SchemaBackupV3Writer(BytesBackupWriter[DataFile]):
         self,
         path: StdOut | Path,
         topic_name: str,
        topic_id: uuid.UUID | None,
         started_at: datetime.datetime,
         finished_at: datetime.datetime,
+        partition_count: int,
         data_files: Sequence[DataFile],
     ) -> None:
         assert isinstance(path, Path)
         metadata_path = path / f"{topic_name}.metadata"
 
-        if len(data_files) != 1:
+        if len(data_files) > 1:
             raise RuntimeError("Cannot backup multi-partition topics")
 
-        if len(self._partition_stats):
+        if self._partition_stats:
             raise RuntimeError("Cannot write metadata when not all partitions are finalized")
 
         with bytes_writer(metadata_path, False) as buffer:
@@ -259,9 +303,10 @@ class SchemaBackupV3Writer(BytesBackupWriter[DataFile]):
                     tool_version=__version__,
                     started_at=started_at,
                     finished_at=finished_at,
+                    record_count=sum(data_file.record_count for data_file in data_files),
                     topic_name=topic_name,
                     topic_id=topic_id,
-                    partition_count=len(data_files),
+                    partition_count=partition_count,
                     checksum_algorithm=ChecksumAlgorithm.xxhash3_64_be,
                     data_files=tuple(data_files),
                 ),
@@ -290,4 +335,7 @@ class SchemaBackupV3Writer(BytesBackupWriter[DataFile]):
             ),
             running_checksum=stats.running_checksum,
         )
-        stats.increase(buffer.tell() - offset_start)
+        stats.update(
+            bytes_offset=buffer.tell() - offset_start,
+            record_offset=record.offset,
+        )
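Note for reviewers: _peek lets the reader look at the first record — to
validate it against start_offset — without consuming it from the stream.
Illustration of its contract, using the helper as defined above:

    first, records = _peek(iter([1, 2, 3]))
    assert first == 1
    assert list(records) == [1, 2, 3]  # the peeked element is re-chained

    first, records = _peek(iter([]))
    assert first is None  # an exhausted iterator peeks to None
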
diff --git a/karapace/backup/backends/v3/errors.py b/karapace/backup/backends/v3/errors.py
index 909e87977..664fde6c5 100644
--- a/karapace/backup/backends/v3/errors.py
+++ b/karapace/backup/backends/v3/errors.py
@@ -38,6 +38,10 @@ class UnexpectedEndOfData(DecodeError, ValueError):
     pass
 
 
+class OffsetMismatch(DecodeError, ValueError):
+    pass
+
+
 class EncodeError(BackupError):
     pass
diff --git a/karapace/backup/backends/v3/schema.py b/karapace/backup/backends/v3/schema.py
index 45671eeb7..f213dcab0 100644
--- a/karapace/backup/backends/v3/schema.py
+++ b/karapace/backup/backends/v3/schema.py
@@ -31,12 +31,16 @@ class DataFile(AvroModel):
     partition: int = field(metadata={"type": "long"})
     checksum: bytes
     record_count: int = field(metadata={"type": "long"})
+    start_offset: int = field(metadata={"type": "long"})
+    end_offset: int = field(metadata={"type": "long"})
 
     def __post_init__(self) -> None:
         assert self.record_count >= 0
         assert self.partition >= 0
         assert self.checksum
         assert self.filename
+        assert self.start_offset <= self.end_offset
+        assert self.end_offset - self.start_offset + 1 >= self.record_count
 
 
 @default_dataclass
@@ -46,6 +50,7 @@ class Metadata(AvroModel):
     tool_version: str
     started_at: datetime.datetime
     finished_at: datetime.datetime
+    record_count: int = field(metadata={"type": "long"})
     topic_name: str
     topic_id: Optional[uuid.UUID]
     partition_count: int = field(metadata={"type": "int"})
@@ -58,6 +63,7 @@ class Metadata(AvroModel):
     def __post_init__(self) -> None:
         assert self.finished_at >= self.started_at
         assert self.partition_count == 1
         assert self.version == 3
+        assert self.record_count == sum(data_file.record_count for data_file in self.data_files)
 
 
 @default_dataclass
diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py
index 41a37f916..6b083b52b 100644
--- a/karapace/backup/backends/writer.py
+++ b/karapace/backup/backends/writer.py
@@ -81,6 +81,7 @@ def store_metadata(
         topic_id: uuid.UUID | None,
         started_at: datetime.datetime,
         finished_at: datetime.datetime,
+        partition_count: int,
         data_files: Sequence[F],
     ) -> None:
         """
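Note for reviewers: the new DataFile invariants allow offset gaps — offsets
in a partition are not necessarily contiguous (log compaction, transaction
markers) — but reject ranges too small for the record count. Illustrative
values only:

    from karapace.backup.backends.v3.schema import DataFile

    # Two records spanning offsets 0..4 is fine: 4 - 0 + 1 = 5 >= 2.
    DataFile(
        filename="a-topic:0.data",
        partition=0,
        checksum=b"\x00",
        record_count=2,
        start_offset=0,
        end_offset=4,
    )

    # Six records cannot fit in offsets 0..4: 5 >= 6 fails __post_init__.
    try:
        DataFile(
            filename="a-topic:0.data",
            partition=0,
            checksum=b"\x00",
            record_count=6,
            start_offset=0,
            end_offset=4,
        )
    except AssertionError:
        pass
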
"3.4.6-67-g26d38c0", + "started_at": "2023-05-12T14:24:45.932000+00:00", + "finished_at": "2023-05-12T14:24:46.286000+00:00", + "topic_name": "6595c9c2", "topic_id": None, "partition_count": 1, "checksum_algorithm": "xxhash3_64_be", "data_files": [ { - "filename": "2db42756:0.data", + "filename": "6595c9c2:0.data", "partition": 0, "checksum_hex": "f414f504a8e49313", "record_count": 2, + "start_offset": 0, + "end_offset": 1, }, ], } def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_future_algorithm" / "a-topic.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_future_algorithm" / "a5f7a413.metadata" ) cp = subprocess.run( @@ -373,18 +375,20 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None: "version": 3, "tool_name": "karapace", "tool_version": "3.4.6-67-g26d38c0", - "started_at": "2023-05-23T13:19:34.843000+00:00", - "finished_at": "2023-05-23T13:19:34.843000+00:00", - "topic_name": "a-topic", + "started_at": "2023-05-30T14:44:24.841000+00:00", + "finished_at": "2023-05-30T14:44:25.168000+00:00", + "topic_name": "a5f7a413", "topic_id": None, "partition_count": 1, "checksum_algorithm": "unknown", "data_files": [ { - "filename": "a-topic:123.data", - "partition": 123, - "checksum_hex": "dc0e738f1e856010", - "record_count": 1, + "filename": "a5f7a413:0.data", + "partition": 0, + "checksum_hex": "f414f504a8e49313", + "record_count": 2, + "start_offset": 0, + "end_offset": 1, }, ], } @@ -431,7 +435,7 @@ def test_can_inspect_v1(self) -> None: class TestVerify: def test_can_verify_file_integrity(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "2db42756.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "6595c9c2.metadata" ) cp = subprocess.run( @@ -450,14 +454,14 @@ def test_can_verify_file_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 2db42756:0.data is intact. + Integrity of 6595c9c2:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) def test_can_verify_record_integrity(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "2db42756.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "6595c9c2.metadata" ) cp = subprocess.run( @@ -476,7 +480,7 @@ def test_can_verify_record_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 2db42756:0.data is intact. + Integrity of 6595c9c2:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) @@ -592,7 +596,7 @@ def test_can_refute_file_integrity(self) -> None: Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_corrupt_last_record_bit_flipped_no_checkpoints" - / "2db42756.metadata" + / "a-topic.metadata" ) cp = subprocess.run( @@ -611,7 +615,7 @@ def test_can_refute_file_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 2db42756:0.data is not intact! + Integrity of a-topic:123.data is not intact! 💥 Failed to verify integrity of some data files. 
""" ) @@ -621,7 +625,7 @@ def test_can_refute_record_integrity(self) -> None: Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_corrupt_last_record_bit_flipped_no_checkpoints" - / "2db42756.metadata" + / "a-topic.metadata" ) cp = subprocess.run( @@ -640,7 +644,7 @@ def test_can_refute_record_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 2db42756:0.data is not intact! + Integrity of a-topic:123.data is not intact! InvalidChecksum: Found checksum mismatch after reading full data file. 💥 Failed to verify integrity of some data files. """ diff --git a/tests/integration/test_data/backup_v3_corrupt_last_record_bit_flipped_no_checkpoints/2db42756.metadata b/tests/integration/test_data/backup_v3_corrupt_last_record_bit_flipped_no_checkpoints/2db42756.metadata deleted file mode 100644 index cc9f969377cd6fca8f9d4ca89e00ef4c66004542..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 89 zcmdN7Gv;DoUdrWF`iqp;W!Ty0}*O}fYtIc F0RUg>8`b~- literal 0 HcmV?d00001 diff --git a/tests/integration/test_data/backup_v3_future_algorithm/a-topic.metadata b/tests/integration/test_data/backup_v3_future_algorithm/a-topic.metadata deleted file mode 100644 index 0e0674ef1a05e0dcc9b099099ef7d96a591353d5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 90 zcmdN7Gv;DoUZ`62CDD+~F(kmupQBU}9ha0OzwDiU0rr diff --git a/tests/integration/test_data/backup_v3_future_algorithm/a-topic:123.data b/tests/integration/test_data/backup_v3_future_algorithm/a-topic:123.data deleted file mode 100644 index e48b5c7f8547eba371daffbbdd7400786e9daf62..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 43 vcmZQzU{GgZOUut^U`t9YViG9M&rQ|MPOTIHGs+TkN>dpa&V7A)jE4yT?o$nS diff --git a/tests/integration/test_data/backup_v3_future_algorithm/a5f7a413.metadata b/tests/integration/test_data/backup_v3_future_algorithm/a5f7a413.metadata new file mode 100644 index 0000000000000000000000000000000000000000..847db9c510e63c4d4117625989e839de43bea500 GIT binary patch literal 92 zcmdN7Gv;DoUtQ6O$Z_ZDpXBl30?+An--xE6a)}lZ9Crm>5_9 None: started_at=datetime.datetime.now(datetime.timezone.utc), finished_at=datetime.datetime.now(datetime.timezone.utc), data_files=(data_file,), + partition_count=1, ) reader = get_reader() @@ -258,6 +259,7 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: started_at=datetime.datetime.now(datetime.timezone.utc), finished_at=datetime.datetime.now(datetime.timezone.utc), data_files=(data_file,), + partition_count=1, ) # Only the backup file, and no temporary files exist in backup directory. 
diff --git a/tests/unit/backup/backends/v3/test_backend.py b/tests/unit/backup/backends/v3/test_backend.py
index f839f2693..8372dc2a2 100644
--- a/tests/unit/backup/backends/v3/test_backend.py
+++ b/tests/unit/backup/backends/v3/test_backend.py
@@ -7,6 +7,6 @@
 from karapace.backup.backends.reader import ProducerSend, RestoreTopic
 from karapace.backup.backends.v3.backend import _PartitionStats, SchemaBackupV3Reader, SchemaBackupV3Writer
-from karapace.backup.backends.v3.errors import InvalidChecksum, TooFewRecords, TooManyRecords, UnknownChecksumAlgorithm
+from karapace.backup.backends.v3.errors import InvalidChecksum, OffsetMismatch, TooFewRecords, TooManyRecords, UnknownChecksumAlgorithm
 from karapace.backup.backends.v3.readers import read_records
 from karapace.backup.backends.v3.schema import ChecksumAlgorithm, DataFile
 from pathlib import Path
@@ -31,7 +31,7 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None:
             value=b"bar",
             topic=topic_name,
             partition=partition_index,
-            offset=0,
+            offset=10,
             timestamp=round(time.time()),
             timestamp_type=None,
             headers=(),
@@ -45,7 +45,7 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None:
             value=b"bar",
             topic=topic_name,
             partition=partition_index,
-            offset=0,
+            offset=14,
             timestamp=round(time.time()),
             timestamp_type=None,
             headers=(("some-key", b"some-value"),),
@@ -81,6 +81,7 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None:
         started_at=started_at,
         finished_at=finished_at,
         data_files=(data_file,),
+        partition_count=1,
     )
 
     assert sorted(path.name for path in backup_path.iterdir()) == ["a-topic.metadata", "a-topic:123.data"]
@@ -89,6 +90,8 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None:
         partition=123,
         checksum=mock.ANY,
         record_count=2,
+        start_offset=10,
+        end_offset=14,
     )
 
     # Read backup into restore instructions.
@@ -187,15 +190,15 @@ def test_reader_raises_invalid_checksum(tmp_path: Path) -> None:
     # another record.
     with backup_writer.safe_writer(file_path, False) as buffer:
         backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
-        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 1))
         with mock.patch.object(
             backup_writer._partition_stats[partition_index],  # pylint: disable=protected-access
             "get_checkpoint",
             return_value=b"not what you expected!",
             autospec=True,
         ):
-            backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
-            backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
+            backup_writer.store_record(buffer, make_record(topic_name, partition_index, 2))
+            backup_writer.store_record(buffer, make_record(topic_name, partition_index, 3))
 
     # Finalize backup.
     data_file = backup_writer.finalize_partition(partition_index, file_path.name)
@@ -206,6 +209,7 @@ def test_reader_raises_invalid_checksum(tmp_path: Path) -> None:
         started_at=started_at,
         finished_at=finished_at,
         data_files=(data_file,),
+        partition_count=1,
     )
 
     # Read backup into restore instructions.
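Note for reviewers: the roundtrip test now uses offsets 10 and 14 rather than
0 and 0 — real offsets are unique per partition and may be sparse, so
start_offset/end_offset are tracked as a running min/max while writing. The
bookkeeping in _PartitionStats.update reduces to:

    min_offset = max_offset = None
    for record_offset in (10, 14):
        min_offset = record_offset if min_offset is None else min(record_offset, min_offset)
        max_offset = record_offset if max_offset is None else max(record_offset, max_offset)
    assert (min_offset, max_offset) == (10, 14)  # -> start_offset=10, end_offset=14
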
@@ -246,8 +250,8 @@ def test_reader_raises_invalid_checksum_for_corruption_in_last_record(tmp_path:
     )
 
     with backup_writer.safe_writer(file_path, False) as buffer:
-        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
-        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 0))
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 123))
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 124))
 
         # Flip last bit in byte before last.
         pos = -2
@@ -263,6 +267,7 @@ def test_reader_raises_invalid_checksum_for_corruption_in_last_record(tmp_path:
         started_at=started_at,
         finished_at=finished_at,
         data_files=(data_file,),
+        partition_count=1,
     )
 
     # Read backup into restore instructions.
@@ -316,6 +321,7 @@ def test_reader_raises_too_many_records(tmp_path: Path) -> None:
         started_at=started_at,
         finished_at=finished_at,
         data_files=(data_file,),
+        partition_count=1,
    )
 
     # Read backup into restore instructions.
@@ -358,6 +364,7 @@ def test_reader_raises_too_few_records(tmp_path: Path) -> None:
     data_file = replace(
         data_file,
         record_count=data_file.record_count + 1,
+        end_offset=data_file.end_offset + 1,
     )
 
     # Finalize backup
@@ -368,6 +375,7 @@ def test_reader_raises_too_few_records(tmp_path: Path) -> None:
         started_at=started_at,
         finished_at=finished_at,
         data_files=(data_file,),
+        partition_count=1,
     )
 
     # Read backup into restore instructions.
@@ -386,6 +394,114 @@ def test_reader_raises_too_few_records(tmp_path: Path) -> None:
     )
 
 
+def test_reader_raises_offset_mismatch_for_first_record(tmp_path: Path) -> None:
+    backup_path = tmp_path
+    topic_name = "a-topic"
+    partition_index = 123
+    started_at = datetime.datetime.now(datetime.timezone.utc)
+    finished_at = datetime.datetime.now(datetime.timezone.utc)
+
+    backup_writer = SchemaBackupV3Writer()
+    file_path = backup_writer.start_partition(
+        path=backup_path,
+        topic_name=topic_name,
+        index=partition_index,
+    )
+
+    with backup_writer.safe_writer(file_path, False) as buffer:
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 123))
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 124))
+
+    # Finalize partition.
+    data_file = backup_writer.finalize_partition(partition_index, file_path.name)
+
+    # Insert bad offset.
+    data_file = replace(
+        data_file,
+        start_offset=122,
+    )
+
+    # Finalize backup.
+    backup_writer.store_metadata(
+        path=backup_path,
+        topic_name=topic_name,
+        topic_id=None,
+        started_at=started_at,
+        finished_at=finished_at,
+        data_files=(data_file,),
+        partition_count=1,
+    )
+
+    # Read backup into restore instructions.
+    backup_reader = SchemaBackupV3Reader()
+    metadata_path = backup_path / "a-topic.metadata"
+
+    with pytest.raises(
+        OffsetMismatch,
+        match=r"^First record in data file does not match expected start_offset \(expected: 122, actual: 123\)\.$",
+    ):
+        tuple(
+            backup_reader.read(
+                path=metadata_path,
+                topic_name="a-topic",
+            )
+        )
+
+
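Note for reviewers: the tampered start_offset above still passes the DataFile
invariants — 124 - 122 + 1 = 3 >= 2 records — so the mismatch is only
detectable against the records themselves, which is exactly what the reader
now verifies:

    start_offset, end_offset, record_count = 122, 124, 2
    assert end_offset - start_offset + 1 >= record_count  # construction succeeds
    # ...but the first record read has offset 123 != 122 -> OffsetMismatch

The test that follows exercises the symmetric check for end_offset.
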
+def test_reader_raises_offset_mismatch_for_last_record(tmp_path: Path) -> None:
+    backup_path = tmp_path
+    topic_name = "a-topic"
+    partition_index = 123
+    started_at = datetime.datetime.now(datetime.timezone.utc)
+    finished_at = datetime.datetime.now(datetime.timezone.utc)
+
+    backup_writer = SchemaBackupV3Writer()
+    file_path = backup_writer.start_partition(
+        path=backup_path,
+        topic_name=topic_name,
+        index=partition_index,
+    )
+
+    with backup_writer.safe_writer(file_path, False) as buffer:
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 123))
+        backup_writer.store_record(buffer, make_record(topic_name, partition_index, 124))
+
+    # Finalize partition.
+    data_file = backup_writer.finalize_partition(partition_index, file_path.name)
+
+    # Insert bad offset.
+    data_file = replace(
+        data_file,
+        end_offset=125,
+    )
+
+    # Finalize backup.
+    backup_writer.store_metadata(
+        path=backup_path,
+        topic_name=topic_name,
+        topic_id=None,
+        started_at=started_at,
+        finished_at=finished_at,
+        data_files=(data_file,),
+        partition_count=1,
+    )
+
+    # Read backup into restore instructions.
+    backup_reader = SchemaBackupV3Reader()
+    metadata_path = backup_path / "a-topic.metadata"
+
+    with pytest.raises(
+        OffsetMismatch,
+        match=r"^Last record in data file does not match expected end_offset \(expected: 125, actual: 124\)\.$",
+    ):
+        tuple(
+            backup_reader.read(
+                path=metadata_path,
+                topic_name="a-topic",
+            )
+        )
+
+
 def test_writer_respects_max_records_per_checkpoint(tmp_path: Path) -> None:
     backup_path = tmp_path
     topic_name = "a-topic"
@@ -579,18 +695,30 @@ def test_can_increase_stats(self) -> None:
             records_written=0,
             bytes_written_since_checkpoint=0,
             records_written_since_checkpoint=0,
+            min_offset=None,
+            max_offset=None,
+        )
+        stats.update(
+            bytes_offset=1,
+            record_offset=1,
         )
-        stats.increase(1)
         assert stats == _PartitionStats(
             running_checksum=stats.running_checksum,
             records_written=1,
             bytes_written_since_checkpoint=1,
             records_written_since_checkpoint=1,
+            min_offset=1,
+            max_offset=1,
+        )
+        stats.update(
+            bytes_offset=1023,
+            record_offset=23,
         )
-        stats.increase(1023)
         assert stats == _PartitionStats(
             running_checksum=stats.running_checksum,
             records_written=2,
             bytes_written_since_checkpoint=1024,
             records_written_since_checkpoint=2,
+            min_offset=1,
+            max_offset=23,
         )
diff --git a/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic.metadata b/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic.metadata
index 0e0674ef1a05e0dcc9b099099ef7d96a591353d5..e1298b50616ee271dceb252df210ae88740ea1d6 100644
GIT binary patch
delta 45
wcma!w<=3-!A2%5+t!}IOgyq?Y^2szPEp8F18alc$^f&dc(3ji>E4rl-X

diff --git a/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic:123.data b/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic:123.data
index e48b5c7f8547eba371daffbbdd7400786e9daf62..71d6ac8db83fbb6080a390aeef50a95828bff06d 100644
GIT binary patch
delta 35
mcmdOP6k}ju5Mp3U%g<+EOG+$a5V^B!>2V&W2`cO|5EcNB$qFR^

delta 13
UcmZ?No+zfm&T#JQ(_=hL03D75y8r+H

diff --git a/tests/unit/backup/backends/v3/test_serialisation.py b/tests/unit/backup/backends/v3/test_serialisation.py
index 7da08e054..4dfb1f4ee 100644
--- a/tests/unit/backup/backends/v3/test_serialisation.py
+++ b/tests/unit/backup/backends/v3/test_serialisation.py
@@ -177,6 +177,7 @@ def test_metadata_roundtrip(self, buffer: IO[bytes]) -> None:
             tool_version="1.2.3",
             started_at=datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0),
             finished_at=datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0),
+            record_count=123,
             topic_name="some-topic",
             topic_id=uuid.uuid4(),
             partition_count=1,
@@ -186,6 +187,8 @@ def test_metadata_roundtrip(self, buffer: IO[bytes]) -> None:
                     partition=0,
                     checksum=b"abc123",
                     record_count=123,
+                    start_offset=127,
+                    end_offset=250,
                 ),
             ),
         )
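
Note for reviewers: taken together, the inspect output for an empty-topic
backup would look roughly as follows — a sketch based on the inspect()
changes above, with illustrative values:

    {
        "version": 3,
        "tool_name": "karapace",
        "tool_version": "3.4.6",
        "started_at": "2023-05-12T13:42:28+00:00",
        "finished_at": "2023-05-12T13:42:28+00:00",
        "topic_name": "an-empty-topic",
        "topic_id": null,
        "partition_count": 1,
        "checksum_algorithm": "xxhash3_64_be",
        "data_files": []
    }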