Async producer can overflow itself at high rate #805

Closed

bobrik opened this issue Jan 10, 2017 · 26 comments

@bobrik
Contributor

bobrik commented Jan 10, 2017

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: b1da175
Kafka Version: Irrelevant
Go Version: b1da175

Configuration

What configuration values are you using for Sarama and Kafka?

Sarama async producer:

	conf := sarama.NewConfig()
	conf.Metadata.Retry.Max = 1
	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
	conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
	conf.Producer.Timeout = 1 * time.Second
	conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
	conf.Producer.Flush.Bytes = 16 << 20 // 16MB
	conf.Producer.Flush.Frequency = time.Minute
	conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
	conf.Producer.Return.Errors = true
	conf.Producer.Partitioner = NewIdentityPartitioner
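For reference, a minimal sketch of what an "identity" partitioner like NewIdentityPartitioner above might look like (an illustrative sketch, not the actual implementation used here): it trusts the Partition field the caller already set on each message.

import "github.com/Shopify/sarama"

// identityPartitioner is a hypothetical sketch: it echoes back whatever
// partition was already set on the message instead of hashing the key.
// It assumes callers always set a valid partition number.
type identityPartitioner struct{}

func NewIdentityPartitioner(topic string) sarama.Partitioner {
	return identityPartitioner{}
}

func (identityPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return msg.Partition, nil
}

func (identityPartitioner) RequiresConsistency() bool {
	// The same message always maps to the same partition.
	return true
}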
Logs

Sarama logs:

2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.
Problem Description

The problem is in the brokerProducer run loop:

We produce messages at such a high rate (100K/s+) that the select always picks up new incoming messages instead of rolling over and flushing the existing buffer.

I applied the following patch:

diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index e7ae8c2..13a888b 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"log"
 	"sync"
 	"time"
 
@@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
 		}
 
 		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
+			log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
@@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
 	var output chan<- *produceSet
 	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
 
+	wasReadyTimes := 0
+
 	for {
 		select {
 		case msg := <-bp.input:
+			log.Println("INPUT MESSAGE")
 			if msg == nil {
 				bp.shutdown()
 				return
@@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
 				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
 			}
 		case <-bp.timer:
+			log.Println("TIMER FIRED")
 			bp.timerFired = true
 		case output <- bp.buffer:
+			wasReadyTimes = 0
+			log.Println("ROLL OVER")
 			bp.rollOver()
 		case response := <-bp.responses:
+			log.Println("HANDLING RESPONSE")
 			bp.handleResponse(response)
 		}
 
 		if bp.timerFired || bp.buffer.readyToFlush() {
+			log.Println("READY TO FLUSH YAY")
+			wasReadyTimes++
+			if wasReadyTimes > 10 {
+				log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
+			}
 			output = bp.output
 		} else {
 			output = nil
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 9fe5f79..91a127f 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -1,6 +1,9 @@
 package sarama
 
-import "time"
+import (
+	"log"
+	"time"
+)
 
 type partitionSet struct {
 	msgs        []*ProducerMessage
@@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
 		return true
 	// If we've passed the byte trigger-point
 	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
 		return true
 	default:
 		return false

The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.

Without log.Fatal I've seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.

The solution is probably to do rollOver outside of select.

@bobrik
Contributor Author

bobrik commented Jan 10, 2017

Ok, it looks like sarama.MaxRequestSize is 100MB by default (and it's a global setting), and this was preventing sarama from applying back pressure sooner. Setting it under message.max.bytes solves the issue for us.

Still, this doesn't seem right. Message size back pressure should be controlled by sarama.Config.Producer.MaxMessageBytes, not by sarama.MaxRequestSize.
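For concreteness, a minimal sketch of that workaround (30MB is illustrative; anything below the broker's message.max.bytes=33554432 should do):

import "github.com/Shopify/sarama"

func init() {
	// MaxRequestSize is a package-level global (100MB by default), not part
	// of sarama.Config, so this applies to every client in the process.
	// Capping it below the broker's message.max.bytes forces backpressure
	// (and a flush) before a request can grow past what the broker accepts.
	sarama.MaxRequestSize = 30 << 20
}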

@eapache
Contributor

eapache commented Jan 10, 2017

sarama.MaxRequestSize and sarama.Config.Producer.MaxMessageBytes are completely different values that control different aspects of Kafka.

sarama.Config.Producer.MaxMessageBytes is equivalent to message.max.bytes on the broker and refers to the maximum size of a single Kafka message; it has nothing to do with network wire size, except insofar as messages get put on the wire in some form. Messages that are too large are rejected immediately by Sarama before they even get counted towards backpressure (https://github.com/Shopify/sarama/blob/master/async_producer.go#L258-L260). After that point the value is only used in a few subtle cases to do with the way Kafka frames compressed messages.

sarama.MaxRequestSize is equivalent to max.request.size in the java version and pairs with socket.request.max.bytes on the broker, and refers to the maximum size of the network packet on the wire. Since you can't split a kafka message across network boundaries, this also serves as an indirect cap on the size of a kafka message, though usually it is several orders of magnitude larger and so irrelevant.
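To make the distinction concrete, a short sketch showing where each knob lives (the function name and values are illustrative, taken from this thread):

import "github.com/Shopify/sarama"

func configSketch() *sarama.Config {
	conf := sarama.NewConfig()

	// Per-message cap: pairs with message.max.bytes on the broker.
	conf.Producer.MaxMessageBytes = 16 << 20

	// Per-request (wire) cap: pairs with socket.request.max.bytes on the
	// broker. Note this is a package-level global, not a config field.
	sarama.MaxRequestSize = 100 * 1024 * 1024

	return conf
}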

@bobrik
Contributor Author

bobrik commented Jan 10, 2017

sarama.MaxRequestSize and sarama.Config.Producer.MaxMessageBytes are completely different values that control different aspects of Kafka.

There's no doubt about that.

sarama.Config.Producer.MaxMessageBytes is equivalent to message.max.bytes on the broker and refers to the maximum size of a kafka message

Yes, but the constraint of maximum message size gets violated, because no backpressure is applied when message size crosses this boundary. This is exactly the issue. I set sarama.Config.Producer.MaxMessageBytes to 16MB and get 40MB messages rejected by Kafka.

@eapache
Contributor

eapache commented Jan 11, 2017

the constraint of maximum message size gets violated, because no backpressure is applied when message size crosses this boundary

When not using compression (your ticket specifies sarama.CompressionNone), produce rate and backpressure have nothing to do with message size at all. I think the confusion here is the over-use of the word "message". We're talking about two different kinds of messages and getting them mixed up:

  • Kafka is a "message" broker; kafka messages are produced to a partition of a topic, and end up at a specific offset. Kafka messages cannot be larger than sarama.Config.Producer.MaxMessageBytes or message.max.bytes. Batching and backpressure are irrelevant when calculating this size. This is the constraint being violated according to the error you provided.

  • Network messages containing multiple kafka messages are sent between the producer and the brokers. Network messages cannot be larger than sarama.MaxRequestSize or socket.request.max.bytes. Batching and backpressure are important here; if a large enough request is accumulated we cannot accept more kafka messages until the current network message has been delivered.

@eapache
Contributor

eapache commented Jan 11, 2017

I notice that you have the following configuration values:

conf.Producer.MaxMessageBytes = 16 << 20 // on client
message.max.bytes=33554432 // on broker

Since these values are equivalent they are usually set to the same value.

@abraithwaite

abraithwaite commented Jan 11, 2017

Just getting into investigating this, but I believe what we're trying to say is that, because we have such a high rate of production on one topic-partition, the first case in the select is always being run and the following lines never get a chance to run:

https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L636-L637

Since those don't have a chance to run, we're never rolling over the buffer, and it grows too large.

@bobrik , please correct me if I'm wrong. Still looking into it more though.

I think the fact that setting sarama.MaxRequestSize under message.max.bytes makes it work is probably just a coincidence, so we'll revert that.

Also, I'm auditing our producers to ensure that they've all got MaxMessageBytes <= message.max.bytes on the broker. Thanks for that tip @eapache (I see now it's in the docs, but we must have overlooked it; as long-time users of the library, it was set ages ago 😉).

@eapache
Contributor

eapache commented Jan 11, 2017

If your produce rate is that high, you should still hit a hard block when wouldOverflow triggers (https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L619-L624) and the call to waitForSpace will force the rollover to happen.

@abraithwaite

Thanks. I'll update our patch to ensure we're hitting that condition and get back.

@bobrik
Contributor Author

bobrik commented Jan 11, 2017

@eapache what you say sounds good in theory, but it's just not how I see it working in real life.

Consider the following patch:

diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index e7ae8c2..27c2f10 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -2,6 +2,7 @@ package sarama

 import (
        "fmt"
+       "log"
        "sync"
        "time"

@@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
                }

                if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
+                       log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
                        p.returnError(msg, ErrMessageSizeTooLarge)
                        continue
                }
@@ -577,6 +579,8 @@ func (bp *brokerProducer) run() {
        var output chan<- *produceSet
        Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

+       maxMessageSize := 0
+
        for {
                select {
                case msg := <-bp.input:
@@ -585,6 +589,11 @@ func (bp *brokerProducer) run() {
                                return
                        }

+                       if msg.byteSize() > maxMessageSize {
+                               maxMessageSize = msg.byteSize()
+                               log.Printf("Biggest message seen so far: %d bytes", maxMessageSize)
+                       }
+
                        if msg.flags&syn == syn {
                                Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
                                        bp.broker.ID(), msg.Topic, msg.Partition)
@@ -610,6 +619,7 @@ func (bp *brokerProducer) run() {
                        }

                        if bp.buffer.wouldOverflow(msg) {
+                               log.Println("!!! OVERFLOW !!!")
                                if err := bp.waitForSpace(msg); err != nil {
                                        bp.parent.retryMessage(msg, err)
                                        continue
@@ -627,6 +637,7 @@ func (bp *brokerProducer) run() {
                case <-bp.timer:
                        bp.timerFired = true
                case output <- bp.buffer:
+                       log.Printf("Roll over, bufer size: %d", bp.buffer.bufferBytes)
                        bp.rollOver()
                case response := <-bp.responses:
                        bp.handleResponse(response)
@@ -634,6 +645,7 @@ func (bp *brokerProducer) run() {

                if bp.timerFired || bp.buffer.readyToFlush() {
                        output = bp.output
+
                } else {
                        output = nil
                }
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 9fe5f79..4886032 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -147,6 +147,7 @@ func (ps *produceSet) readyToFlush() bool {
                return true
        // If we've passed the byte trigger-point
        case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+               // log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
                return true
        default:
                return false

The config looks like this:

	conf.Producer.MaxMessageBytes = 33554432
	conf.Producer.Flush.Bytes = 16 << 20

Let's look at the output:

2017/01/11 21:23:37 Biggest message seen so far: 26 bytes
2017/01/11 21:23:37 Biggest message seen so far: 1174 bytes
2017/01/11 21:23:37 Biggest message seen so far: 1334 bytes
2017/01/11 21:23:37 Biggest message seen so far: 1382 bytes
2017/01/11 21:23:37 Biggest message seen so far: 1510 bytes
2017/01/11 21:23:37 Biggest message seen so far: 1630 bytes
2017/01/11 21:23:37 Biggest message seen so far: 2094 bytes
2017/01/11 21:23:37 Biggest message seen so far: 2430 bytes
2017/01/11 21:23:37 Biggest message seen so far: 4510 bytes
2017/01/11 21:23:37 Biggest message seen so far: 4606 bytes
2017/01/11 21:23:37 Biggest message seen so far: 4990 bytes
2017/01/11 21:23:37 Biggest message seen so far: 5414 bytes
2017/01/11 21:23:37 Biggest message seen so far: 6414 bytes
2017/01/11 21:23:37 Biggest message seen so far: 6814 bytes
2017/01/11 21:23:37 Roll over, bufer size: 16778270
2017/01/11 21:23:37 Biggest message seen so far: 7526 bytes
2017/01/11 21:23:37 Biggest message seen so far: 7718 bytes
2017/01/11 21:23:37 Roll over, bufer size: 46134566
2017/01/11 21:23:37 Biggest message seen so far: 10918 bytes
2017/01/11 21:23:37 Biggest message seen so far: 11854 bytes
2017/01/11 21:23:37 !!! OVERFLOW !!!
2017/01/11 21:23:37 Biggest message seen so far: 20406 bytes
2017/01/11 21:23:37 !!! OVERFLOW !!!
panic: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.

goroutine 86 [running]:
panic(0x6c4cc0, 0xc423cca4e0)
	/usr/local/Cellar/go/1.7.4_1/libexec/src/runtime/panic.go:500 +0x1a1
code.cfops.it/data/replicator.errLoop(0xc4200a6a20)
	/code/replicator/replicator.go:122 +0x76
created by code.cfops.it/data/replicator.New
	/code/replicator/replicator.go:63 +0x3e2

So my biggest message was around 20KB, but Kafka still complains about messages being too large.

Limiting sarama.MaxRequestSize to 30MB (just under message.max.bytes) makes it work.

@abraithwaite the select running just for incoming messages is in fact fine; there is code for backpressure, it just doesn't engage.
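Put differently, for an uncompressed producer the only byte-based trigger for backpressure is the request-size check, roughly (a paraphrased sketch, not the library's exact code):

// wouldOverflowSketch paraphrases the check being discussed: the buffered
// produce set only refuses new messages once it approaches MaxRequestSize,
// which is why Producer.MaxMessageBytes never comes into play here when
// messages are sent uncompressed.
func wouldOverflowSketch(bufferedBytes, nextMsgBytes int, maxRequestSize int32) bool {
	return bufferedBytes+nextMsgBytes >= int(maxRequestSize)
}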

@eapache
Contributor

eapache commented Jan 11, 2017

Can you provide the config of the kafka broker please?

@eapache
Contributor

eapache commented Jan 11, 2017

Also, to verify, this is all running still with conf.Producer.Compression = sarama.CompressionNone right?

@bobrik
Contributor Author

bobrik commented Jan 11, 2017

Still running with sarama.CompressionNone.

Broker config:

broker.id.generation.enable=true

zookeeper.connect=zk.foo.bar.com:2181/kafka/http
zookeeper.connection.timeout.ms=30000

advertised.host.name=foo.bar.com
port=9092

controlled.shutdown.enable=true
auto.create.topics.enable=false
default.replication.factor=3
auto.leader.rebalance.enable=True
delete.topic.enable=true
message.max.bytes=33554432
num.io.threads=4
num.network.threads=16
num.partitions=3

log.dirs=/state/kafka/http
log.cleaner.enable=false
log.flush.interval.messages=10000000
log.flush.interval.ms=10000
log.retention.check.interval.ms=60000
log.retention.hours=6
log.segment.bytes=2000000000

num.replica.fetchers=4
replica.fetch.max.bytes=33554432
replica.fetch.wait.max.ms=500
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=1048576
replica.socket.timeout.ms=30000
replica.fetch.min.bytes=65535

socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
socket.send.buffer.bytes=1048576
log.message.format.version=0.9.0.1
broker.rack=218

@eapache
Contributor

eapache commented Jan 11, 2017

Are any of those config values overridden for the specific topic you're using (https://kafka.apache.org/documentation/#topic-config)?

@bobrik
Contributor Author

bobrik commented Jan 11, 2017

@eapache we don't have any topic level overrides:

Topic:requests	PartitionCount:84	ReplicationFactor:3	Configs:

@eapache
Contributor

eapache commented Jan 11, 2017

I'm going to "think aloud". A few things don't line up:

  1. The largest message we're actually seeing (per your logs) is ~20KB but the maximum message size is configured to be much larger both on the client and the broker.
  2. The broker is returning MESSAGE_TOO_LARGE.
  3. Limiting the maximum request size (not the maximum message size) makes the MESSAGE_TOO_LARGE error go away.

1 is inconsistent with 2, since the broker should only return that error when the message we're sending is larger than the configured maximum.

2 is inconsistent with 3, since the maximum request size and the maximum message size have nothing to do with each other.

Also worth noting: there's nothing unusual about the config here, and while the message rate is high, Sarama has been benchmarked and stress-tested fairly thoroughly without anybody else reporting this issue.


At this point I can't even generate a consistent hypothesis which explains all of the facts. So let's get some more data and see if something pops:

  • Is there anything interesting in the broker logs when this occurs? A stack trace, or a log indicating the too-large message, or something?
  • Does anything happen differently if you change producerMessageOverhead from 26 to 34?

@eapache
Copy link
Contributor

eapache commented Jan 11, 2017

Also worth noting in the interest of completeness: the SHA you're running is a fairly old version at this point, but I don't think there have been any changes since that version which would affect this particular issue. Upgrading might be something to try; if nothing else it would give us another data point.

@eapache
Contributor

eapache commented Jan 11, 2017

And in the interest of "more data won't hurt", could you provide the standard Sarama logs for one of these issues as well? You've provided some output of your custom logging statements, but the defaults (the ones going to sarama.Logger) have been fairly well-tuned for debugging by this point.

@abraithwaite

abraithwaite commented Jan 11, 2017

We'll work on getting you some more information on this, but I do have one clarification question to ask regarding how the flush timing is handled:

If we check readyToFlush here:

https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L642

And we flush here:

https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L636-L637

The producer.Flush settings are described as best effort, but this seems a bit extreme. Why wouldn't we start putting back pressure earlier?

Seems like part of the reason we're seeing issues is that the Producer.Flush settings don't do much under high input message throughput. In my mind, sarama.MaxRequestSize should still do what it does (last resort), producer.Flush.{Bytes,Messages} should take next priority and flush directly after the threshold is crossed, and producer.Flush.Frequency should handle the trickle case, so it is really the only one that should be considered best effort.

Gonna investigate how the other clients handle this though.

Edit: it seems like the Java producer sends immediately upon crossing the batch.size threshold, which I believe is analogous to producer.Flush.Messages.

@bobrik
Contributor Author

bobrik commented Jan 11, 2017

This is an experimental cluster and I tested compression on brokers at some point. I enabled debug logs on one broker with compression enabled and here's what I found:

2017-01-11T23:43:01.253 36ssds3 DEBUG [KafkaApi-1005] Produce request with correlation id 1 from client  on partition requests-38 failed due to org.apache.kafka.common.errors.RecordTooLargeException (kafka.server.KafkaApis)

Disabling compression.type on the broker solves the issue. It's still not clear to me why that happens.

@bobrik
Contributor Author

bobrik commented Jan 12, 2017

I guess we have this: https://issues.apache.org/jira/browse/KAFKA-1718

@eapache
Contributor

eapache commented Jan 12, 2017

The producer.Flush settings are described as best effort, but this seems a bit extreme. Why wouldn't we start putting back pressure earlier?

The specific scenario in question is when:

  • a produce request is already in flight, so we can't send another (tcp pipelining would be a nice enhancement but is not on my roadmap for the near future)
  • we have enough new messages/bytes accumulated to trigger a flush condition, but not enough to max out the connection
  • we receive another message to process

Right now we just accept the message, since we can safely handle it. We could in principle start applying backpressure at this point, but all that would accomplish would be to artificially limit throughput, especially since the flush trigger and the maximum supported size are usually substantially different.

the Producer.Flush settings don't do much with high input message throughput

Except for Flush.MaxMessages (which is a hard limit), Sarama will happily ignore the flush settings if respecting them would negatively affect throughput.

producer.Flush.{Bytes,Messages} should take next priority and flush directly after the threshold crossing

If they behaved this way, Flush.Bytes would be effectively equivalent to sarama.MaxRequestSize and Flush.Messages would be exactly equivalent to Flush.MaxMessages (tangent: yes, I wish I could go back in time and make MaxRequestSize part of the config instead of a global, but the API is pretty much unchangeable now).

Honestly, those two config values (Flush.{Bytes, Messages}) are not usually useful in practice. Sarama's default as-fast-as-possible config is the best choice in almost all circumstances, the harder limits exist for those who need them, and Flush.Frequency exists, as you correctly point out, for the trickle case. If I ever did a v2 with API breaks I'd be half-tempted to remove them just to avoid confusion.
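For reference, the flush-related knobs being discussed, gathered in one place (illustrative values, not recommendations):

import (
	"time"

	"github.com/Shopify/sarama"
)

func flushSketch() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Producer.Flush.Bytes = 16 << 20        // best effort: try to flush once this many bytes are buffered
	conf.Producer.Flush.Messages = 0            // best effort: try to flush once this many messages are buffered (0 = off)
	conf.Producer.Flush.MaxMessages = 0         // hard limit: never batch more than this many messages (0 = unlimited)
	conf.Producer.Flush.Frequency = time.Minute // time-based flush for the trickle case
	return conf
}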

@eapache
Contributor

eapache commented Jan 12, 2017

I guess we have this: https://issues.apache.org/jira/browse/KAFKA-1718

Yes, you have a new variant of that issue which seems to be caused by the broker doing compression even when the client sends uncompressed messages. Since the client is not compressing the messages at all, it has no way to even know to work around the issue. It may be worth filing another upstream ticket for the new bug and referencing the old one, since the new one is more problematic and harder to magically work around for the client.

For such high-throughput scenarios you probably want to enable client-side compression anyway, but if that doesn't work for you then setting sarama.MaxRequestSize to message.max.bytes will (as you have already discovered) work around the issue at the cost of slightly reduced maximum throughput.
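A sketch of the first suggestion (the codec is an assumption; any client-side codec serves the purpose):

import "github.com/Shopify/sarama"

func compressionSketch() *sarama.Config {
	conf := sarama.NewConfig()
	// Compress on the client so the client itself can account for the size
	// of the compressed batch, instead of handing the broker an uncompressed
	// batch that its compression.type setting turns into one oversized message.
	conf.Producer.Compression = sarama.CompressionSnappy
	return conf
}

The alternative, as noted, is capping sarama.MaxRequestSize at or below the broker's message.max.bytes.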

@eapache
Contributor

eapache commented Jan 12, 2017

Closing as this is an upstream bug.

I'm happy to keep discussing flush logic though if there are still questions around that.

@eapache eapache closed this as completed Jan 12, 2017
@abraithwaite

abraithwaite commented Jan 12, 2017

Thanks for the thoughtful response, although I have one quick follow-up question:

How can I identify whether a produce request is in-flight in the code we've highlighted? I'm assuming that output <- bp.buffer blocks in that case, is that correct?

It'd be nice to try (nonblocking) output <- bp.buffer once after readyToFlush() triggers, and if it's not blocked, go for it.

As @bobrik mentioned, we found our issue after all, so I guess this can be closed.

Thanks for all your help!

Edit: you beat me to it.

@eapache
Contributor

eapache commented Jan 12, 2017

How can I identify whether a produce request is in-flight in the code we've highlighted? I'm assuming that output <- bp.buffer blocks in that case, is that correct?

Yup. There is a tiny bridge goroutine (https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L542-L553) which ranges over output, making one request at a time and dumping the response into responses. A request is effectively in flight from the time you've sent the buffer to output to the time you get a response back on responses, and writing to output will block during that time frame.
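As a generic illustration of that pattern (names and types here are made up, not the library's):

type produceRequest struct{ /* batched messages */ }
type produceResponse struct{ /* broker reply */ }

// bridge ranges over output, performing exactly one blocking request at a
// time and dumping each reply into responses. While a request is in flight,
// a send into output blocks, which is the window described above.
func bridge(output <-chan *produceRequest, responses chan<- *produceResponse,
	send func(*produceRequest) *produceResponse) {
	for req := range output {
		responses <- send(req)
	}
	close(responses)
}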

It'd be nice to try (nonblocking) output <- bp.buffer once after readyToFlush() triggers, and if it's not blocked, go for it.

This isn't needed, and I think stems from an earlier misunderstanding. You wrote:

because we have such a high rate of production on one topic-partition, the first statement in the select is always being run and the following lines don't have a chance to run

Which is simply not true. Per https://golang.org/ref/spec#Select_statements, if multiple cases are ready, one is chosen at random. The fact that the input case is at the top will not starve the output in high-throughput scenarios.
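A tiny self-contained demonstration of that behaviour (nothing Sarama-specific):

package main

import "fmt"

func main() {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)
	counts := map[string]int{}

	for i := 0; i < 100000; i++ {
		a <- struct{}{}
		b <- struct{}{}
		// Both cases are ready; the spec says one is chosen at random,
		// so neither case can starve the other.
		select {
		case <-a:
			counts["a"]++
			<-b
		case <-b:
			counts["b"]++
			<-a
		}
	}
	fmt.Println(counts) // the two counters come out roughly equal
}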

@abraithwaite

Ah, that's what I thought the behavior was, but then when I went to the docs:

For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement.

And then stopped reading, confusing myself. Looks like I need to work on my RTFM skills a bit more again.

Thanks again for all the help.
