feature: Make schema backup handle empty topics
- Adds start_offset and end_offset to data files in metadata.
- Adds verification of start and end offsets in the backup reader.
- Adds record_count to Metadata.
- Sources partition_count directly from what is read from Kafka,
  instead of deriving it via len(data_files).
- Tests that empty topics can be restored.
aiven-anton committed May 30, 2023
1 parent 801e45f · commit 40e0d67
Showing 23 changed files with 276 additions and 45 deletions.
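In practical terms, a V3 backup of an empty topic now records a partition count of one, an aggregate record count of zero, and no data files. A minimal sketch of roughly what the resulting metadata looks like when rendered by inspect; only the field names follow this commit, every concrete value is hypothetical:

# Hypothetical inspect() output for an empty topic after this change;
# all values are illustrative, only the field names come from the commit.
empty_topic_backup = {
    "version": 3,
    "topic_name": "example-topic",        # hypothetical topic name
    "topic_id": None,
    "partition_count": 1,                 # now sourced from Kafka, not len(data_files)
    "record_count": 0,                    # new aggregate field on Metadata
    "checksum_algorithm": "xxhash3_64_be",
    "data_files": [],                     # empty topic: no data files at all
}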
20 changes: 15 additions & 5 deletions karapace/backup/api.py
@@ -32,7 +32,8 @@
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, NoReturn, TypeVar
from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, \
NoReturn, TypeVar, NamedTuple

import contextlib
import datetime
@@ -444,11 +445,17 @@ def create_backup(
allow_overwrite=overwrite,
)
except EmptyPartition:
if version is not BackupVersion.V3:
LOG.warning(
"Topic partition '%s' is empty, nothing to back up.",
topic_partition,
)
return
LOG.warning(
"Topic partition '%s' is empty, nothing to back up.",
"Topic partition '%s' is empty, only backing up metadata.",
topic_partition,
)
return
data_file = None

end_time = datetime.datetime.now(datetime.timezone.utc)
backend.store_metadata(
@@ -457,7 +464,8 @@
topic_id=None,
started_at=start_time,
finished_at=end_time,
data_files=[data_file],
partition_count=1,
data_files=[data_file] if data_file else [],
)

LOG.info(
@@ -498,14 +506,16 @@ def inspect(backup_location: Path | StdOut) -> None:
"finished_at": metadata.finished_at.isoformat(),
"topic_name": metadata.topic_name,
"topic_id": None if metadata.topic_id is None else str(metadata.topic_id),
"partition_count": 1,
"partition_count": metadata.partition_count,
"checksum_algorithm": metadata.checksum_algorithm.value,
"data_files": tuple(
{
"filename": data_file.filename,
"partition": data_file.partition,
"checksum_hex": data_file.checksum.hex(),
"record_count": data_file.record_count,
"start_offset": data_file.start_offset,
"end_offset": data_file.end_offset,
}
for data_file in metadata.data_files
),
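For a non-empty single-partition topic, each data_files entry reported by inspect now carries the two offset fields alongside the existing ones. A hedged sketch of one such entry, with every value made up for illustration:

# Hypothetical entry of the "data_files" tuple produced by inspect();
# start_offset and end_offset are the fields added in this commit.
{
    "filename": "example-topic:0.data",  # hypothetical file name
    "partition": 0,
    "checksum_hex": "0f1e2d3c4b5a6978",  # hypothetical digest
    "record_count": 10,
    "start_offset": 0,                   # offset of the first record in the file
    "end_offset": 9,                     # offset of the last record in the file
}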
8 changes: 8 additions & 0 deletions karapace/backup/backends/v3/avro/DataFile.avsc
@@ -15,6 +15,14 @@
{
"name": "record_count",
"type": "long"
},
{
"name": "start_offset",
"type": "int"
},
{
"name": "end_offset",
"type": "int"
}
],
"name": "DataFile",
12 changes: 12 additions & 0 deletions karapace/backup/backends/v3/avro/Metadata.avsc
@@ -26,6 +26,10 @@
"type": "long"
}
},
{
"name": "record_count",
"type": "int"
},
{
"name": "topic_name",
"type": "string"
@@ -64,6 +68,14 @@
{
"name": "record_count",
"type": "long"
},
{
"name": "start_offset",
"type": "int"
},
{
"name": "end_offset",
"type": "int"
}
],
"name": "DataFile",
65 changes: 58 additions & 7 deletions karapace/backup/backends/v3/backend.py
@@ -4,8 +4,11 @@
"""
from __future__ import annotations

import itertools

from .checksum import RunningChecksum
from .errors import DecodeError, InvalidChecksum, UnknownChecksumAlgorithm
from .errors import InvalidChecksum, UnknownChecksumAlgorithm, OffsetMismatch
from .readers import read_metadata, read_records
from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
from .writers import write_metadata, write_record
Expand All @@ -20,6 +23,7 @@
from pathlib import Path
from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar
from typing_extensions import TypeAlias
from typing import Callable, Final, Generator, IO, Iterator, Sequence, Iterable, TypeVar

import datetime
import io
@@ -51,6 +55,17 @@ def _get_checksum_implementation(algorithm: ChecksumAlgorithm) -> Callable[[], R
assert_never(algorithm)


T = TypeVar("T")


def _peek(iterator: Iterator[T]) -> tuple[T | None, Iterator[T]]:
try:
first = next(iterator)
except StopIteration:
return None, iterator
return first, itertools.chain((first,), iterator)


class SchemaBackupV3Reader(BaseBackupReader):
def _read_data_file(
self,
Expand All @@ -61,11 +76,23 @@ def _read_data_file(
running_checksum = _get_checksum_implementation(metadata.checksum_algorithm)()

with path.open("rb") as data_buffer:
for record in read_records(
records = read_records(
buffer=data_buffer,
num_records=data_file.record_count,
running_checksum=running_checksum,
):
)

record, records = _peek(records)

# Verify first record matches start offset from data file.
if record is not None and record.offset != data_file.start_offset:
raise OffsetMismatch(
f"First record in data file does not match expected "
f"start_offset (expected: {data_file.start_offset}, "
f"actual: {record.offset})."
)

for record in records:
yield ProducerSend(
value=record.value,
key=record.key,
@@ -75,6 +102,15 @@ def _read_data_file(
partition_index=data_file.partition,
)

# Verify last record matches end offset from data file.
if record is not None and record.offset != data_file.end_offset:
raise OffsetMismatch(
f"Last record in data file does not match expected "
f"end_offset (expected: {data_file.end_offset}, "
f"actual: {record.offset})."
)

# Verify checksum matches.
if running_checksum.digest() != data_file.checksum:
raise InvalidChecksum("Found checksum mismatch after reading full data file.")

@@ -153,6 +189,8 @@ class _PartitionStats:
records_written: int = 0
bytes_written_since_checkpoint: int = 0
records_written_since_checkpoint: int = 0
min_offset: int | None = None
max_offset: int | None = None

def get_checkpoint(
self,
@@ -168,10 +206,16 @@ def get_checkpoint(
return self.running_checksum.digest()
return None

def increase(self, bytes_offset: int) -> None:
def update(
self,
bytes_offset: int,
record_offset: int,
) -> None:
self.records_written_since_checkpoint += 1
self.records_written += 1
self.bytes_written_since_checkpoint += bytes_offset
self.min_offset = record_offset if self.min_offset is None else min(record_offset, self.min_offset)
self.max_offset = record_offset if self.max_offset is None else max(record_offset, self.max_offset)


class SchemaBackupV3Writer(BytesBackupWriter[DataFile]):
@@ -230,6 +274,8 @@ def finalize_partition(self, index: int, filename: str) -> DataFile:
partition=index,
checksum=stats.running_checksum.digest(),
record_count=stats.records_written,
start_offset=stats.min_offset,
end_offset=stats.max_offset,
)

def store_metadata(
@@ -239,15 +285,16 @@ def store_metadata(
topic_id: uuid.UUID | None,
started_at: datetime.datetime,
finished_at: datetime.datetime,
partition_count: int,
data_files: Sequence[DataFile],
) -> None:
assert isinstance(path, Path)
metadata_path = path / f"{topic_name}.metadata"

if len(data_files) != 1:
if len(data_files) > 1:
raise RuntimeError("Cannot backup multi-partition topics")

if len(self._partition_stats):
if self._partition_stats and data_files:
raise RuntimeError("Cannot write metadata when not all partitions are finalized")

with bytes_writer(metadata_path, False) as buffer:
@@ -259,9 +306,10 @@ def store_metadata(
tool_version=__version__,
started_at=started_at,
finished_at=finished_at,
record_count=sum(data_file.record_count for data_file in data_files),
topic_name=topic_name,
topic_id=topic_id,
partition_count=len(data_files),
partition_count=partition_count,
checksum_algorithm=ChecksumAlgorithm.xxhash3_64_be,
data_files=tuple(data_files),
),
@@ -290,4 +338,7 @@ def store_record(
),
running_checksum=stats.running_checksum,
)
stats.increase(buffer.tell() - offset_start)
stats.update(
bytes_offset=buffer.tell() - offset_start,
record_offset=record.offset,
)
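The start-offset check above depends on the small _peek helper, which inspects the first record without consuming it from the stream. A self-contained sketch of that pattern, separate from the Karapace reader itself:

from __future__ import annotations

import itertools
from typing import Iterator, TypeVar

T = TypeVar("T")


def peek(iterator: Iterator[T]) -> tuple[T | None, Iterator[T]]:
    # Return the first item (or None if the iterator is exhausted) together
    # with an iterator that still yields that first item.
    try:
        first = next(iterator)
    except StopIteration:
        return None, iterator
    return first, itertools.chain((first,), iterator)


offsets = iter([5, 6, 7])
first, offsets = peek(offsets)
assert first == 5                   # inspected up front, e.g. against start_offset
assert list(offsets) == [5, 6, 7]   # nothing is lost from the caller's point of view

Because the reader's loop variable still holds the final record after the for-loop ends, that same record can then be compared against end_offset once the iterator is exhausted.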
4 changes: 4 additions & 0 deletions karapace/backup/backends/v3/errors.py
@@ -38,6 +38,10 @@ class UnexpectedEndOfData(DecodeError, ValueError):
pass


class OffsetMismatch(DecodeError, ValueError):
pass


class EncodeError(BackupError):
pass

8 changes: 7 additions & 1 deletion karapace/backup/backends/v3/schema.py
@@ -7,7 +7,7 @@
from dataclasses import field
from karapace.avro_dataclasses.models import AvroModel
from karapace.dataclasses import default_dataclass
from typing import Optional, Tuple
from typing import Optional, Tuple, Union

import datetime
import enum
@@ -31,12 +31,16 @@ class DataFile(AvroModel):
partition: int = field(metadata={"type": "long"})
checksum: bytes
record_count: int = field(metadata={"type": "long"})
start_offset: int
end_offset: int

def __post_init__(self) -> None:
assert self.record_count >= 0
assert self.partition >= 0
assert self.checksum
assert self.filename
assert self.start_offset <= self.end_offset
assert self.end_offset - self.start_offset + 1 >= self.record_count


@default_dataclass
Expand All @@ -46,6 +50,7 @@ class Metadata(AvroModel):
tool_version: str
started_at: datetime.datetime
finished_at: datetime.datetime
record_count: int = field(metadata={"type": "int"})
topic_name: str
topic_id: Optional[uuid.UUID]
partition_count: int = field(metadata={"type": "int"})
@@ -58,6 +63,7 @@ def __post_init__(self) -> None:
assert self.finished_at >= self.started_at
assert self.partition_count == 1
assert self.version == 3
assert self.record_count == sum(data_file.record_count for data_file in self.data_files)


@default_dataclass
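The new __post_init__ assertions tie the offset range to the record count: an inclusive range of offsets can hold at most end_offset - start_offset + 1 records. A hedged illustration using the DataFile class from this commit, with placeholder filename and checksum values:

from karapace.backup.backends.v3.schema import DataFile

# Offsets 0..9 inclusive leave room for up to 10 records, so this satisfies
# the assertions added in this commit.
DataFile(
    filename="example-topic:0.data",  # placeholder
    partition=0,
    checksum=b"\x00" * 8,             # placeholder digest bytes
    record_count=10,
    start_offset=0,
    end_offset=9,
)

# With the same offsets, record_count=11 would violate
# end_offset - start_offset + 1 >= record_count and raise an AssertionError.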
1 change: 1 addition & 0 deletions karapace/backup/backends/writer.py
@@ -81,6 +81,7 @@ def store_metadata(
topic_id: uuid.UUID | None,
started_at: datetime.datetime,
finished_at: datetime.datetime,
partition_count: int,
data_files: Sequence[F],
) -> None:
"""