Skip to content

KAFKA-19338: Error on read/write of uninitialized share part. #19861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -59,15 +59,15 @@ public DeleteShareGroupStateRequest(DeleteShareGroupStateRequestData data, short
public DeleteShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<DeleteShareGroupStateResponseData.DeleteStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
topicResult -> results.add(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}

@Override
@@ -77,8 +77,8 @@ public DeleteShareGroupStateRequestData data() {

public static DeleteShareGroupStateRequest parse(Readable readable, short version) {
return new DeleteShareGroupStateRequest(
new DeleteShareGroupStateRequestData(readable, version),
version
new DeleteShareGroupStateRequestData(readable, version),
version
);
}
}
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
import org.apache.kafka.common.protocol.Readable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,7 +103,7 @@ public static DeleteShareGroupStateResponseData.PartitionResult toResponsePartit

public static DeleteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new DeleteShareGroupStateResponseData().setResults(
Collections.singletonList(new DeleteShareGroupStateResponseData.DeleteStateResult()
List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
Original file line number Diff line number Diff line change
@@ -59,16 +59,16 @@ public ReadShareGroupStateRequest(ReadShareGroupStateRequestData data, short ver
public ReadShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<ReadShareGroupStateResponseData.ReadStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
topicResult -> results.add(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}

@Override
@@ -78,8 +78,8 @@ public ReadShareGroupStateRequestData data() {

public static ReadShareGroupStateRequest parse(Readable readable, short version) {
return new ReadShareGroupStateRequest(
new ReadShareGroupStateRequestData(readable, version),
version
new ReadShareGroupStateRequestData(readable, version),
version
);
}
}
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
import org.apache.kafka.common.protocol.Readable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -47,9 +46,9 @@ public ReadShareGroupStateResponseData data() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@@ -66,52 +65,52 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {

public static ReadShareGroupStateResponse parse(Readable readable, short version) {
return new ReadShareGroupStateResponse(
new ReadShareGroupStateResponseData(readable, version)
new ReadShareGroupStateResponseData(readable, version)
);
}

public static ReadShareGroupStateResponseData toResponseData(
Uuid topicId,
int partition,
long startOffset,
int stateEpoch,
List<ReadShareGroupStateResponseData.StateBatch> stateBatches
Uuid topicId,
int partition,
long startOffset,
int stateEpoch,
List<ReadShareGroupStateResponseData.StateBatch> stateBatches
) {
return new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(
new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setStateEpoch(stateEpoch)
.setStateBatches(stateBatches)
))
));
.setResults(List.of(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setStateEpoch(stateEpoch)
.setStateBatches(stateBatches)
))
));
}

public static ReadShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new ReadShareGroupStateResponseData().setResults(
Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
List.of(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}

public static ReadShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) {
return new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}

public static ReadShareGroupStateResponseData.ReadStateResult toResponseReadStateResult(Uuid topicId, List<ReadShareGroupStateResponseData.PartitionResult> partitionResults) {
return new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
.setTopicId(topicId)
.setPartitions(partitionResults);
}

public static ReadShareGroupStateResponseData toGlobalErrorResponse(ReadShareGroupStateRequestData request, Errors error) {
Original file line number Diff line number Diff line change
@@ -59,16 +59,16 @@ public WriteShareGroupStateRequest(WriteShareGroupStateRequestData data, short v
public WriteShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<WriteShareGroupStateResponseData.WriteStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
topicResult -> results.add(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}

@Override
@@ -78,8 +78,8 @@ public WriteShareGroupStateRequestData data() {

public static WriteShareGroupStateRequest parse(Readable readable, short version) {
return new WriteShareGroupStateRequest(
new WriteShareGroupStateRequestData(readable, version),
version
new WriteShareGroupStateRequestData(readable, version),
version
);
}
}
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
import org.apache.kafka.common.protocol.Readable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,9 +46,9 @@ public WriteShareGroupStateResponseData data() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@@ -66,47 +65,47 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {

public static WriteShareGroupStateResponse parse(Readable readable, short version) {
return new WriteShareGroupStateResponse(
new WriteShareGroupStateResponseData(readable, version)
new WriteShareGroupStateResponseData(readable, version)
);
}

public static WriteShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
return new WriteShareGroupStateResponseData()
.setResults(Collections.singletonList(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(
new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)))));
.setResults(List.of(
new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)))));
}

public static WriteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
responseData.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
responseData.setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
return responseData;
}

public static WriteShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) {
return new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}

public static WriteShareGroupStateResponseData.WriteStateResult toResponseWriteStateResult(Uuid topicId, List<WriteShareGroupStateResponseData.PartitionResult> partitionResults) {
return new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
.setTopicId(topicId)
.setPartitions(partitionResults);
}

public static WriteShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
return new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId);
.setPartition(partitionId);
}

public static WriteShareGroupStateResponseData toGlobalErrorResponse(WriteShareGroupStateRequestData request, Errors error) {
Loading
Oops, something went wrong.