kafkaProducer.Events() stops sending callback events, then kafkaProducer.ProduceChannel() fills and becomes blocked indefinitely. #251

Closed
kurtostfeld opened this issue Nov 27, 2018 · 14 comments

@kurtostfeld

Description

Sporadic issue: the Kafka Go client works fine for days, then stops sending callback events to kafkaProducer.Events(); minutes later, kafkaProducer.ProduceChannel() becomes blocked and stops accepting new messages. The Go app will never send or accept more messages until it is manually restarted.

How to reproduce

It takes days of running in production for this issue to occur. It has happened multiple times, but I don't have an easy way to reproduce it.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): librdkafka1, version 0.11.6~1confluent5.0.1-
  • Apache Kafka broker version: Kafka 2.0.0. Confluent Platform 5.0.0.
  • Client configuration: &kafka.ConfigMap{"bootstrap.servers": strings.Join(bootstrapServers, ",")}
  • Operating system: Ubuntu 16.04.5
  • Provide client logs (with "debug": ".." as necessary): None.
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Interesting. And this is regardless of any error events in the cluster, such as a broker going down or partition leader becoming unavailable?

@kurtostfeld
Author

If there is an issue within the Kafka cluster, it only affects a single client server at a time, and is remedied by a manual restart.

There are multiple server instances running the exact same code sending messages to the same Kafka cluster. One will hit this issue and require a manual restart to resume sending to the Kafka cluster, while the others will not experience any hiccups and run completely smoothly.

@trtg

trtg commented Dec 5, 2018

@kurtostfeld have you had any luck debugging this issue?

@edenhill I am seeing a similar issue, except more accelerated: the producer stops working after a few hours of running. I have a sample bit of code that reproduces the issue consistently. It's just a minimal webserver that writes the body of incoming requests into a topic. After a few million messages (5-6 million) the producer stops sending messages. Memory consumption looks fine (I'm monitoring the container this code runs in and there is plenty of memory free). I don't think I'm leaking goroutines: when I check pprof, the number of goroutines is not growing, and in fact when the producer stops working there are only 14 goroutines present according to pprof.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	_ "net/http/pprof"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type KafkaProxy struct {
	kafkaHandle *kafka.Producer
}

func (kProxy *KafkaProxy) handler(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Println("problem reading body")
	}
	topic := "moments"
	kProxy.kafkaHandle.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(body),
	}, nil)
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka.internal"})
	if err != nil {
		panic(err)
	}
	proxy := &KafkaProxy{kafkaHandle: p}
	http.HandleFunc("/", proxy.handler)
	fmt.Println("Listening on port 8177")
	log.Fatal(http.ListenAndServe(":8177", nil))
	defer proxy.kafkaHandle.Close()
}

Here is a stack trace of the active goroutines:

goroutine profile: total 11
6 @ 0x433a3b 0x42ed79 0x42e426 0x4945fa 0x49470d 0x495459 0x580a5f 0x593bc8 0x67549c 0x53f406 0x53f55f 0x6770e7 0x67a36e 0x460ed1
#	0x42e425	internal/poll.runtime_pollWait+0x65	/usr/lib/go/src/runtime/netpoll.go:173
#	0x4945f9	internal/poll.(*pollDesc).wait+0x99	/usr/lib/go/src/internal/poll/fd_poll_runtime.go:85
#	0x49470c	internal/poll.(*pollDesc).waitRead+0x3c	/usr/lib/go/src/internal/poll/fd_poll_runtime.go:90
#	0x495458	internal/poll.(*FD).Read+0x178		/usr/lib/go/src/internal/poll/fd_unix.go:169
#	0x580a5e	net.(*netFD).Read+0x4e			/usr/lib/go/src/net/fd_unix.go:202
#	0x593bc7	net.(*conn).Read+0x67			/usr/lib/go/src/net/net.go:177
#	0x67549b	net/http.(*connReader).Read+0xfb	/usr/lib/go/src/net/http/server.go:786
#	0x53f405	bufio.(*Reader).fill+0x105		/usr/lib/go/src/bufio/bufio.go:100
#	0x53f55e	bufio.(*Reader).Peek+0x3e		/usr/lib/go/src/bufio/bufio.go:132
#	0x6770e6	net/http.(*conn).readRequest+0xb26	/usr/lib/go/src/net/http/server.go:963
#	0x67a36d	net/http.(*conn).serve+0x49d		/usr/lib/go/src/net/http/server.go:1788

1 @ 0x408e1e 0x70237e 0x71094d 0x708d08 0x70ee18 0x460ed1
#	0x70237d	github.com/confluentinc/confluent-kafka-go/kafka._Cfunc__rk_queue_poll+0x4d		_cgo_gotypes.go:540
#	0x71094c	github.com/confluentinc/confluent-kafka-go/kafka.(*handle).eventPoll.func1+0x11c	/home/sebortiz/.go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v0.11.6/kafka/event.go:156
#	0x708d07	github.com/confluentinc/confluent-kafka-go/kafka.(*handle).eventPoll+0x167		/home/sebortiz/.go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v0.11.6/kafka/event.go:156
#	0x70ee17	github.com/confluentinc/confluent-kafka-go/kafka.poller+0x77				/home/sebortiz/.go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v0.11.6/kafka/producer.go:540

1 @ 0x433a3b 0x42ed79 0x42e426 0x4945fa 0x49470d 0x496a00 0x5813a2 0x59da1e 0x59c457 0x67f72f 0x69f2ac 0x67e55f 0x67e216 0x67f204 0x714217 0x433647 0x460ed1
#	0x42e425	internal/poll.runtime_pollWait+0x65		/usr/lib/go/src/runtime/netpoll.go:173
#	0x4945f9	internal/poll.(*pollDesc).wait+0x99		/usr/lib/go/src/internal/poll/fd_poll_runtime.go:85
#	0x49470c	internal/poll.(*pollDesc).waitRead+0x3c		/usr/lib/go/src/internal/poll/fd_poll_runtime.go:90
#	0x4969ff	internal/poll.(*FD).Accept+0x19f		/usr/lib/go/src/internal/poll/fd_unix.go:384
#	0x5813a1	net.(*netFD).accept+0x41			/usr/lib/go/src/net/fd_unix.go:238
#	0x59da1d	net.(*TCPListener).accept+0x2d			/usr/lib/go/src/net/tcpsock_posix.go:139
#	0x59c456	net.(*TCPListener).AcceptTCP+0x46		/usr/lib/go/src/net/tcpsock.go:247
#	0x67f72e	net/http.tcpKeepAliveListener.Accept+0x2e	/usr/lib/go/src/net/http/server.go:3232
#	0x67e55e	net/http.(*Server).Serve+0x22e			/usr/lib/go/src/net/http/server.go:2826
#	0x67e215	net/http.(*Server).ListenAndServe+0xb5		/usr/lib/go/src/net/http/server.go:2764
#	0x67f203	net/http.ListenAndServe+0x73			/usr/lib/go/src/net/http/server.go:3004
#	0x714216	main.main+0x196					/home/sebortiz/kiip_toplevel/kafka_proxy/main.go:55
#	0x433646	runtime.main+0x206				/usr/lib/go/src/runtime/proc.go:201

1 @ 0x433a3b 0x433ae3 0x40bc5e 0x40b98b 0x70e6c2 0x460ed1
#	0x70e6c1	github.com/confluentinc/confluent-kafka-go/kafka.channelProducer+0x51	/home/sebortiz/.go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v0.11.6/kafka/producer.go:465

1 @ 0x480135 0x47de8a 0x47c879 0x4953f9 0x580a5f 0x593bc8 0x67501a 0x460ed1
#	0x480134	syscall.Syscall+0x4				/usr/lib/go/src/syscall/asm_linux_amd64.s:18
#	0x47de89	syscall.read+0x59				/usr/lib/go/src/syscall/zsyscall_linux_amd64.go:732
#	0x47c878	syscall.Read+0x48				/usr/lib/go/src/syscall/syscall_unix.go:172
#	0x4953f8	internal/poll.(*FD).Read+0x118			/usr/lib/go/src/internal/poll/fd_unix.go:165
#	0x580a5e	net.(*netFD).Read+0x4e				/usr/lib/go/src/net/fd_unix.go:202
#	0x593bc7	net.(*conn).Read+0x67				/usr/lib/go/src/net/net.go:177
#	0x675019	net/http.(*connReader).backgroundRead+0x59	/usr/lib/go/src/net/http/server.go:676

1 @ 0x6f8098 0x6f7ea0 0x6f4904 0x700b40 0x701413 0x67b4b4 0x67d147 0x67e0eb 0x67a516 0x460ed1
#	0x6f8097	runtime/pprof.writeRuntimeProfile+0x97	/usr/lib/go/src/runtime/pprof/pprof.go:707
#	0x6f7e9f	runtime/pprof.writeGoroutine+0x9f	/usr/lib/go/src/runtime/pprof/pprof.go:669
#	0x6f4903	runtime/pprof.(*Profile).WriteTo+0x3e3	/usr/lib/go/src/runtime/pprof/pprof.go:328
#	0x700b3f	net/http/pprof.handler.ServeHTTP+0x20f	/usr/lib/go/src/net/http/pprof/pprof.go:245
#	0x701412	net/http/pprof.Index+0x722		/usr/lib/go/src/net/http/pprof/pprof.go:268
#	0x67b4b3	net/http.HandlerFunc.ServeHTTP+0x43	/usr/lib/go/src/net/http/server.go:1964
#	0x67d146	net/http.(*ServeMux).ServeHTTP+0x126	/usr/lib/go/src/net/http/server.go:2361
#	0x67e0ea	net/http.serverHandler.ServeHTTP+0xaa	/usr/lib/go/src/net/http/server.go:2741
#	0x67a515	net/http.(*conn).serve+0x645		/usr/lib/go/src/net/http/server.go:1847

@edenhill
Contributor

edenhill commented Dec 6, 2018

@trtg You don't seem to read from the producer's Events channel, which causes it to fill up with delivery reports.
Either disable go.delivery.reports, or (preferably) add a go-routine to serve the Events channel.
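
(For reference, not part of the original comment.) A minimal sketch of the second option, reusing the p producer and the log import from the program posted above. Delivery reports arrive on p.Events() as *kafka.Message values, so something has to read them:

// Start this goroutine once, right after creating the producer, so the
// Events channel keeps draining and delivery reports cannot pile up.
go func() {
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case kafka.Error:
			log.Printf("producer error: %v", ev)
		}
	}
}()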

@trtg

trtg commented Dec 7, 2018

@edenhill thanks for the clarification, I didn't realize consuming the delivery reports was required. I'll rework things to do that. Just out of curiosity, however, where would go.delivery.reports be set? It's not mentioned in CONFIGURATION.md

@edenhill
Contributor

edenhill commented Dec 7, 2018

It is a Go-client-level config property; it is documented in the NewProducer docs.
You specify it like any other configuration property, though, in the ConfigMap.
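
(For illustration, not part of the original comment.) A sketch of setting the property in the ConfigMap passed to NewProducer; the broker address is a placeholder and the kafka/log imports are assumed from the earlier snippets:

// Disabling delivery reports means no per-message events are emitted on
// Events(), so that channel cannot fill up with them. The trade-off is that
// the application no longer learns whether individual messages were delivered.
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":   "kafka.internal",
	"go.delivery.reports": false,
})
if err != nil {
	log.Fatal(err)
}
defer p.Close()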

@kurtostfeld
Author

kurtostfeld commented Dec 23, 2018

I've modified my sending code to the following. This code will run for days to weeks before it hits the issue. When the ProduceChannel() fills up, Close() blocks and never returns.

In the following code, the first log message shows up but the second one does not. What I was hoping to do when this rare event occurs was simply close and reopen the Kafka producer. Unfortunately, Close() blocks, so that strategy doesn't work.

Is there anything else that I can try to resolve this issue?

	select {
	case p.kafkaProducer.ProduceChannel() <- &msg:
		// Success.
	default:
		// There seems to be a relatively rare bug in the Go Kafka client, where the ProduceChannel can fill up
		// and the client needs to be manually reset.
		log.Println("Kafka ProduceChannel() full. Resetting channel...")
		// fyi, this is a Prometheus Metric counter.
		kafkaClientResetsCounter.Inc()
		// Unfortunately, the code will hang here. This Close() will never complete.
		// The application needs to be manually restarted at this point.
		p.kafkaProducer.Close()
		p.kafkaProducer = nil
		log.Println("Reset channel complete.")
	}
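
(Not part of the original comment.) One way to keep the reset path itself from hanging is to bound the wait on Close() with a timeout. The sketch below is hypothetical (closeWithTimeout is not a library function, and the broker address is a placeholder): if Close() never returns, the producer's internal goroutines are still stuck, but at least the calling goroutine is freed so the application can create a fresh producer.

package main

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// closeWithTimeout waits for p.Close() to finish, but gives up after the
// given timeout so the caller is never blocked indefinitely.
func closeWithTimeout(p *kafka.Producer, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		p.Close()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka.internal"})
	if err != nil {
		log.Fatal(err)
	}
	if !closeWithTimeout(p, 30*time.Second) {
		log.Println("Close() did not return within 30s; abandoning old producer")
	}
}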

@kurtostfeld
Author

@edenhill, any comment on the above? Is there anything else that I can try? I guess I could start adding logs and breakpoints to the confluent-kafka-go code to debug into it?

@namitmr

namitmr commented Jan 7, 2019

@kurtostfeld we are facing a similar issue where we get the error local_queue_full (notably the librdkafka queue) and cannot produce any further. Is this particular issue specific to the 0.11.6 release, or did you face it on previous versions as well?
Note: as mentioned above, the issue is not easily reproducible.

@edenhill
Contributor

edenhill commented Jan 7, 2019

@kurtostfeld The channel producer will block on its internal produce() call until there is room in the send-queue, which unfortunately also blocks Close().
The queue is full either because the broker is not available (or can't keep up), or due to the delivery reports not being handled and thus filling up the delivery-report/Events channel.
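
(Not part of the original comment.) With the function-based Produce() API, a full local queue is returned as an error instead of blocking, so the caller can decide what to do. A minimal sketch, assuming p, msg, and the log import from the earlier snippets, and assuming the Events() channel is being drained elsewhere:

err := p.Produce(&msg, nil)
if err != nil {
	if kErr, ok := err.(kafka.Error); ok && kErr.Code() == kafka.ErrQueueFull {
		// The local queue is full: give librdkafka up to one second to drain
		// outstanding messages, then retry or drop the message.
		p.Flush(1000)
	} else {
		log.Printf("produce failed: %v", err)
	}
}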

@AlexJF

AlexJF commented Mar 4, 2019

Hi everyone,

We were apparently hitting the same issue after upgrading to librdkafka 0.11.6.

Over the course of 3 days, 5 of our 300 consumers/producers experienced an unexpected "hang" (no further processing was done and one core was at 100% CPU usage). Some of these producers were also complaining of full queues. This happened at apparently random times. If you grabbed a profile from one of these stalled apps, it would look like the following:

[pprof profile screenshot of a stalled process omitted]

We've tracked this down to these 2 librdkafka issues: edenhill/librdkafka#2108 and edenhill/librdkafka#2208 (see the links in the script below).

Given that the fixes are very simple and have already been merged into master but haven't been released in a stable version, we patched them into the 0.11.6 release, rebuilt librdkafka with them, and haven't had another stall for over 5 days now.

  cd /tmp
  curl -OL https://github.com/edenhill/librdkafka/archive/v0.11.6.tar.gz
  echo "9c0afb8b53779d968225edf1e79da48a162895ad557900f75e7978f65e642032  v0.11.6.tar.gz" | sha256sum -c -
  tar -xf v0.11.6.tar.gz -C /usr/local/src
  # TODO: Remove when updating to librdkafka 1.x.x
  # Backport important fixes for deadlock
  # https://github.com/edenhill/librdkafka/commit/672dbf42d2a0ab8694be160740dad48554178a7c
  # https://github.com/edenhill/librdkafka/commit/d62dd0e7f16cde184e7c809a01ed5aa06861b660
  # https://github.com/edenhill/librdkafka/issues/2108
  # https://github.com/edenhill/librdkafka/issues/2208
  curl -OL https://github.com/edenhill/librdkafka/commit/672dbf42d2a0ab8694be160740dad48554178a7c.patch
  echo "c88a090ea674d7197e7104f8a6aa28a855c37a38f5479d08464916fe0eda3e19  672dbf42d2a0ab8694be160740dad48554178a7c.patch" | sha256sum -c -
  curl -OL https://github.com/edenhill/librdkafka/commit/d62dd0e7f16cde184e7c809a01ed5aa06861b660.patch
  echo "4af86643b31f1c698f61029cec6eec0d54056696db79e724bd7f044f9e965a96  d62dd0e7f16cde184e7c809a01ed5aa06861b660.patch" | sha256sum -c -
  cd /usr/local/src/librdkafka-0.11.6
  # TODO: Remove when updating to librdkafka 1.x.x
  patch -p1 < /tmp/672dbf42d2a0ab8694be160740dad48554178a7c.patch
  patch -p1 < /tmp/d62dd0e7f16cde184e7c809a01ed5aa06861b660.patch
  ./configure --prefix=/usr
  make
  make check
  make install

@edenhill
Contributor

@AlexJF That's great! Thank you!

We'll have a new v1.0.0 release (with the fixes) of both librdkafka and confluent-kafka-go within a week or two.

@jpiper

jpiper commented Jul 17, 2019

FWIW I'm still seeing the issue in 1.0.0 and 1.1.0 on Alpine 3.10. Is there anything I can do to help debug this?

@saifat29

saifat29 commented Oct 4, 2019

I too have the exact same issue.
One core at 100% CPU usage and the rest at around 60%.
I use the following:
Broker - v2.3.0
librdkafka - v1.1.0
confluent-kafka-go - v1.1.0
