KIP-795 Make AbstractCoordinator part of the public API #11515

Closed · wants to merge 38 commits

Changes from all commits (38 commits)
aed4772
KIP-795 Make AbstractCoordinator part of the public API
hgeraldino Nov 16, 2021
f5e566f
KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11…
Nov 15, 2021
90a8710
KAFKA-13445: Add ECDSA test for JWT validation (#11487)
Nov 16, 2021
73eab80
KAFKA-13406: skip assignment validation for built-in cooperativeStick…
showuon Nov 16, 2021
03aba5c
MINOR: Fix FetchSessionBenchmark (#11501)
jolshan Nov 16, 2021
c86bbff
MINOR: Improve KafkaStreamsTest: testInitializesAndDestroysMetricsRep…
mkandaswamy Nov 16, 2021
a925a7c
KAFKA-13449: Comment optimization for parameter log.cleaner.delete.re…
RivenSun2 Nov 16, 2021
4cc9ab7
Revert "MINOR: Remove redundant argument from TaskMetricsGroup#record…
rhauch Nov 16, 2021
895a1dd
Add recordMetadata() to StateStoreContext (#11498)
Nov 16, 2021
68e90ec
KAFKA-13071; Deprecate support for changing acls through the authoriz…
hachikuji Nov 16, 2021
a7214fe
MINOR: update Kafka Streams standby task config (#11404)
mjsax Nov 17, 2021
278d5d6
KAFKA-13443: Kafka broker exits when OAuth enabled and certain config…
Nov 17, 2021
0c987df
KAFKA-13439: Deprecate eager rebalance protocol in kafka stream (#11490)
showuon Nov 17, 2021
08c4f69
KAFKA-13397: MirrorMaker should not mirror topics ending with `.inter…
dongjinleekr Nov 17, 2021
f6ae385
KAFKA-12257; Consumer mishandles topics deleted and recreated with th…
jolshan Nov 17, 2021
71b9f67
MINOR: Fix client.quota.callback.class doc (#11510)
joel-hamill Nov 18, 2021
79380ef
MINOR: Remove unused parameters, exceptions, comments, etc. (#11472)
dongjinleekr Nov 18, 2021
fa6acf7
MINOR: Small improvements to KafkaProducer javadocs (#11467)
showuon Nov 18, 2021
f4314e2
KAFKA-13394: Topic IDs should be removed from PartitionFetchState if …
jolshan Nov 18, 2021
3b32e94
MINOR: Set mock correctly in RocksDBMetricsRecorderTest (#11462)
cadonna Nov 18, 2021
559968c
MINOR: Modify the Exception type of the testCommitOffsetAsyncNotCoord…
RivenSun2 Nov 19, 2021
1be764d
KAFKA-9648: Add configuration to adjust listen backlog size for Accep…
ocadaruma Nov 19, 2021
b993133
MINOR: Brokers in KRaft don't need controller listener (#11511)
jsancio Nov 19, 2021
e3f91da
MINOR: Update error log in `DefaultSslEngineFactory#createTrustStoreF…
defhacks Nov 22, 2021
090e43e
KAFKA-13455: Add steps to run Kafka Connect to quickstart (#11500)
katheris Nov 22, 2021
fc2b4fd
MINOR: Share BrokerMetadataPublisher code for coordinators (#11525)
jsancio Nov 22, 2021
b9bfb6d
KAFKA-13117: migrate TupleForwarder and CacheFlushListener to new Rec…
jeqo Nov 23, 2021
9d1c52c
KAFKA-13457: SocketChannel in Acceptor#accept is not closed upon IOEx…
functioner Nov 24, 2021
25727ed
KAFKA-13357; Store producer IDs in broker snapshots
cmccabe Nov 24, 2021
d3728db
MINOR: guard against calls to exit in QuorumTestHarness tests (#11457)
cmccabe Nov 24, 2021
106286c
KAFKA-13480: Track Position in KeyValue stores (#11514)
Nov 25, 2021
28e2deb
MINOR: Update rocksDb memory management doc (#11528)
showuon Nov 25, 2021
6543630
MINOR: Fix system test StreamsCooperativeRebalanceUpgradeTest.test_up…
cadonna Nov 25, 2021
d19bda2
MINOR: Reduce log cleaner offset memory usage in KRaftClusterTestKit …
lbradstreet Nov 25, 2021
1bb2411
MINOR: Update javadoc of `SnapshotWriter.createWithHeader` (#11530)
socutes Nov 29, 2021
c5bdb23
KAFKA-10712; Update release scripts to Python3 (#11538)
dajac Nov 29, 2021
a35aeae
MINOR: replace Thread.isAlive by Thread.is_alive for Python code (#11…
chia7712 Nov 29, 2021
e19aed6
KIP-795 replace org.apache.kafka.message classes from public API
hgeraldino Nov 22, 2021
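For orientation: KIP-795 proposes exposing the client-side group coordinator so that projects outside Kafka can build custom group-membership protocols on it. A purely illustrative sketch of that kind of extension point follows; the class and method signatures here are hypothetical, not AbstractCoordinator's actual API:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

// Hypothetical shape of a pluggable group protocol; illustrative only.
abstract class CustomGroupProtocol {
    /** Protocol type this member registers with the broker-side group coordinator. */
    protected abstract String protocolType();

    /** Metadata the member advertises in its JoinGroup request. */
    protected abstract ByteBuffer memberMetadata();

    /** Invoked on the elected leader to compute a per-member assignment. */
    protected abstract Map<String, ByteBuffer> performAssignment(List<String> memberIds);

    /** Invoked on every member after the leader's assignment is synced back. */
    protected abstract void onAssignment(ByteBuffer assignment);
}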
checkstyle/checkstyle.xml (2 changes: 1 addition & 1 deletion)
@@ -119,7 +119,7 @@
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
-      <property name="max" value="25"/>
+      <property name="max" value="26"/>
       <property name="excludeClassesRegexps" value="AtomicInteger"/>
     </module>
     <module name="BooleanExpressionComplexity">
clients/src/main/java/org/apache/kafka/clients/Metadata.java (50 changes: 33 additions & 17 deletions)
@@ -217,14 +217,13 @@ synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent
         }
     }
 
+    /**
+     * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
+     */
     public synchronized Map<String, Uuid> topicIds() {
         return cache.topicIds();
     }
 
-    public synchronized Map<Uuid, String> topicNames() {
-        return cache.topicNames();
-    }
-
     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
         Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
         if (!maybeMetadata.isPresent())
@@ -326,22 +325,31 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
         Map<String, Uuid> topicIds = new HashMap<>();
+        Map<String, Uuid> oldTopicIds = cache.topicIds();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
-            if (!metadata.topicId().equals(Uuid.ZERO_UUID))
-                topicIds.put(metadata.topic(), metadata.topicId());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We can only reason about topic ID changes when both IDs are valid, so keep oldId null unless the new metadata contains a topic ID
+            Uuid oldTopicId = null;
+            if (!Uuid.ZERO_UUID.equals(topicId)) {
+                topicIds.put(topicName, topicId);
+                oldTopicId = oldTopicIds.get(topicName);
+            } else {
+                topicId = null;
+            }
 
-            if (!retainTopic(metadata.topic(), metadata.isInternal(), nowMs))
+            if (!retainTopic(topicName, metadata.isInternal(), nowMs))
                 continue;
 
             if (metadata.isInternal())
-                internalTopics.add(metadata.topic());
+                internalTopics.add(topicName);
 
             if (metadata.error() == Errors.NONE) {
                 for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle
                     // the update to catch new epochs
-                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs())
+                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
                             .ifPresent(partitions::add);
 
                     if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
@@ -352,14 +360,14 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
                 }
             } else {
                 if (metadata.error().exception() instanceof InvalidMetadataException) {
-                    log.debug("Requesting metadata update for topic {} due to error {}", metadata.topic(), metadata.error());
+                    log.debug("Requesting metadata update for topic {} due to error {}", topicName, metadata.error());
                     requestUpdate();
                 }
 
                 if (metadata.error() == Errors.INVALID_TOPIC_EXCEPTION)
-                    invalidTopics.add(metadata.topic());
+                    invalidTopics.add(topicName);
                 else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
-                    unauthorizedTopics.add(metadata.topic());
+                    unauthorizedTopics.add(topicName);
             }
         }
 
@@ -375,17 +383,25 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 
     /**
     * Compute the latest partition metadata to cache given ordering by leader epochs (if both
-     * available and reliable).
+     * available and reliable) and whether the topic ID changed.
      */
    private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            Uuid topicId,
+            Uuid oldTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
-            // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                // If both topic IDs were valid and the topic ID changed, update the metadata
+                log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+                        tp, newEpoch, oldTopicId, topicId);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);
+            } else if (currentEpoch == null || newEpoch >= currentEpoch) {
+                // If the received leader epoch is at least the same as the previous one, update the metadata
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
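The key behavioral change above: a topic that is deleted and recreated keeps its name but receives a new topic ID, so comparing the new leader epoch against the old incarnation's epoch would wrongly reject the fresh metadata. A minimal standalone sketch of that rule (hypothetical class and method names, String IDs instead of Kafka's Uuid):

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the epoch-retention rule introduced above; names are hypothetical.
final class EpochTracker {
    private final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();

    /** Returns true if metadata carrying this epoch should be accepted into the cache. */
    boolean accept(String partition, int newEpoch, String topicId, String oldTopicId) {
        if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
            // Topic ID changed: the topic was deleted and recreated, so the previously
            // seen epoch belongs to a different incarnation. Reset instead of comparing.
            lastSeenLeaderEpochs.put(partition, newEpoch);
            return true;
        }
        Integer currentEpoch = lastSeenLeaderEpochs.get(partition);
        if (currentEpoch == null || newEpoch >= currentEpoch) {
            lastSeenLeaderEpochs.put(partition, newEpoch);
            return true;
        }
        return false; // stale response from before the last observed leader change
    }
}

Without the topic ID check, a recreated topic restarting at leader epoch 0 would look permanently stale next to the cached epoch of its previous incarnation.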
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java (24 changes: 6 additions & 18 deletions)
@@ -51,7 +51,6 @@ public class MetadataCache {
     private final Node controller;
     private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
     private final Map<String, Uuid> topicIds;
-    private final Map<Uuid, String> topicNames;
 
     private Cluster clusterInstance;
 
@@ -83,11 +82,6 @@ private MetadataCache(String clusterId,
         this.controller = controller;
         this.topicIds = topicIds;
 
-        this.topicNames = new HashMap<>(topicIds.size());
-        for (Map.Entry<String, Uuid> entry : topicIds.entrySet()) {
-            this.topicNames.put(entry.getValue(), entry.getKey());
-        }
-
         this.metadataByPartition = new HashMap<>(partitions.size());
         for (PartitionMetadata p : partitions) {
             this.metadataByPartition.put(p.topicPartition, p);
@@ -108,10 +102,6 @@ Map<String, Uuid> topicIds() {
         return topicIds;
     }
 
-    Map<Uuid, String> topicNames() {
-        return topicNames;
-    }
-
     Optional<Node> nodeById(int id) {
         return Optional.ofNullable(nodes.get(id));
     }
@@ -156,15 +146,13 @@ MetadataCache mergeWith(String newClusterId,
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
-        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
 
-        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
-        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
-        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
-            if (shouldRetainTopic.test(entry.getKey())) {
-                newTopicIds.put(entry.getKey(), entry.getValue());
-            }
-        }
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
+        // topic IDs if the latest state contains the topic name but not a topic ID.
+        Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
+                .filter(entry -> shouldRetainTopic.test(entry.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
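The loop-to-stream rewrite in mergeWith is behavior-preserving for the retained entries; a self-contained comparison under that assumption (hypothetical demo class, String IDs instead of Kafka's Uuid):

import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public final class RetainTopicIdsDemo {
    public static void main(String[] args) {
        Map<String, String> topicIds = Map.of("orders", "id-1", "audit", "id-2");
        Predicate<String> shouldRetainTopic = topic -> !"audit".equals(topic);

        // Same filter-and-copy the replaced for-loop performed, expressed as a stream.
        Map<String, String> newTopicIds = topicIds.entrySet().stream()
                .filter(entry -> shouldRetainTopic.test(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(newTopicIds); // prints {orders=id-1}
    }
}

Note that the new code relies on java.util.stream.Collectors, so the patch presumably also adds that import to MetadataCache.java; the import block sits outside the visible hunks.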