From 2b4b5ec19761b3b00e7a487405712625fea2cc81 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Mon, 11 Sep 2023 09:51:24 +0800 Subject: [PATCH] fix: convert brokerId to int32 Signed-off-by: Robin Han --- .../resources/common/message/GetOpeningStreamsRequest.json | 2 +- .../apache/kafka/controller/stream/StreamControlManager.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json index 1f42938a90..c77989c1bc 100644 --- a/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json +++ b/clients/src/main/resources/common/message/GetOpeningStreamsRequest.json @@ -26,7 +26,7 @@ "fields": [ { "name": "BrokerId", - "type": "int64", + "type": "int32", "versions": "0+", "about": "The broker id." }, diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 55ce9ac0ba..f31481ff02 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -476,8 +476,8 @@ public ControllerResult commitStreamObject(Commi public ControllerResult getOpeningStreams(GetOpeningStreamsRequestData data) { // TODO: check broker epoch, reject old epoch request. - int brokerId = (int) data.brokerId(); - // The getOpeningStreams operation rate is low, so we just iterate all streams to get the broker opening streams. + int brokerId = data.brokerId(); + // The getOpeningStreams is invoked when broker startup, so we just iterate all streams to get the broker opening streams. List streamOffsets = this.streamsMetadata.entrySet().stream().filter(entry -> { S3StreamMetadata streamMetadata = entry.getValue(); int rangeIndex = streamMetadata.currentRangeIndex.get();