
KAFKA-9048 [WIP]: Keep track of state at FetchState when it needs to be updated #7581

Closed · wants to merge 10 commits

Conversation

guozhangwang (Contributor)

This PR introduces a new state, FetchingWithNewState, and only adds a partition to the fetch session when that state is set, indicating that some of its fields have changed since the last fetch (see the sketch after this list). Scenarios include:

  1. the fetch offset has changed;
  2. the log start offset has changed;
  3. the leader epoch has changed;
  4. the partition was excluded from the previous fetch because it was delayed / truncated, meaning that when it transitions back to Fetching we need to update its state as well.
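
A minimal Java sketch of the bookkeeping described above (illustrative only: the real fetch state lives in the Scala replica fetcher, and the diff below exposes it as a fetchState.updateNeeded flag; everything here other than fetchOffset, logStartOffset, currentLeaderEpoch and updateNeeded is hypothetical):

```java
// Hypothetical sketch: a per-partition fetch state that remembers whether any
// field relevant to the fetch session has changed since it was last sent.
class TrackedFetchState {
    long fetchOffset;
    long logStartOffset;
    int currentLeaderEpoch;
    boolean updateNeeded = true; // a newly added partition always needs to be sent

    void onFetchOffsetChange(long newOffset) {        // scenario 1
        if (newOffset != fetchOffset) { fetchOffset = newOffset; updateNeeded = true; }
    }

    void onLogStartOffsetChange(long newStart) {      // scenario 2
        if (newStart != logStartOffset) { logStartOffset = newStart; updateNeeded = true; }
    }

    void onLeaderEpochChange(int newEpoch) {          // scenario 3
        if (newEpoch != currentLeaderEpoch) { currentLeaderEpoch = newEpoch; updateNeeded = true; }
    }

    void onExcludedFromFetch() {                      // scenario 4: delayed / truncated
        updateNeeded = true;                          // re-send once it is Fetching again
    }
}
```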

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
}

List<TopicPartition> added = new ArrayList<>();
Contributor:

Just a note that FetchSessionHandler is used by the consumer too, so we will either need two separate implementations or make sure it works just as well for the consumer too.

Contributor:

One gotcha that I'm not sure we look out for yet: if the fetch session gets reset to a full fetch session (https://github.com/apache/kafka/pull/7581/files#diff-38a1c48d28da7f7f671cef22421990e5L415), we should attempt to send the partitions in the order in which they exist in the replica fetcher's partitionStates. This ensures fairness if we are only ever performing full fetch sessions. Maybe this is not a problem in practice, but we probably shouldn't change this behavior.

Contributor Author:

Ack, I will try to see if the consumer can leverage the same path as well.

Contributor Author:

Regarding "we should attempt to send the partitions in the order that they exist in the replica fetcher partitionStates": thinking about it a bit, I think the current logic still preserves that ordering, based on how we update the map. On the very first fetch, sessionPartitions is set to next, which has the same order as fetchStates; in later passes, new partitions encountered while looping over fetchStates are appended to the end of sessionPartitions, and modified partitions are updated in place without changing their position.

WDYT?
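
For reference, a small self-contained Java demo of the LinkedHashMap behavior this argument relies on (the Builder's next is a LinkedHashMap, and sessionPartitions is assumed to be one as well): updating an existing key keeps its position, and new keys are appended at the end.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LinkedHashMapOrderDemo {
    public static void main(String[] args) {
        Map<String, Long> sessionPartitions = new LinkedHashMap<>();
        sessionPartitions.put("topic-0", 100L);
        sessionPartitions.put("topic-1", 200L);
        sessionPartitions.put("topic-2", 300L);

        sessionPartitions.put("topic-1", 250L); // in-place update: position unchanged
        sessionPartitions.put("topic-3", 0L);   // new key: appended at the end

        // Prints [topic-0, topic-1, topic-2, topic-3]
        System.out.println(sessionPartitions.keySet());
    }
}
```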

Contributor:

We initialize next to an empty hashmap on each pass, but we only add elements to it where updates are needed https://github.com/apache/kafka/pull/7581/files#diff-a8437f241a5ae585d5805ee769313080R263. Won't this result in only the updated partitions being sent in a full fetch request if the previous incremental response failed?

Contributor Author:

In a full fetch request we use sessionPartitions directly instead of next; should that be sufficient?

Contributor:

Ah, I missed that that was changed. Why do we still need next then? At a glance it seems to serve no purpose?

Contributor Author:

We still need next for the actual incremental fetch: when a partition is neither added to nor removed from sessionPartitions, it is unchanged since the last fetch and should not be included in next. In other words, next maintains "what has changed in this session", while sessionPartitions contains all the partitions we are interested in for this session, whether or not they have changed.
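
A small self-contained Java illustration of that split (simplified types; a sketch of the bookkeeping described above, not the actual FetchSessionHandler code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SessionVsNextDemo {
    public static void main(String[] args) {
        // sessionPartitions: everything the session is interested in; survives across passes.
        Map<String, Long> sessionPartitions = new LinkedHashMap<>();
        // next: only what was added or altered during the current pass; rebuilt each pass.
        Map<String, Long> next = new LinkedHashMap<>();
        // removed: partitions the broker should forget.
        List<String> removed = new ArrayList<>();

        // Pass 1: both partitions are new, so they go into both maps.
        for (String tp : new String[]{"t-0", "t-1"}) {
            sessionPartitions.put(tp, 0L);
            next.put(tp, 0L);
        }
        System.out.println("full fetch sends:        " + sessionPartitions.keySet());

        // Pass 2: only t-1 advanced its fetch offset, so only t-1 goes into next.
        next = new LinkedHashMap<>();
        sessionPartitions.put("t-1", 100L);
        next.put("t-1", 100L);
        System.out.println("incremental fetch sends: " + next.keySet() + ", forgets: " + removed);
    }
}
```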

Contributor:

Oh, I see. next just means "updated or added" now.

I'm still not sure sessionPartitions will necessarily match the order in partitionStates, as we only ever update sessionPartitions when partitions are updated (though usually we move those to the end of partitionStates too). Previously the whole map was built up from scratch in the order of partitionStates and then sent in a full fetch. In all likelihood it's probably OK, I'm just not sure.

// 3. leader epoch has changed
// 4. the fetch was excluded before and now
if (fetchState.updateNeeded) {
val logStartOffset = this.logStartOffset(topicPartition)
Contributor:

The logStartOffset may change due to actions in the Partition/Log which are outside of the replica fetcher and are not currently signalled to the replica fetcher e.g. retention. I believe we need a way for the Partition/Log to signal that an update has occurred.

Contributor Author:

I looked at all the scenarios in which logStartOffset can be modified:

  • when the replica increases its logStartOffset due to the fetch response: this is usually caused by a delete-records API call, where the leader needs to wait for the followers to be notified and increase their logStartOffset too before acking the request. This case is covered in this PR.

  • when old segments get deleted due to compaction / retention: this is actually hard to capture, since at that time we do not have direct access to FetchStates. I thought about it for a bit and I think it is not critical: the only reason followers report log-start-offset to the leader is to indicate whether a deletion has been propagated (consumers just set this value to 0): https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+deleteRecordsBefore%28%29+API+in+AdminClient. In other words, retention-driven log-start-offset changes do not necessarily have to be reported to the leader.

I will sync with other folks who are familiar with this path to confirm, but if that's true, I think we do not need Partition/Log to signal log start offset changes.

Contributor:

I was wondering whether this was possible, but I really don't have a good handle on whether it's safe or not. Syncing with others to make sure makes sense.

Contributor Author:

Synced with @junrao @hachikuji offline: the log-start-offset is used for KIP-107 only, so not updating it for retention-driven changes when sending to the leader is okay.

// Exclude this partition from the upcoming session;
// if we have to exclude this fetch because it is delayed, we need to update its state
// so that it would always be included when the delay has elapsed
builder.remove(topicPartition)
Contributor:

  1. Is it possible that we end up adding partitions to the "removed" list that we send in the incremental fetch request which were not in the previous request? Do we need to perform a contains check here? https://github.com/apache/kafka/pull/7581/files#diff-38a1c48d28da7f7f671cef22421990e5R221
  2. How do we track partitions that were not removed due to pausing/errors/throttling, but were instead removed from the fetcher completely by:
    def removePartitions(topicPartitions: Set[TopicPartition]): Unit = {

Contributor Author:

  1. Good catch, will do.
  2. Hmm, that's a very good question. Since all access to the map is synchronized by the partitionMapLock, I think we can maintain a removed-partitions collection protected by that lock as well, and let the buildFetch call iterate over it and clear it within the lock. That's not very elegant, but I cannot come up with a better solution off the top of my head. WDYT @lbradstreet ?

Contributor:

The only other solution I can really think of is to maintain a list of removed partitions and clean them from the map as we do today. I'm not totally sure which feels cleaner.

Contributor Author:

After some pondering I decided to add removedPartitions in the ReplicaFetcher instead of PartitionStates, since it is covered by inLock(partitionMapLock) and is less vulnerable to future changes.
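
A minimal Java sketch of the locking pattern being described (the actual code is Scala and uses inLock(partitionMapLock); apart from removedPartitions and partitionMapLock, the names here are illustrative):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

class RemovedPartitionTracker {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Set<String> removedPartitions = new HashSet<>();

    // Called when a partition is removed from the fetcher entirely.
    void removePartition(String topicPartition) {
        partitionMapLock.lock();
        try {
            removedPartitions.add(topicPartition);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Called from buildFetch: hand the pending removals to the session builder,
    // then clear the set, all while holding the same lock.
    void drainInto(Set<String> builderRemoved) {
        partitionMapLock.lock();
        try {
            builderRemoved.addAll(removedPartitions);
            removedPartitions.clear();
        } finally {
            partitionMapLock.unlock();
        }
    }
}
```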

Contributor:

Sounds good. How does all of this integrate with the ReplicaAlterLogDirsThread implementation, by the way? I'm guessing there's nothing to be done since there's no fetch request being built?

@@ -202,7 +205,23 @@ public String toString() {
* Mark that we want data from this partition in the upcoming fetch.
*/
public void add(TopicPartition topicPartition, PartitionData data) {
if (sessionPartitions.containsKey(topicPartition)) {
Contributor:

I think you may be able to .put the added PartitionData into sessionPartitions up front and capture the value returned by the .put, as it will return the previous value. If the previous value was null, we add to added; if it was not null, we add to altered. This saves the additional containsKey check.
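
In code, the suggestion looks roughly like the sketch below (a method-level sketch that assumes the Builder's existing sessionPartitions, next, added and altered fields; the updated add() further down in the diff follows the same shape):

```java
public void add(TopicPartition topicPartition, PartitionData data) {
    // put returns the previous value (or null), so one call both updates the
    // session and tells us whether the partition was already present.
    PartitionData previous = sessionPartitions.put(topicPartition, data);
    if (previous != null) {
        altered.add(topicPartition);  // already in the session: record as altered
    } else {
        added.add(topicPartition);    // new to the session: record as added
    }
    next.put(topicPartition, data);   // added or altered either way, so it belongs in next
}
```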

Contributor Author:

Sounds good.

Contributor:

If we didn't care about maintaining the altered and added lists (which are only used for debug-level logging...), I would suggest that we simply add to next and then perform a putAll into sessionPartitions on build(). It would likely perform better, if we are OK with losing a little info at the debug level.

@@ -211,50 +230,13 @@ public FetchRequestData build() {
log.debug("Built full fetch {} for node {} with {}.",
nextMetadata, node, partitionsToLogString(next.keySet()));
Contributor:

We should print sessionPartitions.keySet() here, since that's the full session data now.

*/
public void remove(TopicPartition topicPartition) {
removed.add(topicPartition);
next.remove(topicPartition);
Contributor:

Will any of these partitions be in next? My assumption is that they would not be in partitionStates and would not be in next if they are being removed (especially if we get the locking right). Of course, they would be in sessionPartitions.
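
Putting the two points together (the contains check raised earlier and the observation that a removed partition will be in sessionPartitions but usually not in next), a hedged sketch of what remove() could look like:

```java
public void remove(TopicPartition topicPartition) {
    // Only record the removal if the partition is actually part of the session;
    // otherwise the broker would be asked to forget a partition it never had.
    if (sessionPartitions.containsKey(topicPartition)) {
        removed.add(topicPartition);
    }
    // It may also have been added/altered earlier in this same pass, so drop it from next.
    next.remove(topicPartition);
}
```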

}
}

removedPartitions.foreach { topicPartition => builder.remove(topicPartition) }
removedPartitions.clear()
Contributor:

What will happen if a partition is removed and then re-added between passes? It's probably unlikely, but it seems possible.

private List<TopicPartition> removed = new ArrayList<>();
private List<TopicPartition> altered = new ArrayList<>();


Builder() {
this.next = new LinkedHashMap<>();
Contributor:

Should we rename next now? I think the name previously made sense because sessionPartitions would be pointed at it after each pass, so it was the "next" sessionPartitions, but now it's being used as an "updated" hashmap, and is completely thrown away after each pass.

val logStartOffset = this.logStartOffset(topicPartition)
builder.add(topicPartition, new FetchRequest.PartitionData(
fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
// A fetch needs to be used to update the session only if:
Contributor:

It's too bad this loop is still O(n) as a result of the shouldFollowerThrottle check, but at least we've minimized the amount of work done inside the loop.

@lbradstreet (Contributor):

Before we merge this, we should update the ReplicaFetcherBenchmark to use different fetch response patterns, e.g. 100% of partitions returning updated data vs 10% returning updated data. The benchmark as currently written is returning extremely high iteration times, which implies either an exception is being thrown or a mock is being used somewhere in the benchmarked code path; trunk doesn't exhibit this problem.
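
One way the response pattern could be parameterized, assuming the benchmark is JMH-based like the other Kafka jmh-benchmarks (the class, field name and values below are hypothetical, not part of the existing benchmark):

```java
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class FetchResponsePatternParams {
    // Fraction of partitions whose mocked fetch response returns updated data:
    // 1.0 = every partition advances each pass, 0.1 = only 10% do.
    @Param({"1.0", "0.1"})
    public double updatedDataFraction;
}
```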

@lbradstreet (Contributor):

The next presizing here:

val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)

seems excessive now that next only tracks the updated fetches.

@@ -202,59 +206,56 @@ public String toString() {
* Mark that we want data from this partition in the upcoming fetch.
*/
public void add(TopicPartition topicPartition, PartitionData data) {
if (sessionPartitions.put(topicPartition, data) != null) {
Contributor:

If we weren't tracking altered and added for debugging purposes, I would suggest we only call next.put here and then perform a sessionPartitions.putAll(next) on build(). putAll is a bit more efficient, but I'm not sure it matters enough to change this, given that we would lose the ability to track altered and added. A second question is whether debug logging alone is a good enough reason to maintain altered and added.
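
A two-line sketch of where those calls would sit (fragment only; dropping added/altered tracking is the trade-off being discussed, not something the PR settled on):

```java
// In add(): just record that this partition changed.
next.put(topicPartition, data);

// In build(): fold the whole pass's changes into the session in one bulk call.
sessionPartitions.putAll(next);
```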

@guozhangwang deleted the K9048-remove-next branch on April 24, 2020 at 23:44