sarama does not seem to re-connect approriately #133

Closed
sclasen opened this issue Jul 9, 2014 · 7 comments · Fixed by #134

sclasen commented Jul 9, 2014

Since #65 was closed, I was hoping the reconnect logic would work, but it doesn't appear to.

Using commit c761d93, I am continuously doing synchronous sends to a single Kafka broker.

I kill and restart the broker and sarama does not reconnect. The fix for #65 solved the deadlock issue I saw, but now I get an infinite stream of

kafka: Dropped 1 messages: write tcp 127.0.0.1:9092: broken pipe

in my log.

Is there any special configuration necessary to get sarama to reconnect? Here is the code that creates the client and producer:

    log.Println("go=kafka at=new-kafka-deliver")
    clientConfig := &sarama.ClientConfig{
        MetadataRetries: 10,
        WaitForElection: 10 * time.Second,
    }
    producerConfig := &sarama.ProducerConfig{
        Partitioner:      sarama.NewRandomPartitioner(),
        RequiredAcks:     sarama.WaitForLocal,
        Timeout:          5 * time.Second,
        Compression:      sarama.CompressionNone,
        MaxBufferedBytes: uint32(1000),
        MaxBufferTime:    1 * time.Second,
    }

    client, err := sarama.NewClient(clientId, brokerList, clientConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-client")

    producer, err := sarama.NewProducer(client, producerConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-producer")
@sclasen sclasen changed the title sarama does not seem to re-connect aproriately sarama does not seem to re-connect approriately Jul 9, 2014
Contributor

eapache commented Jul 9, 2014

Hmph, no, that should work as-is. I will investigate.

Contributor

eapache commented Jul 9, 2014

Note that you should typically use the NewConfig methods (NewClientConfig, NewProducerConfig, etc) to generate your starting config, but that doesn't appear to be the issue here - your config values look OK (though 10 seconds for election is unusually high).

eapache added a commit that referenced this issue Jul 9, 2014
Specifically, in the cases that indicate it is probably unreachable or
something. Otherwise we never reconnect, even if it comes back up.

Fixes #133
Contributor

eapache commented Jul 9, 2014

With the linked branch, it takes 100 seconds (retries * wait, here 10 * 10s) to time out, but then it eventually reconnects. I suggest lowering those values somewhat. The defaults are 3 retries with 250ms in between.

Contributor

eapache commented Jul 9, 2014

Also, I feel like I should apologize. I'm really not sure how that line was missed (nor how we've somehow avoided all of these issues so far...)

Author

sclasen commented Jul 9, 2014

So I grabbed c2d008d and tried it out. Somewhat better, I think: sarama definitely notices when the broker comes back, but it is still not resuming sending.

I am also now using the stock NewClientConfig() and NewBrokerConfig().

Here is the log output after the broker is killed:

2014/07/09 11:11:00 go=deliver num=0 at=send-error error=kafka: Dropped 1 messages: kafka: broker: not connected
2014/07/09 11:11:00 go=deliver num=7 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:00 go=deliver num=3 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:00 go=deliver num=4 at=send-error error=kafka: Dropped 1 messages: kafka: broker: not connected
2014/07/09 11:11:00 go=deliver num=2 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:00 go=deliver num=6 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:00 go=deliver num=1 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:00 go=deliver num=5 at=send-error error=kafka: Dropped 1 messages: EOF
2014/07/09 11:11:01 go=deliver num=1 at=send-error error=kafka: Client has run out of available brokers to talk to. Is your cluster reachable?
2014/07/09 11:11:01 go=deliver num=2 at=send-error error=kafka: Client has run out of available brokers to talk to. Is your cluster reachable?
2014/07/09 11:11:01 go=deliver num=3 at=send-error error=kafka: Client has run out of available brokers to talk to. Is your cluster reachable?
2014/07/09 11:11:01 go=deliver num=5 at=send-error error=kafka: Client has run out of available brokers to talk to. Is your cluster reachable?
2014/07/09 11:11:01 go=deliver num=0 at=send-error error=kafka: Client has run out of available brokers to talk to. Is your cluster reachable?

I restart Kafka and the error messages stop, but it seems there may be another deadlock, as messages are not being delivered after the restart. Here are the goroutines:


goroutine profile: total 31
1 @ 0x177460 0x177259 0x1742c4 0x601af 0x602fc 0x710d0 0x72573 0x72d7e 0x70d27 0x1ff00
#   0x177460    runtime/pprof.writeRuntimeProfile+0xa0  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/pprof/pprof.go:540
#   0x177259    runtime/pprof.writeGoroutine+0x89   /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/pprof/pprof.go:502
#   0x1742c4    runtime/pprof.(*Profile).WriteTo+0xb4   /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/pprof/pprof.go:229
#   0x601af     net/http/pprof.handler.ServeHTTP+0x25f  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/pprof/pprof.go:165
#   0x602fc     net/http/pprof.Index+0x13c      /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/pprof/pprof.go:177
#   0x710d0     net/http.HandlerFunc.ServeHTTP+0x40 /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1220
#   0x72573     net/http.(*ServeMux).ServeHTTP+0x163    /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1496
#   0x72d7e     net/http.serverHandler.ServeHTTP+0x16e  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1597
#   0x70d27     net/http.(*conn).serve+0x7b7        /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1167

1 @ 0x1fd36 0xc69b 0xc7c8 0x3063 0x1d9cf 0x1ff00
#   0x3063  main.main+0x703     /Users/sclasen/code/src/github.com/heroku/event-shuttle/event-shuttle.go:64
#   0x1d9cf runtime.main+0x11f  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/proc.c:220

1 @ 0x13901 0x1a263 0x1ff00
#   0x13901 runtime.notetsleepg+0x71    /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/lock_sema.c:254
#   0x1a263 runtime.MHeap_Scavenger+0xa3    /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/mheap.c:463

1 @ 0x1fd36 0xbe76 0xc77c 0x59db 0x1ff00
#   0x59db  main.(*Store).readEvents+0x28b  /Users/sclasen/code/src/github.com/heroku/event-shuttle/store.go:166

1 @ 0x13901 0x2b362 0x4c70e 0x1ff00
#   0x4c70e os/signal.loop+0x1e /usr/local/Cellar/go/1.2.2/libexec/src/pkg/os/signal/signal_unix.go:21

1 @ 0x1fd36 0x2a506 0x29e6a 0x11f084 0x11f0e0 0x1221f2 0x135117 0x1351e7 0x72f01 0x72e50 0x7317d 0x6e20 0x1ff00
#   0x2a506     netpollblock+0xa6           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:280
#   0x29e6a     net.runtime_pollWait+0x6a       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:116
#   0x11f084    net.(*pollDesc).Wait+0x34       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:81
#   0x11f0e0    net.(*pollDesc).WaitRead+0x30       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:86
#   0x1221f2    net.(*netFD).accept+0x2c2       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_unix.go:382
#   0x135117    net.(*TCPListener).AcceptTCP+0x47   /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/tcpsock_posix.go:233
#   0x1351e7    net.(*TCPListener).Accept+0x27      /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/tcpsock_posix.go:243
#   0x72f01     net/http.(*Server).Serve+0x91       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1622
#   0x72e50     net/http.(*Server).ListenAndServe+0xa0  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1612
#   0x7317d     net/http.ListenAndServe+0x6d        /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1677
#   0x6e20      main.func·003+0x40         /Users/sclasen/code/src/github.com/heroku/event-shuttle/event-shuttle.go:27

1 @ 0x1ff00

1 @ 0x1fd36 0xbe76 0xc77c 0x5eb5 0x1ff00
#   0x5eb5  main.(*Store).cleanEvents+0x3e5 /Users/sclasen/code/src/github.com/heroku/event-shuttle/store.go:204

1 @ 0x544c5 0x52b04 0x493d0 0xe4507 0xf4016 0xf2ce4 0xeb757 0x61cc 0x551f 0x1ff00
#   0x544c5 syscall.Syscall+0x5             /usr/local/Cellar/go/1.2.2/libexec/src/pkg/syscall/asm_darwin_amd64.s:17
#   0x52b04 syscall.Fsync+0x54              /usr/local/Cellar/go/1.2.2/libexec/src/pkg/syscall/zsyscall_darwin_amd64.go:533
#   0x493d0 os.(*File).Sync+0x90                /usr/local/Cellar/go/1.2.2/libexec/src/pkg/os/file_posix.go:150
#   0xe4507 github.com/boltdb/bolt.fdatasync+0x27       /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/boltdb/bolt/bolt.go:8
#   0xf4016 github.com/boltdb/bolt.(*Tx).write+0x326    /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go:351
#   0xf2ce4 github.com/boltdb/bolt.(*Tx).Commit+0x3e4   /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go:174
#   0xeb757 github.com/boltdb/bolt.(*DB).Update+0xe7    /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/boltdb/bolt/db.go:455
#   0x61cc  main.(*Store).writeEvent+0x6c           /Users/sclasen/code/src/github.com/heroku/event-shuttle/store.go:254
#   0x551f  main.(*Store).storeEvents+0x9f          /Users/sclasen/code/src/github.com/heroku/event-shuttle/store.go:125

1 @ 0x1fd36 0xd244 0xce92 0x615b 0x1ff00
#   0xd244  selectgo+0x384          /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:996
#   0xce92  runtime.selectgo+0x12       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:840
#   0x615b  main.(*Store).report+0x28b  /Users/sclasen/code/src/github.com/heroku/event-shuttle/store.go:215

8 @ 0x1fd36 0xbe76 0xc77c 0x2891 0x2780 0x1ff00
#   0x2891  main.noAckEvent+0x51                /Users/sclasen/code/src/github.com/heroku/event-shuttle/deliver.go:111
#   0x2780  main.(*KafkaDeliver).deliverEvents+0x260    /Users/sclasen/code/src/github.com/heroku/event-shuttle/deliver.go:83

1 @ 0x13901 0x2c5fd 0x1ff00
#   0x2c5fd timerproc+0xbd  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/time.goc:232

7 @ 0x1fd36 0xc382 0xc81b 0x30ca6 0x42a06 0x42992 0x1ff00
#   0x30ca6 github.com/Shopify/sarama.(*Broker).responseReceiver+0x86               /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/Shopify/sarama/broker.go:359
#   0x42a06 github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm+0x26 /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/Shopify/sarama/broker.go:112
#   0x42992 github.com/Shopify/sarama.withRecover+0x32                      /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/Shopify/sarama/utils.go:27

1 @ 0x1fd36 0x2a506 0x29e6a 0x11f084 0x11f0e0 0x1221f2 0x135117 0x1351e7 0x72f01 0x72e50 0x7317d 0x1ff00
#   0x2a506     netpollblock+0xa6           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:280
#   0x29e6a     net.runtime_pollWait+0x6a       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:116
#   0x11f084    net.(*pollDesc).Wait+0x34       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:81
#   0x11f0e0    net.(*pollDesc).WaitRead+0x30       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:86
#   0x1221f2    net.(*netFD).accept+0x2c2       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_unix.go:382
#   0x135117    net.(*TCPListener).AcceptTCP+0x47   /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/tcpsock_posix.go:233
#   0x1351e7    net.(*TCPListener).Accept+0x27      /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/tcpsock_posix.go:243
#   0x72f01     net/http.(*Server).Serve+0x91       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1622
#   0x72e50     net/http.(*Server).ListenAndServe+0xa0  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1612
#   0x7317d     net/http.ListenAndServe+0x6d        /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1677

2 @ 0x1fd36 0xd244 0xce92 0x43482 0x1ff00
#   0xd244  selectgo+0x384                  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:996
#   0xce92  runtime.selectgo+0x12               /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:840
#   0x43482 github.com/Shopify/sarama.func·008+0x262   /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/Shopify/sarama/producer.go:224

1 @ 0x1fd36 0x2a506 0x29e6a 0x11f084 0x11f0e0 0x120460 0x12f015 0x6bad5 0x112f6b 0x15cd10 0x15d8b4 0x15da83 0x186631 0x186427 0x68818 0x6daeb 0x70924 0x1ff00
#   0x2a506     netpollblock+0xa6               /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:280
#   0x29e6a     net.runtime_pollWait+0x6a           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/netpoll.goc:116
#   0x11f084    net.(*pollDesc).Wait+0x34           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:81
#   0x11f0e0    net.(*pollDesc).WaitRead+0x30           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_poll_runtime.go:86
#   0x120460    net.(*netFD).Read+0x2a0             /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/fd_unix.go:204
#   0x12f015    net.(*conn).Read+0xc5               /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/net.go:122
#   0x6bad5     net/http.(*liveSwitchReader).Read+0xa5      /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:204
#   0x112f6b    io.(*LimitedReader).Read+0xbb           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/io/io.go:398
#   0x15cd10    bufio.(*Reader).fill+0x110          /usr/local/Cellar/go/1.2.2/libexec/src/pkg/bufio/bufio.go:91
#   0x15d8b4    bufio.(*Reader).ReadSlice+0x204         /usr/local/Cellar/go/1.2.2/libexec/src/pkg/bufio/bufio.go:274
#   0x15da83    bufio.(*Reader).ReadLine+0x63           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/bufio/bufio.go:305
#   0x186631    net/textproto.(*Reader).readLineSlice+0x61  /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/textproto/reader.go:55
#   0x186427    net/textproto.(*Reader).ReadLine+0x27       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/textproto/reader.go:36
#   0x68818     net/http.ReadRequest+0x88           /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/request.go:526
#   0x6daeb     net/http.(*conn).readRequest+0x1bb      /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:575
#   0x70924     net/http.(*conn).serve+0x3b4            /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1123

1 @ 0x1fd36 0xd244 0xce92 0x4951 0x6e74 0x710d0 0xce5ae 0x72d7e 0x70d27 0x1ff00
#   0xd244  selectgo+0x384                          /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:996
#   0xce92  runtime.selectgo+0x12                       /usr/local/Cellar/go/1.2.2/libexec/src/pkg/runtime/chan.c:840
#   0x4951  main.(*Endpoint).PostEvent+0x3f1                /Users/sclasen/code/src/github.com/heroku/event-shuttle/http.go:41
#   0x6e74  main.*Endpoint.PostEvent·fm+0x44               /Users/sclasen/code/src/github.com/heroku/event-shuttle/http.go:21
#   0x710d0 net/http.HandlerFunc.ServeHTTP+0x40             /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1220
#   0xce5ae github.com/bmizerany/pat.(*PatternServeMux).ServeHTTP+0x1be /Users/sclasen/code/src/github.com/heroku/event-shuttle/Godeps/_workspace/src/github.com/bmizerany/pat/mux.go:109
#   0x72d7e net/http.serverHandler.ServeHTTP+0x16e              /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1597
#   0x70d27 net/http.(*conn).serve+0x7b7                    /usr/local/Cellar/go/1.2.2/libexec/src/pkg/net/http/server.go:1167

Contributor

eapache commented Jul 9, 2014

Check your code - the only two sarama goroutines in that trace are both waiting for "userspace" input (i.e. they're not stuck internally).

My super-simple test code:

for {
    err = producer.SendMessage(...)
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("> message sent")
    }
}

Reconnects and begins sending messages again properly.

(Also, if you haven't lowered your retry count/timer, maybe just waiting 100 seconds will do the trick.)

Author

sclasen commented Jul 9, 2014

Thanks!
