KAFKA-15107: Support custom metadata for remote log segment (apache#13984)

* KAFKA-15107: Support custom metadata for remote log segment

This commit implements the changes discussed in KIP-917. Mainly, it changes the `RemoteStorageManager` interface so that `copyLogSegmentData` returns an `Optional<CustomMetadata>`, and it ensures this custom metadata is stored, propagated, and (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It also introduces the `remote.log.metadata.custom.metadata.max.size` setting to limit the custom metadata size the broker accepts and to stop uploading when a piece of metadata exceeds this limit.
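
For context, here is a minimal, self-contained sketch of the idea: the `copyLogSegmentData` return type matches the change in this commit, but the stand-in `CustomMetadata` class, the method bodies, and the object-key naming are illustrative assumptions rather than the actual Kafka implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Self-contained sketch; the real types live in org.apache.kafka.server.log.remote.storage.
public class CustomMetadataSketch {

    // Stand-in for RemoteLogSegmentMetadata.CustomMetadata: an opaque byte[] owned by the RSM plugin.
    static final class CustomMetadata {
        final byte[] value;
        CustomMetadata(byte[] value) { this.value = value; }
    }

    // KIP-917 shape: copyLogSegmentData now returns Optional<CustomMetadata> instead of void,
    // so a plugin can hand back data it wants stored with the segment metadata, e.g. the
    // object key it chose for the uploaded segment (a hypothetical example).
    static Optional<CustomMetadata> copyLogSegmentData(String segmentFileName) {
        String hypotheticalObjectKey = "tiered-storage/" + segmentFileName;
        return Optional.of(new CustomMetadata(hypotheticalObjectKey.getBytes(StandardCharsets.UTF_8)));
    }

    // Broker-side check: if the returned metadata exceeds the configured limit, the broker
    // deletes the just-copied segment and stops copying for that partition (see RLMTask below).
    static void enforceLimit(Optional<CustomMetadata> customMetadata, int maxSizeBytes) {
        customMetadata.ifPresent(cm -> {
            if (cm.value.length > maxSizeBytes) {
                throw new IllegalStateException("Custom metadata size " + cm.value.length
                        + " exceeds configured limit " + maxSizeBytes);
            }
        });
    }

    public static void main(String[] args) {
        Optional<CustomMetadata> cm = copyLogSegmentData("00000000000000000000.log");
        enforceLimit(cm, 128); // a limit comparable to the one used in the tests below
        System.out.println("Custom metadata accepted: " + cm.get().value.length + " bytes");
    }
}
```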

On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test (de-)serialization of the corresponding transform classes, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks that custom metadata are correctly saved to and loaded from a file (verified indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.
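
Regarding item 5 and the setting described above, a hedged illustration of how the limit might be configured on a broker follows; the property name is taken verbatim from this commit message and the value is arbitrary, so check `RemoteLogManagerConfig` in the merged code before relying on either.

```java
import java.util.Properties;

public class BrokerConfigSketch {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        // Tiered storage must be enabled for remote copying to happen at all.
        brokerProps.put("remote.log.storage.system.enable", "true");
        // Reject custom metadata larger than 128 bytes (illustrative value; the key is
        // quoted from the commit message above, not verified against the released config).
        brokerProps.put("remote.log.metadata.custom.metadata.max.size", "128");
        System.out.println(brokerProps);
    }
}
```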

Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
ivanyu authored and jeqo committed Aug 15, 2023
1 parent 2d38b89 commit 9b92841
Showing 33 changed files with 700 additions and 60 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

20 changes: 20 additions & 0 deletions core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;

class CustomMetadataSizeLimitExceededException extends Exception {
}
39 changes: 34 additions & 5 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -483,12 +484,14 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
class RLMTask extends CancellableRunnable {

private final TopicIdPartition topicIdPartition;
private final int customMetadataSizeLimit;
private final Logger logger;

private volatile int leaderEpoch = -1;

public RLMTask(TopicIdPartition topicIdPartition) {
public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
this.topicIdPartition = topicIdPartition;
this.customMetadataSizeLimit = customMetadataSizeLimit;
LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
logger = logContext.logger(RLMTask.class);
}
@@ -589,6 +592,11 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
} else {
logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
}
} catch (CustomMetadataSizeLimitExceededException e) {
// Only stop this task. Logging is done where the exception is thrown.
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
this.cancel();
} catch (InterruptedException ex) {
throw ex;
} catch (Exception ex) {
@@ -600,7 +608,8 @@
}
}

private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();

@@ -626,10 +635,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

if (customMetadata.isPresent()) {
long customMetadataSize = customMetadata.get().value().length;
if (customMetadataSize > this.customMetadataSizeLimit) {
CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
logger.error("Custom metadata size {} exceeds configured limit {}." +
" Copying will be stopped and copied segment will be attempted to clean." +
" Original metadata: {}",
customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
try {
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
// However, the update itself will not be stored in this case.
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
logger.info("Successfully cleaned segment after custom metadata size exceeded");
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e1);
}
throw e;
}
}

remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
@@ -1126,7 +1155,7 @@ void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMTask task = new RLMTask(topicIdPartition);
RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting scheduled", task);
111 changes: 103 additions & 8 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -44,6 +44,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -106,7 +107,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -343,7 +343,8 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.empty());

// Verify the metrics for remote writes and failures are zero before attempting to copy a log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -354,7 +355,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

@@ -398,6 +399,100 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

// We are verifying that if the size of a piece of custom metadata is bigger than the configured limit,
// the copy task should be cancelled and there should be an attempt to delete the just copied segment.
@Test
void testCustomMetadataSizeExceedsLimit() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
long lastStableOffset = 150L;
long logEndOffset = 150L;

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));

File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 log segments, with 0 and 150 as log start offset
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
verify(oldSegment, times(0)).readNextOffset();
verify(activeSegment, times(0)).readNextOffset();

FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);

LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);

int customMetadataSizeLimit = 128;
CustomMetadata customMetadata = new CustomMetadata(new byte[customMetadataSizeLimit * 2]);

CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.of(customMetadata));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());

// Check we attempt to delete the segment data providing the custom metadata back.
RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(), time.milliseconds(),
Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
RemoteLogSegmentMetadata expectedDeleteMetadata = remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));

// Check the task is cancelled in the end.
assertTrue(task.isCancelled());

// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));

// Verify the per-topic metrics: the copy was attempted, so the request rate is incremented, but no bytes were copied.
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
// Verify the failed copy was reported
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
@@ -533,7 +628,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

@@ -573,7 +668,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
when(mockLog.lastStableOffset()).thenReturn(250L);

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToFollower();
task.copyLogSegmentsToRemote(mockLog);

@@ -717,7 +812,7 @@ private MemoryRecords records(long timestamp,

@Test
void testRLMTaskShouldSetLeaderEpochCorrectly() {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
assertFalse(task.isLeader());
task.convertToLeader(1);
assertTrue(task.isLeader());
@@ -865,7 +960,7 @@ public void testCandidateLogSegmentsSkipsActiveSegment() {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -891,7 +986,7 @@ public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),