KAFKA-15107: Support custom metadata for remote log segment
This commit implements the changes discussed in KIP-917. Mainly, it changes the `RemoteStorageManager` interface to return `CustomMetadata` and then ensures this custom metadata is stored, propagated, and (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It also introduces the `remote.log.metadata.custom.metadata.max.size` setting to limit the custom metadata size the broker accepts, and stops the copy task if a piece of custom metadata exceeds this limit.
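For context, the changed interface method now returns the custom metadata from the copy call (see the `copyLogSegmentData` call site in the `RemoteLogManager` diff below). A minimal plugin-side sketch of what an implementation might return, assuming a hypothetical backend and a made-up `uploadToBackend` helper:

import java.util.Optional;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

// Hypothetical plugin; only the changed method is sketched here.
public class ExampleStorageManager /* implements RemoteStorageManager */ {

    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata segmentMetadata,
                                                       LogSegmentData segmentData)
            throws RemoteStorageException {
        // Hypothetical upload that yields backend-specific bytes,
        // e.g. an object-store version id the plugin needs later for deletion.
        byte[] backendInfo = uploadToBackend(segmentMetadata, segmentData);
        // The broker stores these bytes alongside the standard metadata, provided they fit
        // within remote.log.metadata.custom.metadata.max.size.
        return Optional.of(new CustomMetadata(backendInfo));
    }

    private byte[] uploadToBackend(RemoteLogSegmentMetadata metadata, LogSegmentData data) {
        return new byte[0]; // placeholder for the real upload logic
    }
}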

On testing:
1. `RemoteLogManagerTest` checks the case where a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks that `createWithUpdates` works correctly, including with custom metadata (see the sketch after this list).
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test (de-)serialization of the corresponding classes, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks that custom metadata are correctly saved to and loaded from a file (verified indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks that the new configuration setting is handled correctly.
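As an illustration of the round trip that test 2 exercises, here is a minimal sketch assuming the public constructor signatures and a `customMetadata()` accessor on the merged object (topic name, offsets, and payload bytes are made up):

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

public class CreateWithUpdatesSketch {
    public static void main(String[] args) {
        RemoteLogSegmentId id = new RemoteLogSegmentId(
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0)),
                Uuid.randomUuid());
        // COPY_SEGMENT_STARTED metadata: start/end offsets, max timestamp, broker id,
        // event timestamp, segment size, leader epochs (all values made up).
        RemoteLogSegmentMetadata started = new RemoteLogSegmentMetadata(
                id, 0L, 99L, -1L, 0, 0L, 1024, Collections.singletonMap(0, 0L));
        // COPY_SEGMENT_FINISHED update carrying the plugin-provided custom metadata,
        // matching the constructor used in the RemoteLogManager diff below.
        RemoteLogSegmentMetadataUpdate finished = new RemoteLogSegmentMetadataUpdate(
                id, 1L, Optional.of(new CustomMetadata(new byte[] {0, 1, 2})),
                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 0);
        // The merged metadata should now carry the custom bytes from the update.
        System.out.println(started.createWithUpdates(finished).customMetadata().isPresent());
    }
}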
ivanyu committed Jul 20, 2023
1 parent f6e7aa3 commit ba4cd3c
Showing 31 changed files with 742 additions and 192 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

20 changes: 20 additions & 0 deletions core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;

class CustomMetadataSizeLimitExceededException extends Exception {
}
31 changes: 28 additions & 3 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -572,6 +573,9 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
} else {
logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
}
} catch (CustomMetadataSizeLimitExceededException e) {
// Only stop this task. Logging is done where the exception is thrown.
this.cancel();
} catch (InterruptedException ex) {
throw ex;
} catch (Exception ex) {
@@ -595,7 +599,8 @@ private long getNextSegmentBaseOffset(long activeSegBaseOffset, ListIterator<Log
return nextSegmentBaseOffset;
}

private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();

@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
if (customMetadata.isPresent()) {
long customMetadataSize = customMetadata.get().value().length;
if (customMetadataSize > customMetadataSizeLimit) {
CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
logger.error("Custom metadata size {} exceeds configured limit {}." +
" Copying will be stopped and copied segment will be attempted to clean." +
" Original metadata: {}",
customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
try {
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
// However, the update itself will not be stored in this case.
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment after custom metadata size exceeded", e1);
}
throw e;
}
}

remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
