Producer stops publishing messages after a few hours #377

Closed
4 of 7 tasks
natemurthy opened this issue Oct 1, 2019 · 13 comments

@natemurthy

natemurthy commented Oct 1, 2019

Description

I have a Kafka client using the confluent-kafka-go library. The client streams batches of messages from another I/O library and sends them to a Kafka topic on a separate goroutine. After several hours, the Kafka producer appears to stop publishing even though the client observes the messages as sent.

The screenshot below shows what the client sees. We increment a counter each time (*kafka.Producer).Produce() is called and derive a 1-minute moving average of message publication attempts per second:

[screenshot: client-side graph of Produce() attempts per second]

You can see this client producing at a steady state of roughly 24 msg/sec. The next screenshot shows the JMX metrics exported from the Kafka broker for the topic to which the above messages should be written:

[screenshot: broker-side graph of JMX messages-in per second for the topic]

You can observe that the messages-in rate begins to taper off around 17:15 and goes completely to zero after 17:45, even though the client continues to publish at a steady rate.

What's strange is that if I pass a non-nil delivery channel to (*kafka.Producer).Produce() and block on it, my messages do get through, but this comes at a huge throughput penalty, as these produce calls need to be asynchronous.

How to reproduce

Implement a loop that calls kp.Produce (where kp is a *kafka.Producer) on a separate goroutine like so:

func handler(stream xyz.Stream) {
    for {
        batch, _ := stream.Recv() // error intentionally ignored in this repro
        go publish(batch)
    }
}

func publish(batch []kafka.Message) {
    for i := range batch {
        metric.KafkaProduceAttempts.Inc()
        kp.Produce(&batch[i], nil) // Produce takes a *kafka.Message
    }
}

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version: 1.1.0
  • Apache Kafka broker version: 2.1.1
  • Client configuration: ConfigMap{"go.events.channel.size": 0, "go.produce.channel.size": 0}
  • Operating system: Linux Debian Stretch (slim)
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented Oct 2, 2019

Thanks for a great report!

How are you handling the delivery reports? The Events() channel needs to be served at all times to avoid filling up and blocking further produces.

Your code snippet does not handle Produce() errors. Is your real code doing that, and if so, are you seeing any errors when the produce rate drops?

@natemurthy
Author

natemurthy commented Oct 2, 2019

@edenhill you bet :) just want to make your job easy

The current code does not handle delivery reports (I think there's a way of disabling them using "go.delivery.reports": false, correct?); the application just needs fire-and-forget semantics.

And good observation: the snippet above does not handle Produce() errors, but we are checking for err != nil and logging them in our actual production code. Mysteriously enough, we do not see any errors when the produce rate drops.
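For reference, the fire-and-forget configuration being discussed would look roughly like this sketch (the broker address is a placeholder and error handling is elided; as noted elsewhere in this thread, disabling delivery reports is not recommended):

```go
p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers":   "broker:9092", // placeholder address
    "go.delivery.reports": false,         // suppress per-message delivery events
})
```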

@edenhill
Contributor

edenhill commented Oct 2, 2019

Yes, delivery reports may be disabled, but it is not recommended: an application should care whether its messages are produced, if nothing else to catch configuration errors.

@edenhill
Contributor

edenhill commented Oct 2, 2019

To troubleshoot this issue I recommend doing two things:

  • enable delivery reports and serve the Events() channel continuously, propagating any delivery errors appropriately (logging or otherwise)
  • check Produce() for errors.

@natemurthy
Author

natemurthy commented Oct 3, 2019

Is selecting over the Events() channel equivalent to selecting over the deliveryChan if passed into Produce()?

Another reason I'm discarding delivery reports is some complexity I haven't managed to get around when synchronizing the producer's event poller with the goroutine selecting from the delivery channel; closing the channel crashes the application like so:

panic: send on closed channel

goroutine 308 [running]:
REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka.(*handle).eventPoll(0xc001f10f90, 0xc001fc32c0, 0x0, 0x3e8, 0xc001fc3380, 0x0, 0x0, 0xc0021ff200)
	/go/src/REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/event.go:296 +0x56e
REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka.poller(0xc001f10f80, 0xc001fc3380)
	/go/src/REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:548 +0x75
created by REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka.NewProducer
	/go/src/REDACTED/xyz/my-app-name/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:458 +0x53c

This occurs if the snippet above is modified like so:

func handler(stream xyz.Stream) {
    deliveryChan := make(chan kafka.Event)
    defer close(deliveryChan) // racy: produces may still be in flight

    go func() {
        for {
            select {
            case event := <-deliveryChan:
                switch ev := event.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        // log the error
                    }
                    stream.Send(response(ev)) // generate a response from the kafka message
                }
            case <-stream.Context().Done():
                kp.Flush(100)
                return
            }
        }
    }()

    for {
        batch, err := stream.Recv()
        if err != nil {
            return // triggers the deferred close while publishes may still run
        }
        go publish(batch, deliveryChan)
    }
}

func publish(batch []kafka.Message, ch chan kafka.Event) {
    for i := range batch {
        metric.KafkaProduceAttempts.Inc()
        kp.Produce(&batch[i], ch)
    }
}

I suspect there is a race between closing the delivery channel and the producer's underlying channel writing operations.

@edenhill
Contributor

edenhill commented Oct 7, 2019

Right, you will need to wait for the delivery reports of all outstanding messages on the deliveryChan before closing it, since there is no way for the Go client to know whether your channel has been closed.

@edenhill
Contributor

edenhill commented Oct 7, 2019

If you don't pass a deliveryChan to Produce() the delivery reports will be emitted on the Events() channel instead. The Events() channel may also return other event types, such as Error.

@natemurthy
Author

natemurthy commented Oct 7, 2019

Right, you will need to wait for the delivery reports of all outstanding messages on the deliveryChan before closing it, since there is no way for the Go client to know whether your channel has been closed.

Is there a goroutine-safe way to know whether all outstanding messages on the deliveryChan have been delivered (I could check len(deliveryChan)), OR to block any goroutines from writing to it before calling close()?

Update: I've been able to run my producer with go.delivery.reports: false for the last 4 days without it stopping. So I'd be fine closing out this issue as a duplicate of #251.

@edenhill
Contributor

edenhill commented Oct 9, 2019

You would have to keep track of the number of Produce() calls using the deliveryChan, since there is a 1:1 relation between Produce() calls and Message events on the channel.
len() or similar will not be safe: it will give you 0 while a message is in flight to the broker, and by the time the ack is received and the client tries to send the Message event on the channel, the channel might already be closed.

@edenhill
Contributor

edenhill commented Oct 9, 2019

We generally do not recommend disabling delivery reports, since you will have no indication of whether messages are produced; a simple configuration error could mean all your messages are lost.

@natemurthy
Author

natemurthy commented Oct 9, 2019

Will add some logic to make sure Produce() and deliveryChan are properly synchronized based on that 1:1 relation in places where we need the reports.

For this application we're using acks=0, so we can tolerate some data loss; these are non-critical events, and we just need high throughput.

Thanks for your help walking through this with me :) Closing this out.

@edenhill
Contributor

nit: 'acks=0' means you will tolerate complete data loss without any indication of error. You will always want at least acks=1 (but preferably acks=all).
