Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush should not wait for queue buffering time #1013

Closed
4 of 7 tasks
jdemeyer opened this issue Jun 22, 2023 · 2 comments · Fixed by #1021
Closed
4 of 7 tasks

Flush should not wait for queue buffering time #1013

jdemeyer opened this issue Jun 22, 2023 · 2 comments · Fixed by #1021
Assignees
Labels

Comments

@jdemeyer
Copy link
Contributor

Description

This is basically confluentinc/librdkafka#3489 for confluent-kakfa-go.

Flush() should ignore queue.buffering.max.ms and send queued messages immediately. The Go wrapper manually implements Producer.Flush() instead of using rd_kafka_flush, so it didn't benefit from the above fix.

How to reproduce

package main

import (
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "time"
)

func main() {
        _, version := kafka.LibraryVersion()
        fmt.Printf("librdkafka %s\n", version)

        mockCluster, err := kafka.NewMockCluster(2)
        if err != nil {
                panic(err)
        }

        cfg := kafka.ConfigMap{
                "bootstrap.servers":      mockCluster.BootstrapServers(),
                "queue.buffering.max.ms": 1000,
        }
        p, err := kafka.NewProducer(&cfg)
        if err != nil {
                panic(err)
        }

        topic := "test-topic"
        msg := kafka.Message{
                TopicPartition: kafka.TopicPartition{
                        Topic: &topic,
                },
        }
        err = p.Produce(&msg, nil)
        if err != nil {
                panic(err)
        }

        // Consume all producer events
        go func() {
                for range p.Events() {
                }
        }()

        startTime := time.Now()
        n := p.Flush(2000)
        elapsed := time.Since(startTime)
        fmt.Printf("flush took %.3fs, %d messages remaining\n", elapsed.Seconds(), n)
}

This outputs something like

flush took 1.102s, 0 messages remaining

indicating that it's waited for the queue.buffering.max.ms time to pass before queued messages were actually flushed.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 2.1.1
  • Apache Kafka broker version: librdkafka mock cluster
  • Client configuration: see code above, use a large value for queue.buffering.max.ms
  • Operating system: GNU/Linux
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@jdemeyer
Copy link
Contributor Author

My actual use case is with a transactional producer: my application requires a mutex lock while committing a transaction (because it has to also update some state in the application). And to avoid the mutex being locked for too long, I call Flush() first without the mutex.

@milindl
Copy link
Contributor

milindl commented Jul 7, 2023

Thank you for reporting this @jdemeyer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants