@@ -621,11 +621,13 @@ public boolean isActive(int brokerId) {
return !registration.inControlledShutdown() && !registration.fenced();
}

// AutoMQ for kafka inject start
public List<BrokerRegistration> getActiveBrokers() {
return brokerRegistrations.values().stream()
.filter(broker -> isActive(broker.id()))
.collect(Collectors.toList());
}
// AutoMQ for kafka inject end

BrokerHeartbeatManager heartbeatManager() {
if (heartbeatManager == null) {
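A minimal sketch of how a caller might consume the new getActiveBrokers() helper; the ActiveBrokerIds wrapper below is illustrative only and not part of this change:

import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.metadata.BrokerRegistration;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of this patch.
final class ActiveBrokerIds {
    static Set<Integer> of(ClusterControlManager clusterControl) {
        // getActiveBrokers() already excludes fenced and in-controlled-shutdown
        // brokers via isActive(), so no extra filtering is needed here.
        return clusterControl.getActiveBrokers().stream()
            .map(BrokerRegistration::id)
            .collect(Collectors.toSet());
    }
}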
@@ -1930,7 +1930,8 @@ private QuorumController(
}
this.s3ObjectControlManager = new S3ObjectControlManager(
this, snapshotRegistry, logContext, clusterId, streamConfig, s3Operator);
-        this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager);
+        this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext,
+            this.s3ObjectControlManager, clusterControl);
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
// AutoMQ for Kafka inject end
updateWriteOffset(-1);
@@ -2308,8 +2309,7 @@ public void close() throws InterruptedException {
// AutoMQ for Kafka inject start
@Override
public CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext context) {
-        return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(),
-            () -> s3ObjectControlManager.checkS3ObjectsLifecycle());
+        return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(), s3ObjectControlManager::checkS3ObjectsLifecycle);
}


@@ -52,7 +52,10 @@
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
@@ -65,9 +68,16 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -79,12 +89,9 @@
@SuppressWarnings("all")
public class StreamControlManager {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamControlManager.class);
-    private final SnapshotRegistry snapshotRegistry;

private final Logger log;

-    private final S3ObjectControlManager s3ObjectControlManager;

/**
* The next stream id to be assigned.
*/
@@ -94,16 +101,36 @@ public class StreamControlManager {

private final TimelineHashMap<Integer/*nodeId*/, NodeMetadata> nodesMetadata;

private final QuorumController quorumController;

private final SnapshotRegistry snapshotRegistry;

private final S3ObjectControlManager s3ObjectControlManager;

private final ClusterControlManager clusterControlManager;

private final ScheduledExecutorService cleanupScheduler;

public StreamControlManager(
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
-        S3ObjectControlManager s3ObjectControlManager) {
+        S3ObjectControlManager s3ObjectControlManager,
+        ClusterControlManager clusterControlManager) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(StreamControlManager.class);
-        this.s3ObjectControlManager = s3ObjectControlManager;
this.nextAssignedStreamId = new TimelineLong(snapshotRegistry);
this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.nodesMetadata = new TimelineHashMap<>(snapshotRegistry, 0);

this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-cleanup-scheduler", true));

this.quorumController = quorumController;
this.s3ObjectControlManager = s3ObjectControlManager;
this.clusterControlManager = clusterControlManager;

this.cleanupScheduler.scheduleWithFixedDelay(this::triggerCleanupScaleInNodes, 30, 30, TimeUnit.MINUTES);
}

public ControllerResult<CreateStreamResponse> createStream(int nodeId, long nodeEpoch, CreateStreamRequest request) {
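The constructor above wires a single-thread daemon scheduler that triggers the scale-in cleanup every 30 minutes. A reduced sketch of that scheduling pattern follows; the class, constant, and callback names (PeriodicCleanup, CLEANUP_INTERVAL_MINUTES, isActiveController, runCleanup) are placeholders, not the real StreamControlManager or QuorumController API:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Reduced sketch of the scheduling pattern used above.
final class PeriodicCleanup {
    private static final long CLEANUP_INTERVAL_MINUTES = 30;

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "stream-cleanup-scheduler");
        t.setDaemon(true); // daemon thread: it must not keep the controller JVM alive
        return t;
    });

    void start(BooleanSupplier isActiveController, Runnable runCleanup) {
        scheduler.scheduleWithFixedDelay(() -> {
            // Only the active controller may append metadata records, so the
            // task is a no-op on standby controllers.
            if (isActiveController.getAsBoolean()) {
                runCleanup.run();
            }
        }, CLEANUP_INTERVAL_MINUTES, CLEANUP_INTERVAL_MINUTES, TimeUnit.MINUTES);
    }
}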
@@ -898,6 +925,55 @@ private Errors nodeEpochCheck(int nodeId, long nodeEpoch, boolean checkFailover)
return Errors.NONE;
}

public void triggerCleanupScaleInNodes() {
if (!quorumController.isActive()) {
return;
}
quorumController.appendWriteEvent("cleanupScaleInNodes", OptionalLong.empty(), () -> {
return cleanupScaleInNodes();
});
}

public ControllerResult<Void> cleanupScaleInNodes() {
List<ApiMessageAndVersion> records = new LinkedList<>();
List<S3StreamSetObject> cleanupObjects = new LinkedList<>();
nodesMetadata.forEach((nodeId, nodeMetadata) -> {
if (!clusterControlManager.isActive(nodeId)) {
Collection<S3StreamSetObject> objects = nodeMetadata.streamSetObjects().values();
boolean alive = false;
for (S3StreamSetObject object: objects) {
if (alive) {
// if the last object is not expired, the likelihood of the subsequent object expiring is also quite low.
break;
}
long objectId = object.objectId();
AtomicBoolean expired = new AtomicBoolean(true);
List<StreamOffsetRange> streamOffsetRanges = object.offsetRangeList();
for (StreamOffsetRange streamOffsetRange : streamOffsetRanges) {
S3StreamMetadata stream = streamsMetadata.get(streamOffsetRange.streamId());
if (stream != null && stream.startOffset() < streamOffsetRange.endOffset()) {
expired.set(false);
alive = true;
}
}
if (expired.get()) {
cleanupObjects.add(object);
records.add(new ApiMessageAndVersion(new RemoveStreamSetObjectRecord()
.setNodeId(nodeId)
.setObjectId(objectId), (short) 0));
records.addAll(this.s3ObjectControlManager.markDestroyObjects(List.of(objectId)).records());
}
}
}
});
if (!cleanupObjects.isEmpty()) {
LOGGER.info("clean up scaled-in nodes objects: {}", cleanupObjects);
} else {
LOGGER.debug("clean up scaled-in nodes objects: []");
}
return ControllerResult.of(records, null);
}

public void replay(AssignedStreamIdRecord record) {
this.nextAssignedStreamId.set(record.assignedStreamId() + 1);
}
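The reclaim rule in cleanupScaleInNodes above treats a stream set object on a scaled-in node as expired only when every stream offset range it covers ends at or before the owning stream's current start offset. A standalone restatement of that check, using simplified stand-in types rather than the real S3StreamSetObject and S3StreamMetadata classes:

import java.util.List;
import java.util.Map;

// Standalone restatement of the expiry check in cleanupScaleInNodes; the
// record type below is a simplified stand-in for StreamOffsetRange.
final class StreamSetObjectExpiry {
    record OffsetRange(long streamId, long startOffset, long endOffset) { }

    static boolean isExpired(List<OffsetRange> ranges, Map<Long, Long> streamStartOffsets) {
        for (OffsetRange range : ranges) {
            Long streamStart = streamStartOffsets.get(range.streamId());
            // A range is still referenced if its stream exists and the stream's
            // start offset has not yet advanced past the range's end offset.
            if (streamStart != null && streamStart < range.endOffset()) {
                return false;
            }
        }
        // Every range has been trimmed away, so the object can be marked destroyed.
        return true;
    }
}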