Skip to content

Commit

Permalink
KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (apache#14057)
Browse files Browse the repository at this point in the history

Reviewers: Divij Vaidya <diviv@amazon.com>
  • Loading branch information
Owen-CH-Leung authored and jeqo committed Aug 15, 2023
1 parent 4a0aeb5 commit 3bfdd84
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static File producerSnapshotFile(File logDir, long offset) {
* @param offset The offset to use in the file name
* @return The filename
*/
private static String filenamePrefixFromOffset(long offset) {
public static String filenamePrefixFromOffset(long offset) {
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(20);
nf.setMaximumFractionDigits(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,25 @@
* The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
* <p>
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure UuidBase64-FileType.
* follows the structure startOffset-UuidBase64-FileType.
* <p>
* Given the root directory of the storage, segments and associated files are organized as represented below.
* </p>
* <code>
* / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / bCqX9U--S-6U8XUM9II25Q.log
* . . bCqX9U--S-6U8XUM9II25Q.index
* . . bCqX9U--S-6U8XUM9II25Q.timeindex
* . . h956soEzTzi9a-NOQ-DvKA.log
* . . h956soEzTzi9a-NOQ-DvKA.index
* . . h956soEzTzi9a-NOQ-DvKA.timeindex
* / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log
* . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index
* . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index
* . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex
* .
* / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log
* . . o8CQPT86QQmbFmi3xRmiHA.index
* . . o8CQPT86QQmbFmi3xRmiHA.timeindex
* / topic-1-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log
* . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index
* . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex
* .
* / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log
* . . jvj3vhliTGeU90sIosmp_g.index
* . . jvj3vhliTGeU90sIosmp_g.timeindex
* / topic-3-DRagLm_PS9Wl8fz1X43zVg / 00000000000000000011-jvj3vhliTGeU90sIosmp_g.log
* . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.index
* . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex
* </code>
*/
public final class LocalTieredStorage implements RemoteStorageManager {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final Lo
RemoteLogSegmentFileset fileset = null;

try {
fileset = openFileset(storageDirectory, id);
fileset = openFileset(storageDirectory, metadata);

logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());

Expand Down Expand Up @@ -359,7 +359,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata metadata,
eventBuilder.withStartPosition(startPos).withEndPosition(endPos);

try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId());
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);

final InputStream inputStream = newInputStream(fileset.getFile(SEGMENT).toPath(), READ);
inputStream.skip(startPos);
Expand All @@ -386,7 +386,7 @@ public InputStream fetchIndex(RemoteLogSegmentMetadata metadata, IndexType index
final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(eventType, metadata);

try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId());
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);

File file = fileset.getFile(fileType);
final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
Expand All @@ -411,7 +411,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata metadata) throws
if (deleteEnabled) {
try {
final RemoteLogSegmentFileset fileset = openFileset(
storageDirectory, metadata.remoteLogSegmentId());
storageDirectory, metadata);

if (!fileset.delete()) {
throw new RemoteStorageException("Failed to delete remote log segment with id:" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,19 @@ public void copyEmptyLogSegment() throws RemoteStorageException {
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(metadata, segment);

remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}

@Test
public void copyDataFromLogSegment() throws RemoteStorageException {
final byte[] data = new byte[]{0, 1, 2};
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(data);

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
tieredStorage.copyLogSegmentData(metadata, segment);

remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment);
remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata, segment);
}

@Test
Expand Down Expand Up @@ -201,49 +202,52 @@ public void fetchProducerSnapshot() throws RemoteStorageException {
@Test
public void deleteLogSegment() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}

@Test
public void deletePartition() throws RemoteStorageException {
int segmentCount = 10;
List<RemoteLogSegmentId> segmentIds = new ArrayList<>();
List<RemoteLogSegmentMetadata> segmentMetadatas = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
segmentIds.add(id);
tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
segmentMetadatas.add(metadata);
}
tieredStorage.deletePartition(topicIdPartition);
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
for (RemoteLogSegmentId segmentId: segmentIds) {
remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId);
for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) {
remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata);
}
}

@Test
public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment();
segment.transactionIndex().get().toFile().delete();

tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> {
tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> {
String fileName = path.getFileName().toString();
if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) {
remoteStorageVerifier.assertFileExists(path);
}
});

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id);
remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
}

@Test
Expand All @@ -252,12 +256,12 @@ public void segmentsAreNotDeletedIfDeleteApiIsDisabled(TestInfo testInfo) throws

final RemoteLogSegmentId id = newRemoteLogSegmentId();
final LogSegmentData segment = localLogSegments.nextSegment();

final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);

tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyContainsLogSegmentFiles(id);
remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
}

