
Add Client.Coordinator() to retrieve the coordinating broker for a consumer group #411

Merged: 1 commit into master on Apr 10, 2015

Conversation

wvanbergen (Contributor, Author):

This is the first step towards consumer group offset management, and later full consumer group management in Kafka 0.8.3.

Careful review of the two locks would be appreciated. Do we need two locks, or can we just use a single lock for everything?

Extracted from #379.

@Shopify/kafka
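A hedged usage sketch of the new call. The signature is assumed to be `Coordinator(consumerGroup string) (*Broker, error)`; the client constructor reflects a current sarama API and the broker address is a placeholder:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumption: NewClient takes a broker list and a *Config; adjust to the
	// client constructor of the sarama version you are on.
	client, err := sarama.NewClient([]string{"localhost:9091"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Look up (and cache) the coordinating broker for a consumer group.
	coordinator, err := client.Coordinator("my-consumer-group")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator for my-consumer-group is broker %d (%s)", coordinator.ID(), coordinator.Addr())
}
```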

Inline review comment on the diff:

return coordinator, nil
}

coordinator, err := client.refreshCoordinator(consumerGroup, 10)
wvanbergen (Contributor, Author) commented:

Number of retries is hardcoded for now.

@wvanbergen (Contributor, Author):

Outstanding question: in the functional test, the retry attempts and backoff are set to very high values right now. Normally the default values are more than enough, but during the first call to Coordinator, Kafka actually creates the __consumer_offsets topic, which can take a long time. Any subsequent call to the function will be a lot faster.

I am not sure if this warrants using different configuration values. On Travis CI, this initial call takes about 4 seconds.
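A hedged sketch of the idea; the field names assume a recent sarama `Config` layout and the values are illustrative, not the ones actually used in the functional test:

```go
import (
	"time"

	"github.com/Shopify/sarama"
)

// newCoordinatorTestConfig is a hypothetical helper: it raises the metadata
// retry budget so the very first Coordinator call can outlast the automatic
// creation of the __consumer_offsets topic.
func newCoordinatorTestConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Metadata.Retry.Max = 50                        // default is 3
	config.Metadata.Retry.Backoff = 250 * time.Millisecond
	return config
}
```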

@wvanbergen force-pushed the coordinator branch 3 times, most recently from 300b948 to 9596184 on April 9, 2015 at 10:03
@wvanbergen (Contributor, Author):

Note: this only works against Kafka 0.8.2 and higher.

@eapache (Contributor) commented Apr 9, 2015:

  • For simplicity, let's stick to just one lock for now. We can add fine-grained locking later if it turns out to be a performance bottleneck, but I doubt it.
  • Is there a reason not to follow the same pattern as Leader here, with a cachedCoordinator function that returns the (opened) broker and a refreshCoordinator function that just updates the maps and doesn't return the broker directly?
  • For retries, perhaps it is worth detecting the "need to create the topic" case (what error is that?) and adding a one-time separately configurable back-off of 5 seconds for that case, and otherwise using the metadata retries/backoffs. I am alternatively OK with just creating entirely separate retry/backoff configs for this path.
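
For the second point, a rough sketch of what a Leader-style split could look like here; the names and struct fields are illustrative, not the PR's actual code:

```go
// cachedCoordinator only reads the caches; it is the one place that hands out
// a broker, so it alone has to worry about lazy opening.
func (client *client) cachedCoordinator(consumerGroup string) *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
		return client.brokers[coordinatorID]
	}
	return nil
}

// Coordinator returns the cached coordinator, triggering a refresh if needed.
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	coordinator := client.cachedCoordinator(consumerGroup)

	if coordinator == nil {
		if err := client.refreshCoordinator(consumerGroup); err != nil {
			return nil, err
		}
		coordinator = client.cachedCoordinator(consumerGroup)
	}

	if coordinator == nil {
		return nil, ErrConsumerCoordinatorNotAvailable
	}

	_ = coordinator.Open(client.conf) // lazy connect, mirroring cachedLeader
	return coordinator, nil
}
```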

@wvanbergen (Contributor, Author):

  1. will do.

  2. I think this is simpler and separates concerns better. refreshCoordinator just cares about executing the request and has no side effects, and doesn't have to care about cache. Coordinator deals with the cache.

  3. Let me investigate what happens when you create a second consumer group. If my understanding is correct, that will return the same error (ErrConsumerCoordinatorNotAvailable), but it will become functional much faster because it doesn't need to create the topic. I will report back later.

@eapache (Contributor) commented Apr 9, 2015:

  1. It just seems weird to have the broker cache updated in the refresh method but the coordinator cache updated in the Coordinator method - I feel like those updates should live together.

The concern I separated with the Leader design is lazy open - only cachedLeader has to worry about opening the broker, since it is the only thing that returns a broker.

Also, nothing uses the invalidateCoordinator method?

@wvanbergen (Contributor, Author):

The invalidateCoordinator will be used by methods that use the coordinator and figure out that it actually is not the correct one.

@eapache (Contributor) commented Apr 9, 2015:

> The invalidateCoordinator will be used by methods that use the coordinator and figure out that it actually is not the correct one.

No no no, this is the same mistake I made with disconnectBroker :(


Inline review comment on the diff:

// If the number of partitions is large, we can get some churn calling cachedPartitions,
// so the result is cached. It is important to update this value whenever metadata is changed
cachedPartitionsResults map[string][maxPartitionIndex][]int32
lock sync.RWMutex // protects access to the maps, only one since they're always written together

lock sync.RWMutex // protects access to the metadata and coordinator maps
Contributor:

and broker map

Contributor:

and cachedPartitionsResults map

@wvanbergen (Contributor, Author):

Updated refreshCoordinator to never register or open the broker, Coordinator now always does these tasks.

@wvanbergen (Contributor, Author):

Remind me again why that caused issues for disconnectBroker?

@wvanbergen (Contributor, Author):

[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_0 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_0 is #9094 (localhost:9094).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9094
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_1 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_1 is #9095 (localhost:9095).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9095
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_2 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_2 is #9091 (localhost:9091).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9091
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_3 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_3 is #9092 (localhost:9092).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9092
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_4 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_4 is #9093 (localhost:9093).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9093
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_5 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_5 is #9094 (localhost:9094).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9094
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_6 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_6 is #9095 (localhost:9095).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9095
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_7 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_7 is #9091 (localhost:9091).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9091
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_8 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_8 is #9092 (localhost:9092).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9092
[sarama] 2015/04/09 15:53:45 client/coordinator Requesting coordinator for consumergoup another_new_consumer_group_9 from localhost:9091.
[sarama] 2015/04/09 15:53:45 client/coordinator Coordinator for consumergoup another_new_consumer_group_9 is #9093 (localhost:9093).
[sarama] 2015/04/09 15:53:45 Connected to broker localhost:9093

Looks like for all subsequent attempts for new consumer groups, the call succeeds immediately. So ErrConsumerCoordinatorNotAvailable is only returned while the topic is being created. A separate, longer sleep when this happens sounds like a good idea. 👍
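
A minimal sketch of that idea, assuming the lookup goes through Broker.GetConsumerMetadata; the method shape, the any() broker picker, and the back-off value are illustrative, not the PR's actual code:

```go
import "time"

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
	for broker := client.any(); broker != nil; broker = client.any() {
		request := &ConsumerMetadataRequest{ConsumerGroup: consumerGroup}

		response, err := broker.GetConsumerMetadata(request)
		if err != nil {
			continue // this broker is unreachable, try another one
		}

		if response.Err == ErrConsumerCoordinatorNotAvailable && attemptsRemaining > 0 {
			// Kafka is most likely still creating __consumer_offsets; give it a
			// one-off longer back-off instead of the normal metadata back-off.
			time.Sleep(5 * time.Second)
			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
		}

		return response, nil
	}

	return nil, ErrOutOfBrokers
}
```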

@eapache (Contributor) commented Apr 9, 2015:

> Remind me again why that caused issues for disconnectBroker?

#15
#23 (comment)

Basically I made it private but used it from outside the client. The semantics were confused. Lots of things.

The solution finally boiled down to two different tasks the user might want to accomplish:

  • bounce a broker connection if e.g. it's timed out
  • refresh metadata if the leadership has moved

The first can already be accomplished (now that we have lazy connections) by just calling Close() on the broker and asking for it again. The second can be accomplished (in the leader case) by calling RefreshMetadata().

So based on all that, I think we should add RefreshCoordinator(topic) error as a public client method (and get rid of invalidateCoordinator).

Edit, summary: invalidateCoordinator, like disconnectBroker, is just a way to flag "please do a refresh the next time I ask for this broker". It is simpler, more powerful, and easier to grok if we just expose a method that does the refresh directly.
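
A hedged sketch of how those two recovery paths would look from the caller's side; recoverCoordinator is a hypothetical helper, and client is assumed to be an existing sarama.Client:

```go
import "github.com/Shopify/sarama"

// recoverCoordinator illustrates the two cases: a dead connection versus a
// coordinator that has actually moved. Hypothetical helper, not part of sarama.
func recoverCoordinator(client sarama.Client, group string) (*sarama.Broker, error) {
	// Case 1: the connection is bad (e.g. timed out). Close the broker; thanks
	// to lazy connections it will simply be reopened the next time it is used.
	if broker, err := client.Coordinator(group); err == nil {
		_ = broker.Close()
	}

	// Case 2: the coordinator has moved. Force a fresh lookup from the cluster,
	// then read the updated cache.
	if err := client.RefreshCoordinator(group); err != nil {
		return nil, err
	}
	return client.Coordinator(group)
}
```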

@wvanbergen (Contributor, Author):

Removed invalidateCoordinator, added RefreshCoordinator.

@wvanbergen (Contributor, Author):

Some final 👀 ?

@@ -42,6 +43,14 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. This may be stale, in which case you should call RefreshCoordinator
Contributor:

I don't like the UX of saying "this may be stale", I think the necessary information is conveyed with just "it's locally cached".

Inline review comment on the diff:

return err
}

coordinator := &Broker{
Contributor:

once #413 lands this can go away

@@ -42,6 +42,14 @@ type Client interface {
// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. This value is cached locally; you can call RefreshCoordinator to update
Contributor:

you don't need to say it's cached locally twice :)

@wvanbergen (Contributor, Author):

@eapache final 👀

Inline review comment on the diff:

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance.
func (client *client) registerBroker(broker *Broker) {
Contributor:

maybe worth mentioning this must be called with the lock held in the comment?
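
Something like the wording below would capture that; the function body here is only a sketch of the idea, not the PR's actual implementation:

```go
// registerBroker makes sure a broker received by a Metadata or Coordinator
// request is registered in the brokers map. It must be called with client.lock
// held for writing, since it mutates the map.
func (client *client) registerBroker(broker *Broker) {
	if _, exists := client.brokers[broker.ID()]; !exists {
		client.brokers[broker.ID()] = broker
	}
}
```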

@eapache (Contributor) commented Apr 10, 2015:

Very minor nits, but LGTM. Squash it and :shipit:

wvanbergen added a commit that referenced this pull request on Apr 10, 2015: "Add Client.Coordinator() to retrieve the coordinating broker for a consumer group"
wvanbergen merged commit 34f21c8 into master on Apr 10, 2015.
wvanbergen deleted the coordinator branch on April 10, 2015 at 21:26.
@zaynetro:

> Note: this only works against Kafka 0.8.2 and higher.

Damn, I spent hours trying to get Coordinator to work on v0.8.1.1. Maybe it should be added to the docs?

@wvanbergen (Contributor, Author):

@zaynetro #453

@zaynetro:

Great!
