
Variable events buffer #17

Closed

Conversation

EVODelavega

Having the maxEvents param in the c.handle.eventPoll call hard-coded to 1000 seems wrong, given that the buffer size can be changed through config parameters, or the channel can be full. In case maxEvents works out to 0, perhaps a sleep should be added?
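For context, the call being discussed and the direction the patch takes, quoted and adapted from later in this thread (eventsChanSize is the configured channel size the patch introduces):

// Current: maxEvents is hard-coded to 1000, independent of the configured buffer.
_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)

// Patched direction: let maxEvents track the remaining space in the channel.
_, term := c.handle.eventPoll(c.events, 100, eventsChanSize-len(c.events), termChan)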

@ghost commented Dec 6, 2016

Hey @EVODelavega,
thank you for your Pull Request.

It looks like you haven't signed our Contributor License Agreement yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@EVODelavega
Author

[clabot:check]

@ghost commented Dec 6, 2016

@confluentinc It looks like @EVODelavega just signed our Contributor License Agreement. 👍

Always at your service,

clabot

_, term := c.handle.eventPoll(
	c.events,
	100,
	eventsBufferSize-len(c.events), // max events depends on available buffer
Contributor

Must check that the computed value is > 0, otherwise this will just busy-loop.
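A minimal sketch of the kind of guard being asked for; the helper name and the exact fallback are illustrative, not part of the patch:

// If the channel is already full, eventsChanSize-len(c.events) is 0 and
// eventPoll would be asked for zero events, spinning without ever blocking.
// Clamping the value (or sleeping/blocking instead) avoids the busy loop.
func pollCount(chanCap, chanLen int) int {
	n := chanCap - chanLen
	if n < 1 {
		return 1 // the right fallback is a judgment call; see the discussion below
	}
	return n
}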

@@ -329,15 +329,20 @@ func (c *Consumer) rebalance(ev Event) bool {
 // consumerReader reads messages and events from the librdkafka consumer queue
 // and posts them on the consumer channel.
 // Runs until termChan closes
-func consumerReader(c *Consumer, termChan chan bool) {
+func consumerReader(c *Consumer, termChan chan bool, eventsBufferSize int) {
Contributor

Call it the same thing as in the caller, eventsChanSize.

@@ -287,10 +287,9 @@ func channelProducer(p *Producer) {

 // channelBatchProducer serves the ProduceChannel channel and attempts to
 // improve cgo performance by using the produceBatch() interface.
-func channelBatchProducer(p *Producer) {
+func channelBatchProducer(p *Producer, batchSize int) {
Contributor

I don't believe there is any use for synchronizing the batch size to the channel size: since the channel is read message-by-message, it is possible (likely and desired) to have a batch larger than the channel size.

The experimental batch interface aims to cut down on the number of C calls (since cgo has a high overhead); there is thus no logical correlation to the channel size.
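A simplified illustration of that point: a batch is accumulated by draining the channel message by message, so nothing ties the batch size to the channel's buffer capacity. Types and names below are stand-ins, not the bindings' internals:

type message struct{ value []byte } // stand-in for the bindings' message type

// collectBatch drains up to batchSize messages currently available on the
// channel into one slice, which is then handed to a single C call.
// batchSize can freely exceed cap(ch); the loop just keeps reading.
func collectBatch(ch <-chan *message, first *message, batchSize int) []*message {
	batch := []*message{first}
	for len(batch) < batchSize {
		select {
		case m := <-ch:
			batch = append(batch, m)
		default:
			return batch // channel momentarily empty; ship what we have
		}
	}
	return batch
}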

Author

Fair enough, but what if I configure the publish stack to be greater than the default? I.e. have batchSize default to the current value, but increase it depending on the config value?

Contributor

I'm not sure it is warranted; this is only an internal optimization in the Go bindings, and this batching does not correlate to the application, librdkafka internals, or Kafka protocol batches.

@edenhill
Contributor

Please revert and rebase your branch to avoid revert-commits.

Something like this, after you've reverted the commits:

$ git rebase -i master

# Mark all follow-up fixes as f (fixup) and put them under your top-level commit

# Be careful to only use --force on PR branches, never on long-lived branches (such as master)
$ git push --force origin <your_pr_branch>

@EVODelavega
Author

@edenhill Done (twice, because I forgot to sync with upstream master 😄 )

_, term := c.handle.eventPoll(
	c.events,
	100,
	eventsChanSize-len(c.events), // max events depends on available buffer
Contributor

Must check that this doesn't reach 0 or it will busy-loop.

Author

Which would be the preferred way to handle this? Sleep for 100ms, default to eventsChanSize, or poll for a single event?

Contributor

Let's back up a bit; what is the original problem you were seeing prior to this fix?

Author

Well, considering you can configure the channel size to less than 1000, the current call in consumer.go (_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)) doesn't quite feel right. If I want to buffer 10,000 events, then surely that should be doable in a single call (rather than 10 calls as is the case now). Similarly, if I set the buffer to 10, it doesn't make sense to have eventPoll loop with a max of 1000 events. Sure, either the channel will be full, or there won't be any messages left to consume, but it feels/looks a bit odd to my eye.

Contributor

Yeah, I agree on the channelSize > pollCnt case: we should try to fetch as many events as possible to fill the channel buffer, but I think we should be more careful with the reverse where the channel buffer size is smaller than the poll count.
For performance we want to avoid idle periods, and I imagine that a small channelSize and pollCnt will result in idle periods when the application side is as fast as the Go client side:

  • Loop 1: Go client polls 100 entries from librdkafka and enqueues them on channel.
  • Loop 2: Go client calculates that pollCnt - len(chan) == 0 and backs off with a delay
  • Application reads all events from the channel
  • Idle period while loop 2's backoff delay times out.
  • Go to loop 1

Compare this to the current behaviour:

  • Loop 1: Go client polls 100 entries from librdkafka and enqueues them on channel.
  • Loop 2: Go client polls another 100 entries from librdkafka but blocks on the first channel produce since the channel is full.
  • Application reads all events from the channel
  • Go client immediately unblocks when channel space is available
  • Go to loop 1

There's probably a golden ratio where the pollCnt is some multiple of channelSize, but this depends on actual usage, so I feel the current approach of simply blocking provides the necessary and desired backpressure mechanism.
Having said that, there is no point in polling 1000 events if the channelSize is 2, so some reasonable multiplier should be in place.
Maybe something like: pollCnt = math.Max(channelSize * 3, 100)
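Spelled out in Go, that suggestion would need an int-valued max rather than math.Max (which operates on float64); a sketch along these lines:

// Illustrative only: clamp the poll count to a multiple of the channel
// size, with a floor of 100, as suggested above.
func pollCnt(channelSize int) int {
	if n := channelSize * 3; n > 100 {
		return n
	}
	return 100
}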

@edenhill closed this Feb 13, 2018
@lintang0 mentioned this pull request Jun 18, 2018