diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 273a016180..f3f42a9f2a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -180,7 +181,7 @@ public ControllerResult commitObject(long objectId, long objectSize, lon } S3Object object = this.objectsMetadata.get(objectId); if (object == null) { - log.error("object {} not exist when commit stream set object", objectId); + log.error("object {} not exist when commit object", objectId); return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST); } // verify the state @@ -263,6 +264,7 @@ public void replay(RemoveS3ObjectRecord record) { */ public ControllerResult checkS3ObjectsLifecycle() { List records = new ArrayList<>(); + List ttlReachedObjects = new LinkedList<>(); // check the expired objects this.preparedObjects.stream(). map(objectsMetadata::get). @@ -276,6 +278,7 @@ public ControllerResult checkS3ObjectsLifecycle() { .setExpiredTimeInMs(obj.getExpiredTimeInMs()) .setCommittedTimeInMs(obj.getCommittedTimeInMs()) .setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs()); + ttlReachedObjects.add(obj.getObjectId()); // generate the records which mark the expired objects as destroyed records.add(new ApiMessageAndVersion(record, (short) 0)); // generate the records which listener reply for the object-destroy events @@ -284,6 +287,9 @@ public ControllerResult checkS3ObjectsLifecycle() { records.addAll(result.records()); }); }); + if (!ttlReachedObjects.isEmpty()) { + log.info("objects TTL is reached, objects={}", ttlReachedObjects); + } // check the mark destroyed objects List destroyedObjectKeys = this.markDestroyedObjects.stream() .map(this.objectsMetadata::get)