
KAFKA-7831; Do not modify subscription state from background thread #6221

Merged
merged 3 commits into apache:trunk from the KAFKA-7831 branch on Mar 8, 2019

Conversation

hachikuji
Contributor

Metadata may be updated from the background thread, so we need to protect access to SubscriptionState. This patch restructures the metadata handling so that we only check pattern subscriptions in the foreground. Additionally, it improves the following:

  1. SubscriptionState is now the source of truth for the topics that will be fetched. We had a lot of messy logic previously to try and keep the topic set in Metadata consistent with the subscription, so this simplifies the logic.
  2. The metadata needs for the producer and consumer are quite different, so it made sense to separate the custom logic into separate extensions of Metadata. For example, only the producer requires topic expiration.
  3. We've always had an edge case in which a metadata change with an inflight request may cause us to effectively miss an expected update. This patch implements a separate version inside Metadata which is bumped when the needed topics change (see the sketch after this list).
  4. This patch removes the MetadataListener, which was the cause of https://issues.apache.org/jira/browse/KAFKA-7764.
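
To illustrate point 3, here is a minimal sketch of the request-version idea. It is not the actual Metadata implementation; the class and method names are illustrative. The client bumps a request version whenever the set of topics it needs changes, remembers which version each outgoing metadata request was built from, and only clears the "update needed" flag if no newer version appeared while the request was inflight.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only; the real bookkeeping lives in org.apache.kafka.clients.Metadata.
public class MetadataVersions {
    private final Set<String> neededTopics = new HashSet<>();
    private int requestVersion = 0;   // bumped whenever the needed topics change
    private int updateVersion = 0;    // bumped on every successful metadata update
    private boolean updateRequested = false;

    public synchronized void requestUpdateForTopic(String topic) {
        if (neededTopics.add(topic))
            requestVersion++;         // an inflight request may no longer cover all needed topics
        updateRequested = true;
    }

    // Called when a metadata request is sent; the returned version is passed back
    // when the corresponding response is handled.
    public synchronized int newMetadataRequest() {
        return requestVersion;
    }

    // Called when a metadata response is received.
    public synchronized void update(int requestVersionOfResponse) {
        updateVersion++;
        if (requestVersionOfResponse == requestVersion) {
            // The response covers every topic we currently care about.
            updateRequested = false;
        }
        // Otherwise the topic set changed while the request was inflight,
        // so another update is still needed.
    }

    public synchronized boolean updateRequested() {
        return updateRequested;
    }
}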

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji hachikuji force-pushed the KAFKA-7831 branch 6 times, most recently from d87b815 to 03cbbc1 Compare February 4, 2019 17:21
 /**
  * Updates the cluster metadata. If topic expiry is enabled, expiry time
  * is set for topics if required and expired topics are removed from the metadata.
  *
- * @param metadataResponse metadata response received from the broker
+ * @param response metadata response received from the broker
Contributor

nit: javadoc for new param requestVersion

Contributor

@rajinisivaram rajinisivaram left a comment

@hachikuji Thanks for the PR. Made one pass through the implementation changes and it is looking really good. I will go through the tests and do another pass through the implementation tomorrow.

Contributor

@rajinisivaram rajinisivaram left a comment

Did another pass through the implementation and left some minor comments. Still need to go through the unit tests.

Contributor

@rajinisivaram rajinisivaram left a comment

@hachikuji I have gone through the tests as well. The PR is looking good; I have left only some minor comments.


-    for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
+    for (List<ConsumerRecord<byte[], byte[]>> records : recordsByPartition.values())
Contributor

Could do recordsByPartition.values().forEach(fetchedRecords::addAll)?

private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
    List<Long> res = new ArrayList<>(records.size());
    for (ConsumerRecord<?, ?> record : records)
        res.add(record.offset());
    return res;
}

Contributor

nit: unnecessary newline?

    protected Time createTime() {
        return Time.SYSTEM;
    }
}
Contributor

nit: newline?


        assertNull(caughtException.get());
    }
}
Contributor

nit: newline?

public void testWaitObjectConditionSatisfied() throws InterruptedException {
    Object obj = new Object();
    Time time = createTime();
    long timeoutMs = 100;
Contributor

Perhaps use a large timeout here and test that join returns before timeout elapses?
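
One way this could look (a sketch only, with a hypothetical test name; it assumes waitObject blocks until the condition is satisfied or the timeout expires, and that a caller makes the condition true and calls notifyAll on obj):

@Test
public void testWaitObjectReturnsBeforeLargeTimeout() throws InterruptedException {
    Object obj = new Object();
    Time time = createTime();
    long timeoutMs = 1_000_000L;   // deliberately huge; the wait should end long before this
    AtomicBoolean condition = new AtomicBoolean(false);

    Thread waiter = new Thread(() -> {
        try {
            time.waitObject(obj, ignored -> condition.get(), timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    waiter.start();

    synchronized (obj) {
        condition.set(true);
        obj.notifyAll();           // wake the waiter so it re-checks the condition
    }

    // If waitObject honors the notification, the waiter finishes almost immediately,
    // so joining with a short bound succeeds well before timeoutMs elapses.
    waiter.join(5_000L);
    assertFalse(waiter.isAlive());
}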

*
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before the condition is satisfied
*/
void waitObject(Object obj, Predicate<Void> condition, long timeoutMs) throws InterruptedException;
Contributor

Could we add description of the params to the javadoc? In particular the fact that we synchronize on obj and that we expect the caller to notify when condition is satisfied?
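
One possible wording, sketched here rather than quoting the committed javadoc, that spells out the synchronization contract:

/**
 * Waits for the given condition to become true.
 *
 * The implementation synchronizes on {@code obj}. A caller that makes the condition
 * true is expected to hold the same monitor and call {@code obj.notifyAll()} so that
 * the waiting thread wakes up and re-checks the condition.
 *
 * @param obj       the object whose monitor is used for waiting and notification
 * @param condition the condition to wait for, checked while holding the monitor of {@code obj}
 * @param timeoutMs how long to wait before giving up, in milliseconds
 * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before
 *         the condition is satisfied
 * @throws InterruptedException if the waiting thread is interrupted
 */
void waitObject(Object obj, Predicate<Void> condition, long timeoutMs) throws InterruptedException;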

Contributor

Instead of Predicate<Void> what about Supplier<Boolean>?
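
For comparison, the suggested alternative would read roughly as follows (hypothetical interface name; the point is that a Supplier-based condition takes no meaningless null argument):

import java.util.function.Supplier;

public interface ConditionWaiter {
    // With Supplier<Boolean>, call sites pass a zero-argument lambda such as
    // () -> queue.isEmpty(), instead of ignored -> queue.isEmpty() receiving a null argument.
    void waitObject(Object obj, Supplier<Boolean> condition, long timeoutMs) throws InterruptedException;
}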

@ijuma
Contributor

ijuma commented Feb 17, 2019

In KAFKA-7565, Rajini added a comment about this PR. Including it here so that @hachikuji can take it into consideration:

Jason Gustafson I am not sure if this is covered by your PR for KAFKA-7831.

We have a check for whether the partitions returned in a fetch response are still in the fetch session and we return if that is not the case:

Further down in the code, we assume that the partitions do exist in the session and access the partitions without checking for null:

long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;

Presumably fetch session partitions can change in between lines 234 and 243 since the response could be processed on the heartbeat thread. With the changes from your PR for KAFKA-7831, can we guarantee that the session partitions won't change?
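
A sketch of the kind of defensive handling the question points at (not the actual Fetcher code; the surrounding names data, partition, and log, and the FetchRequest.PartitionData type, are assumed from context): look the partition up once and skip it if it has dropped out of the session, instead of dereferencing the lookup result unconditionally.

FetchRequest.PartitionData sessionPartition = data.sessionPartitions().get(partition);
if (sessionPartition == null) {
    // The partition was removed from the fetch session while the response was in flight
    // (for example by the heartbeat thread), so ignore its data rather than risk an NPE.
    log.debug("Ignoring fetched records for {} which is no longer in the fetch session", partition);
} else {
    long fetchOffset = sessionPartition.fetchOffset;
    // ... continue with the normal completed-fetch handling using fetchOffset ...
}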

@hachikuji
Contributor Author

@rajinisivaram Thanks for the review. I will address your comments today.

@hachikuji
Contributor Author

@rajinisivaram Thanks for reviewing. I pushed an update. I had a couple questions in response to your comments as well.

@hachikuji hachikuji force-pushed the KAFKA-7831 branch 2 times, most recently from 8a7a45d to cefe5df Compare February 23, 2019 18:39
@rajinisivaram
Contributor

@hachikuji Thanks for the updates, looks good. Left a comment on inProgressRequestVersion. Also, the PR needs rebasing since there is a conflict.

@ijuma
Contributor

ijuma commented Mar 6, 2019

Tests are passing

Contributor

@mumrah mumrah left a comment

LGTM. Few questions/comments inline

@@ -94,7 +102,7 @@ public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultReset
     this.defaultResetStrategy = defaultResetStrategy;
     this.subscription = Collections.emptySet();
     this.assignment = new PartitionStates<>();
-    this.groupSubscription = new HashSet<>();
+    this.groupSubscription = Collections.newSetFromMap(new ConcurrentHashMap<>());
Contributor

While looking this up (hadn't seen newSetFromMap before), I found that ConcurrentHashMap has a newKeySet method which is basically equivalent to this. Not suggesting we use it, just thought I'd share
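
For reference, a small sketch of the two equivalent constructions mentioned here; both produce a thread-safe Set backed by a ConcurrentHashMap:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetExample {
    public static void main(String[] args) {
        // What the patch uses: a Set view over a ConcurrentHashMap.
        Set<String> fromMap = Collections.newSetFromMap(new ConcurrentHashMap<>());

        // The convenience method mentioned in the comment (available since Java 8).
        Set<String> keySet = ConcurrentHashMap.newKeySet();

        fromMap.add("topic-a");
        keySet.add("topic-a");
        System.out.println(fromMap + " " + keySet);
    }
}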

*
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before the condition is satisfied
*/
void waitObject(Object obj, Predicate<Void> condition, long timeoutMs) throws InterruptedException;
Contributor

Instead of Predicate<Void> what about Supplier<Boolean>?

private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private AuthenticationException authenticationException;
private KafkaException metadataException;
Contributor

Do we care about overwriting this value if get-and-clear hasn't been called?
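
A sketch of the get-and-clear pattern being asked about (illustrative class and method names, not the actual Metadata code): a later failure simply overwrites any exception that has not yet been surfaced, and the first caller to read it also clears it.

import org.apache.kafka.common.KafkaException;

public class MetadataErrorHolder {
    private KafkaException metadataException;

    // A second failure before get-and-clear overwrites the stored exception,
    // which is the behavior the comment questions.
    public synchronized void fail(KafkaException exception) {
        this.metadataException = exception;
    }

    public synchronized KafkaException getAndClearMetadataException() {
        KafkaException exception = this.metadataException;
        this.metadataException = null;
        return exception;
    }
}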

this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.requestVersion = 0;
this.updateVersion = 0;
Contributor

I wonder if having the broker manage a metadata version would make this any easier?

Contributor Author

I think it would. It would be nice to have a simple version in the metadata response that we could use to detect stale metadata. I suspect the partition leader epoch is the closest we can get to that without introducing server metadata bottlenecks. That said, I think some amount of bookkeeping is unavoidable in the client since the set of topics that it is interested in changes over time. You need some way of telling whether pending metadata fetches reflect changes to the interested topic set.

@hachikuji hachikuji force-pushed the KAFKA-7831 branch 2 times, most recently from d3d0945 to 84366c0 Compare March 7, 2019 20:57
Contributor

@rajinisivaram rajinisivaram left a comment

@hachikuji Thanks for the PR, LGTM

@hachikuji
Contributor Author

@rajinisivaram @mumrah Thanks much for reviews. Merging to trunk only. Apologies in advance for anyone who has to fix conflicts in their PR.

@hachikuji hachikuji merged commit 460e46c into apache:trunk Mar 8, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019

Reviewers: David Arthur <mumrah@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>