KAFKA-20297: Cleanup org.apache.kafka.common.utils.CollectionUtils #21818
unknowntpo wants to merge 4 commits into apache:trunk

Conversation
@unknowntpo would you mind rebasing the code?
7e2ef79 to 10b13ba
OK, rebased.
…from CollectionUtils

Replace all callers with idiomatic alternatives:
- DescribeProducersHandler: add mapKey:true to DescribeProducersRequest.json and use find() on TopicRequestCollection
- ListOffsetsHandler: use HashMap+computeIfAbsent

Also fix Scala test callers affected by the DescribeProducersRequest mapKey change.
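The HashMap+computeIfAbsent replacement described for ListOffsetsHandler can be sketched as follows. This is a minimal standalone illustration, not the actual Kafka code: the nested `TopicPartition` record here is a stand-in for `org.apache.kafka.common.TopicPartition`, and the method name mirrors the helper discussed in this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopicSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumed shape, for illustration).
    record TopicPartition(String topic, int partition) {}

    // HashMap + computeIfAbsent: the per-topic list is created lazily,
    // only the first time each topic is seen.
    static Map<String, List<Integer>> groupPartitionsByTopic(Collection<TopicPartition> partitions) {
        Map<String, List<Integer>> byTopic = new HashMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        System.out.println(groupPartitionsByTopic(List.of(
            new TopicPartition("a", 0), new TopicPartition("a", 1), new TopicPartition("b", 0))));
    }
}
```

Compared with the old utility call, this keeps the grouping logic at the single call site and avoids a dependency on CollectionUtils.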
Inline subtractMap as a private helper in OAuthBearerExtensionsValidatorCallback, then delete the now-empty CollectionUtils class and its test.
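For reference, subtractMap returns the entries of the first map whose keys do not appear in the second. A minimal sketch of such a helper, under the assumption that this matches the behavior being inlined (the class and generic signature here are illustrative, not copied from Kafka):

```java
import java.util.HashMap;
import java.util.Map;

public class SubtractMapSketch {
    // Returns the entries of minuend whose keys are absent from subtrahend.
    static <K, V> Map<K, V> subtractMap(Map<K, V> minuend, Map<K, ?> subtrahend) {
        Map<K, V> result = new HashMap<>(minuend);
        result.keySet().removeAll(subtrahend.keySet());
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> input = Map.of("a", "1", "b", "2", "c", "3");
        Map<String, String> invalid = Map.of("b", "x");
        // Only the entries whose keys are not in `invalid` remain.
        System.out.println(subtractMap(input, invalid));
    }
}
```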
…ethod

Extract repeated stream-based grouping logic into a private groupPartitionsByTopic helper in StickyAssignor and AbstractStickyAssignorTest.
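A stream-based grouping helper of the kind this commit extracts can be sketched with `Collectors.groupingBy` plus `Collectors.mapping`. Again, the nested `TopicPartition` record is a stand-in for Kafka's class, and the exact body of the private helper in StickyAssignor is assumed, not quoted:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StickyGroupingSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (illustration only).
    record TopicPartition(String topic, int partition) {}

    // Group partition numbers by topic using the Streams API.
    static Map<String, List<Integer>> groupPartitionsByTopic(Collection<TopicPartition> partitions) {
        return partitions.stream().collect(Collectors.groupingBy(
            TopicPartition::topic,
            Collectors.mapping(TopicPartition::partition, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(groupPartitionsByTopic(List.of(
            new TopicPartition("t", 0), new TopicPartition("t", 2))));
    }
}
```

Extracting this into one private helper removes the duplication between the assignor and its test without reintroducing a shared utility class.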
82b9572 to 4aea5f0
And CI errors are fixed.
return new MemberData(partitions, generation);
}

private static Map<String, List<Integer>> groupPartitionsByTopic(Collection<TopicPartition> partitions) {
@@ -90,6 +89,12 @@ public Map<String, String> ignoredExtensions() {
return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions));
The original logic will create maps repeatedly. Could you use a for-loop to streamline it?
public Map<String, String> ignoredExtensions() {
Map<String, String> ignored = new HashMap<>();
for (Map.Entry<String, String> entry : inputExtensions.map().entrySet()) {
String key = entry.getKey();
if (!invalidExtensions.containsKey(key) && !validatedExtensions.containsKey(key)) {
ignored.put(key, entry.getValue());
}
}
return Collections.unmodifiableMap(ignored);
}
I thought subtractMap was more functional-programming style and more declarative, and this path is unlikely to be a hot path where repeated memory allocation matters, but your approach is okay with me; I'll change it.
return new DescribeProducersResponse(response);
}

private static Map<String, Map<Integer, PartitionResponse>> groupPartitionDataByTopic(
We don't actually use such a complex structure in tests. Maybe we could use describeProducersResponse(TopicPartition partition, PartitionResponse partitionResponse) instead.
Map<String, List<Integer>> map = CollectionUtils.groupPartitionsByTopic(partitions);
Map<String, List<Integer>> otherMap = CollectionUtils.groupPartitionsByTopic(otherPartitions);
Map<String, List<Integer>> map = groupPartitionsByTopic(partitions);
We could streamline the test:
Set<String> otherTopics = otherPartitions.stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());
for (TopicPartition tp : partitions) {
assertFalse(otherTopics.contains(tp.topic()),
"Error: Some partitions can be moved...");
}
This PR removes CollectionUtils entirely (KAFKA-20297) by replacing all usages with idiomatic alternatives.