@Test
Expand Down Expand Up @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t
this.topicIdPartition = requireNonNull(topicIdPartition);
}

private List<Path> expectedPaths(final RemoteLogSegmentId id) {
private List<Path> expectedPaths(final RemoteLogSegmentMetadata metadata) {
final String rootPath = getStorageRootDirectory();
TopicPartition tp = topicIdPartition.topicPartition();
final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(),
topicIdPartition.topicId());
final String uuid = id.id().toString();
final String uuid = metadata.remoteLogSegmentId().id().toString();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());

return Arrays.asList(
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
);
}

Expand All @@ -424,37 +429,37 @@ public Path expectedPartitionPath() {
return Paths.get(rootPath, topicPartitionSubpath);
}

public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id, final Consumer<Path> action) {
expectedPaths(id).forEach(action);
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata, final Consumer<Path> action) {
expectedPaths(metadata).forEach(action);
}

/**
* Verify the remote storage contains remote log segment and associated files for the provided {@code metadata}.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id) {
expectedPaths(id).forEach(this::assertFileExists);
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata) {
expectedPaths(metadata).forEach(this::assertFileExists);
}

/**
* Verify the remote storage does NOT contain remote log segment and associated files for the provided {@code metadata}.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/
public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) {
expectedPaths(id).forEach(this::assertFileDoesNotExist);
public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata metadata) {
expectedPaths(metadata).forEach(this::assertFileDoesNotExist);
}

/**
* Compare the content of the remote segment with the provided {@link LogSegmentData}.
* This method does not fetch from the remote storage.
*
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes).
* @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
* @param seg The segment stored on Kafka's local storage.
*/
public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentId id, final LogSegmentData seg) {
final Path remoteSegmentPath = expectedPaths(id).get(0);
public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentMetadata metadata, final LogSegmentData seg) {
final Path remoteSegmentPath = expectedPaths(metadata).get(0);
assertFileDataEquals(remoteSegmentPath, seg.logSegment());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
* the local tiered storage:
*
* <code>
* / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log
* . oAtiIQ95REujbuzNd_lkLQ.index
* . oAtiIQ95REujbuzNd_lkLQ.timeindex
* / storage-directory / topic-partition-uuidBase64 / 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
* . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index
* . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex
* </code>
*/
public final class RemoteLogSegmentFileset {
Expand All @@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset {
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure startOffset-UUID-FileType.
*/
private static final Pattern FILENAME_FORMAT = compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
private static final int GROUP_UUID = 1;
private static final int GROUP_FILE_TYPE = 2;
private static final Pattern FILENAME_FORMAT = compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
private static final int GROUP_UUID = 2;
private static final int GROUP_FILE_TYPE = 3;

/**
* Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
Expand All @@ -98,10 +98,10 @@ public enum RemoteLogSegmentFileType {

/**
* Provides the name of the file of this type for the given UUID in the local tiered storage,
* e.g. uuid.log.
* e.g. 00000000000000000011-uuid.log.
*/
public String toFilename(final Uuid uuid) {
return uuid.toString() + suffix;
public String toFilename(final String startOffset, final Uuid uuid) {
return startOffset + "-" + uuid.toString() + suffix;
}

/**
Expand Down Expand Up @@ -155,19 +155,21 @@ public String getSuffix() {
* the log segment offloaded are not created on the file system until transfer happens.
*
* @param storageDir The root directory of the local tiered storage.
* @param id Remote log segment id assigned to a log segment in Kafka.
* @param metadata Remote log metadata about a topic partition's remote log.
* @return A new fileset instance.
*/
public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) {
public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentMetadata metadata) {

final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir);
final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(
metadata.remoteLogSegmentId().topicIdPartition(), storageDir);
final File partitionDirectory = tpDir.getDirectory();
final Uuid uuid = id.id();
final Uuid uuid = metadata.remoteLogSegmentId().id();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());

final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values())
.collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid))));
.collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(startOffset, uuid))));

return new RemoteLogSegmentFileset(tpDir, id, files);
return new RemoteLogSegmentFileset(tpDir, metadata.remoteLogSegmentId(), files);
}

/**
Expand All @@ -183,7 +185,7 @@ public static RemoteLogSegmentFileset openExistingFileset(final RemoteTopicParti
try {
final Map<RemoteLogSegmentFileType, File> files =
Files.list(tpDirectory.getDirectory().toPath())
.filter(path -> path.getFileName().toString().startsWith(uuid.toString()))
.filter(path -> path.getFileName().toString().contains(uuid.toString()))
.collect(toMap(path -> getFileType(path.getFileName().toString()), Path::toFile));

final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values())
Expand Down

0 comments on commit 3bfdd84

Please sign in to comment.