Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto update the client to handle changes in number of partitions #221

Merged
merged 6 commits into from Apr 14, 2020

Conversation

merlimat
Copy link
Contributor

Motivation

Client should automatically discover new partitions as they're added.

@merlimat merlimat added this to the 0.2.0 milestone Apr 14, 2020
@merlimat merlimat self-assigned this Apr 14, 2020
}
consumer.ticker = time.NewTicker(duration)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only start this if the topic is partitioned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem is that the TopicPartitions() will just return a list, so we don't necessarely know if it's 1 partition or no partitions. In any case, if the topic is not partitioned, it would not be possible to create a partitioned topic with same name.

pulsar/consumer_impl.go Outdated Show resolved Hide resolved
p.producers = make([]Producer, numPartitions)
p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this run for non-partitioned topics as well? Is that ok?

pulsar/producer_impl.go Outdated Show resolved Hide resolved
return p, nil
}

func (p *producer) internalCreatePartitionsProducers() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as consumers

@jerrypeng
Copy link
Contributor

General question about how locks are handled in Go, when multiple threads are waiting on a lock. In what order are the locks released? FIFO? Or just everyone attempts to acquire and first one wins? I am asking because I am wondering if there are going to be any thread starvation going on for the thread that periodically checks for new partitions especially when "sendAsync" can be called quite frequently.

@merlimat
Copy link
Contributor Author

Yes, I was trying to avoid getting the extra mutex on the write path, though I think it shouldn't be too bad.

when multiple threads are waiting on a lock. In what order are the locks released? FIFO?

I haven't really seen it specified, though a mutex always should include some sort of queuing for fairness. I'm not worried about that aspect. Also, the thread that updates the list of producers is not "critical" in the sense that it only needs to get executed eventually, with no hard time bound.

@merlimat merlimat merged commit a575681 into apache:master Apr 14, 2020
@merlimat merlimat deleted the auto-update-partitions branch April 16, 2020 18:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants