
KAFKA-2720: expire group metadata when all offsets have expired #1427

Closed
wants to merge 7 commits

Conversation

hachikuji
Contributor

No description provided.

@hachikuji
Contributor Author

cc @onurkaraman @guozhangwang

This patch moves group metadata removal to the periodic offset expiration thread so that group metadata (including the generation) is preserved until all offsets for the group have expired. Since the last member may leave the group before its offsets have expired, this patch changes the behavior to increment the generation and essentially write an otherwise empty group metadata object to the log. The main idea is to avoid resetting the generation to 0 until it is safe to do so. This also helps users, who currently have no easy way to tell whether a group simply doesn't exist or is dead with possibly some offset state left behind.

A couple of notes on the current patch:

  1. I've added a new Empty state to the coordinator's group state machine. This may be unnecessary, since it seems Stable could work as well, but I slightly preferred to keep the logic for this case separate. It also gets around the need for a direct transition from PreparingRebalance to Stable, which should not normally be possible. Instead, we allow PreparingRebalance to transition to Empty when there are no members left in the group.
  2. I've done a little bit of additional cleanup. The most notable change is the removal of the offset expiration lock in GroupMetadataManager, which seemed unnecessary with a small change to the offset loading logic. Rather than loading offsets directly into the cache, I first stage them in a local collection as is done with the group metadata. This should prevent the expiration thread from seeing stale values, but let me know if I've missed something.
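
A minimal sketch of the staged-loading idea from point 2, for illustration only; the names here (loadOffsetsStaged, offsetsCache, GroupTopicPartition, and the OffsetAndMetadata fields) are assumptions and not the exact code in this patch:

```scala
import scala.collection.mutable

case class GroupTopicPartition(group: String, topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String, commitTimestamp: Long)

// Hypothetical loading helper: stage offsets in a local collection first, then
// install them into the shared cache in one step, so a concurrently running
// expiration thread never observes a partially loaded (stale) view.
def loadOffsetsStaged(records: Iterable[(GroupTopicPartition, OffsetAndMetadata)],
                      offsetsCache: mutable.Map[GroupTopicPartition, OffsetAndMetadata]): Unit = {
  val loadedOffsets = mutable.Map.empty[GroupTopicPartition, OffsetAndMetadata]
  records.foreach { case (key, value) => loadedOffsets.put(key, value) }

  // Publish the fully staged offsets at once.
  offsetsCache ++= loadedOffsets
}
```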

delayedGroupStore.foreach(groupManager.store)
// for deadlock if the callback is invoked holding other locks (e.g. the replica
// state change lock)
delayedGroupStore.map(groupManager.store)
Contributor

Why did you change from foreach to map? It doesn't seem like the return value is being used.

Contributor Author

Yeah, you're right. I had briefly considered refactoring the delayed store API, and this may have been a side effect of manually reverting.

@guozhangwang
Contributor

Just browsed through the code change. Regarding 2), I am still not clear how it prevents concurrent access to the offsetCache. Let's discuss offline.

@guozhangwang
Contributor

Specifically, could this issue be triggered even without log compaction stopping? (See the second incident in the blog post below.)

https://engineering.linkedin.com/blog/2016/05/kafkaesque-days-at-linkedin--part-1

@hachikuji
Contributor Author

hachikuji commented May 25, 2016

Summary of brief offline discussion with @guozhangwang: I think we agree that concurrent loading and expiration is safe with this change, but we are trying to find a way to also solve the problem of expiration with a concurrent offset commit which is unprotected with and without this patch.

@onurkaraman
Contributor

hey @hachikuji I'll try to get to this patch after I finish up testing the topic indexing idea you mentioned in the "[DISCUSS] scalability limits in the coordinator" thread.

@guozhangwang
Contributor

Also pinging @jjkoshy for review, since you fixed the earlier race condition around offset expiration and added the lock; we are trying to safely remove that lock and, at the same time, fix the race condition between offset expiration and offset commit as well.

* transition: last offsets removed in periodic expiration task => Dead
* join group from a new member => PreparingRebalance
*/
private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
Contributor

We should also document the state transitions from other states into the Empty state.
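
Purely as an illustration of this suggestion (not the final code), the Empty state's doc comment could also list the inbound transition described in this PR, i.e. PreparingRebalance moving to Empty once the last member has left:

```scala
/**
 * Group has no more members, but lingers until all of its offsets have expired.
 *
 * transition: last offsets removed in periodic expiration task => Dead
 *             join group from a new member => PreparingRebalance
 *
 * transition into Empty: all members leave the group during PreparingRebalance
 */
private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
```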

@hachikuji
Contributor Author

@onurkaraman @guozhangwang Ready for another look. The main change I made is moving the offset cache of each group into GroupMetadata. For groups which only use offset commits, the group will stay in the Empty state until all offsets for the group expire. The change helps to unify expiration logic and addresses the race condition between offset commits and offset expiration, which could previously result in an inconsistent cache. Now offset update/expiration is protected with the group lock.
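
A minimal sketch of the locking scheme described above, with hypothetical class and field names rather than the exact patch code; the point is simply that offset commits and offset expiration both synchronize on the owning group:

```scala
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String, expirationTimestamp: Long)

class GroupMetadata(val groupId: String) {
  // Per-group offset cache, guarded by the group's own lock.
  private val offsets = mutable.Map.empty[TopicPartition, OffsetAndMetadata]

  def completeOffsetCommit(tp: TopicPartition, offset: OffsetAndMetadata): Unit = this.synchronized {
    offsets.put(tp, offset)
  }

  def removeExpiredOffsets(now: Long): Map[TopicPartition, OffsetAndMetadata] = this.synchronized {
    val expired = offsets.filter { case (_, o) => o.expirationTimestamp <= now }.toMap
    expired.keys.foreach(offsets.remove)
    expired
  }
}
```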

@guozhangwang
Contributor

Thanks @hachikuji , will review.

The Jenkins failure looks familiar to me. I think we already have a JIRA for it?

@hachikuji
Contributor Author

@guozhangwang Yeah, looks like an instance of KAFKA-3155.

}
)

def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
def start(enableExpiration: Boolean = true) {
Contributor

This function name is a bit awkward to me:

  1. it is only triggered in GroupCoordinator.startup, and enableExpiration is always passed (i.e. default value is never used?).

  2. it only takes effect if the passed enableExpiration is true.

Could we rename it to startMetadataCleanupInBackground with no parameters, and let the coordinator call it only if enableExpiration is true?

Contributor Author

Ack. I changed the name to enableMetadataExpiration.

@guozhangwang
Contributor

The updated state machine diagram looks reasonable to me, and I will let @onurkaraman take another look at it. One general comment about the naming of removeGroup, removeGroupForPartition, onGroupLoaded, and onGroupUnloaded: could we differentiate them better, for example as deleteGroup, evictGroupsFromCacheForPartition, onGroupLoadedToCache, and onGroupEvictedFromCache? And could we also add some comments on when they can be triggered?

@hachikuji
Contributor Author

@guozhangwang @ijuma @Ishiihara @onurkaraman: I'm removing the WIP tag on this patch. I've cleaned up a few small problems and added additional test cases. Take a look when you have a chance.

hachikuji changed the title from "KAFKA-2720 [WIP]: expire group metadata when all offsets have expired" to "KAFKA-2720: expire group metadata when all offsets have expired" on Jun 13, 2016
@guozhangwang
Contributor

@hachikuji All my comments are from the previous pass and I do not have further ones; it would be OK to go ahead with it once others have made their own passes.

newGauge("NumOffsets",
new Gauge[Int] {
def value = offsetsCache.size
def value = groupMetadataCache.values.map(group => {
Contributor

This worries me a bit. Every time we compute this metric, we need to hold onto many locks.

Contributor Author

Bugs me as well. I debated whether it was worthwhile keeping a separate atomic counter to track the size of the offsets or changing to a concurrent map, but it seemed like premature optimization. If you have any ideas, I'd love to hear them.
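
For reference, a sketch of the atomic-counter alternative mentioned above (purely illustrative, not part of this patch), which would let the NumOffsets gauge read its value without taking any group locks:

```scala
import java.util.concurrent.atomic.AtomicInteger

class OffsetMetrics {
  private val numOffsets = new AtomicInteger(0)

  // Call sites that add or remove a committed offset would update the counter.
  def offsetAdded(): Unit = numOffsets.incrementAndGet()
  def offsetRemoved(): Unit = numOffsets.decrementAndGet()

  // The gauge can then read the value lock-free.
  def value: Int = numOffsets.get()
}
```

The trade-off, as noted, is extra bookkeeping at every commit and expiration site versus holding many group locks on each metric read.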

@hachikuji
Contributor Author

FYI: There are some system test failures that I'm looking into.

@hachikuji
Contributor Author

Found the problem. Here's a nearly clean run: http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2016-06-15--001.1465997374--hachikuji--KAFKA-2720--9ffe4d0/. The two failures appear to be unrelated.

@guozhangwang
Contributor

LGTM. Merged to trunk.

asfgit closed this in 8c55167 on Jun 16, 2016
/**
* When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
* that partition.
* @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*
* @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
Contributor

Looks like an unintentional change, fix it in a follow-up? cc @hachikuji

@hachikuji
Contributor Author

Thanks @ijuma. I'll prepare a follow-up.
