
Reconnection blocked in producer by request timed out #697

Closed
wolfstudy opened this issue Dec 29, 2021 · 6 comments · Fixed by #703
Labels
help wanted: anyone who is interested could help on it

Comments

@wolfstudy
Member

wolfstudy commented Dec 29, 2021

Expected behavior

When the reconnection logic is triggered, the reconnection succeeds.

Actual behavior

When the reconnection logic is triggered, the Go SDK keeps trying to reconnect but never succeeds.

While the reconnection logic keeps being triggered, the Go SDK logs the following:

  1. The broker notifies the client to close the producer:
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=info msg="Broker notification of Closed producer: 1" local_addr="1.1.1.2:51860" remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Connection was closed]" cnx="1.1.1.2:51860 -> 1.1.1.3:6650" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=info msg="[Reconnecting to broker in  109.456323ms]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

  2. ReceivedSendReceipt processing errors appear. After that, sending on this topic is never restored and request timeouts are reported:
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431457 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431462 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431465 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431470 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431473 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431479 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431483 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431488 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431492 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431495 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431497 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431500 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431503 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431504 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431506 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431512 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431519 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431523 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"
Dec 29 15:18:08 gg-1-1-1-2 gateway: time="2021-12-29T15:18:08+08:00" level=warning msg="[Got unexpected send receipt for messageID=%+v ledgerId:18625173 entryId:431526 ]" local_addr="1.1.1.2:51860" producerID=1 remote_addr="pulsar://1.1.1.3:6650"

....
....
....
  3. The send-timeout logic is triggered and the pending messages fail to be sent:
Dec 29 15:18:16 gg-1-1-1-2 gateway: time="2021-12-29T15:18:16+08:00" level=info msg="Failing 685 messages" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
  4. The reconnection logic keeps retrying and keeps failing with `request timed out`:
Dec 29 15:18:18 gg-1-1-1-2 gateway: time="2021-12-29T15:18:18+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:18:18 gg-1-1-1-2 gateway: time="2021-12-29T15:18:18+08:00" level=info msg="[Reconnecting to broker in  221.849265ms]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

Dec 29 15:18:28 gg-1-1-1-2 gateway: time="2021-12-29T15:18:28+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:18:28 gg-1-1-1-2 gateway: time="2021-12-29T15:18:28+08:00" level=info msg="[Reconnecting to broker in  468.906379ms]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

Dec 29 15:18:39 gg-1-1-1-2 gateway: time="2021-12-29T15:18:39+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:18:39 gg-1-1-1-2 gateway: time="2021-12-29T15:18:39+08:00" level=info msg="[Reconnecting to broker in  840.895497ms]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

Dec 29 15:18:50 gg-1-1-1-2 gateway: time="2021-12-29T15:18:50+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:18:50 gg-1-1-1-2 gateway: time="2021-12-29T15:18:50+08:00" level=info msg="[Reconnecting to broker in  1.913524483s]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

Dec 29 15:19:02 gg-1-1-1-2 gateway: time="2021-12-29T15:19:02+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:19:02 gg-1-1-1-2 gateway: time="2021-12-29T15:19:02+08:00" level=info msg="[Reconnecting to broker in  3.485446s]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

....
....

Dec 29 15:22:52 gz-9-139-45-78 gateway: time="2021-12-29T15:22:52+08:00" level=error msg="[Failed to create producer]" error="request timed out" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"
Dec 29 15:22:52 gz-9-139-45-78 gateway: time="2021-12-29T15:22:52+08:00" level=info msg="[Reconnecting to broker in  1m3.390120353s]" producerID=1 producer_name=cmq_1.1.1.2_ac48dcea-6164-11ec-8357-5254007f7980 topic="persistent://test-tenant/tst-ns/test-topic-partition-0"

Once this happens, the client keeps trying to reconnect until the Go SDK process is restarted.

We can see that the cmdProducer request is blocked when the producer reconnects: when grabCnx() is triggered, the block happens here:


func (p *partitionProducer) grabCnx() error {
	....
	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
	if err != nil {
		p.log.WithError(err).Error("Failed to create producer")
		return err
	}
	....
}

Broker

During this time window, we checked the broker-side logs and found no record of the broker handling the cmdProducer command at all, i.e. nothing from handleProducer in the broker's ServerCnx:

@Override
protected void handleProducer(final CommandProducer cmdProducer) {
            ....
            log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
            ....
}

So we can conclude that the Go SDK is blocked and the cmdProducer command is never actually sent to the broker.

The place where the Go SDK blocks:

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
	c.metrics.RPCRequestCount.Inc()

	// 1. This does not look like the place where the blocking happens,
	// because the reconnect logic keeps running and the error logged is
	// `request timed out` rather than a connection error.
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	if err != nil {
		return nil, err
	}

	ch := make(chan result, 1)
	// 2. This is where the blocking happens: the callback below is never
	// invoked, so the select falls through to the timeout. There is no
	// obvious reason yet why the Go SDK gets stuck here.
	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
		c.log.Infof("[Request] the internal send request ch size: %d--[%v]", len(ch), cmdType)
		ch <- result{&RPCResult{
			Cnx:      cnx,
			Response: response,
		}, err}
		close(ch)
	})

	select {
	case res := <-ch:
		return res.RPCResult, res.error
	case <-time.After(c.requestTimeout):
		return nil, ErrRequestTimeOut
	}
}
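
To make the symptom concrete, here is a minimal, self-contained toy sketch (my own illustration, not the library code; every name in it is invented) of the pattern above: if the callback registered through SendRequest is never invoked, for example because the connection it was queued on is already dead, the select always falls through to the requestTimeout branch and every attempt ends with `request timed out`.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errRequestTimedOut = errors.New("request timed out")

// sendRequest stands in for cnx.SendRequest: it registers a callback that is
// supposed to fire when the broker responds. If the connection is already
// dead (connectionAlive == false), the callback simply never fires, which is
// the suspected situation here.
func sendRequest(connectionAlive bool, callback func(response string, err error)) {
	if !connectionAlive {
		// The request is accepted but never answered, so the callback
		// registered by the caller is never invoked.
		return
	}
	go callback("producer created", nil)
}

// request mirrors the shape of rpcClient.Request: block on a result channel,
// with a fallback to a request timeout.
func request(connectionAlive bool, requestTimeout time.Duration) (string, error) {
	ch := make(chan string, 1)
	sendRequest(connectionAlive, func(response string, err error) {
		ch <- response
		close(ch)
	})

	select {
	case res := <-ch:
		return res, nil
	case <-time.After(requestTimeout):
		// With a dead connection this branch is taken on every retry,
		// matching the repeated "request timed out" errors in the logs.
		return "", errRequestTimedOut
	}
}

func main() {
	fmt.Println(request(true, time.Second))  // prints: producer created <nil>
	fmt.Println(request(false, time.Second)) // prints: request timed out
}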

Steps to reproduce

This issue does not reproduce reliably.

System configuration

Pulsar version: 2.7.2
Go SDK: master branch

@bschofield
Contributor

bschofield commented Jan 4, 2022

Could this issue possibly be related to #691?

I tried out the client library with that PR applied, but after 24-48hrs I got a problem with clients freezing as they attempted to reconnect. I rolled back to d5d4903 and the issue has not reoccurred after ~1 week.

@wolfstudy
Member Author

> Could this issue possibly be related to #691?
>
> I tried out the client library with that PR applied, but after 24-48hrs I got a problem with clients freezing as they attempted to reconnect. I rolled back to d5d4903 and the issue has not reoccurred after ~1 week.

Hello @bschofield, can you provide more log info for this? Thanks.

#689 does not seem to cause the problem described in this issue, because the reconnect gets stuck on the producer side.

@bschofield
Contributor

bschofield commented Jan 5, 2022

> Hello @bschofield, can you provide more log info for this? Thanks.

It is hard for me to know exactly which log lines are relevant as I have very many pods running, each with a large number of producers, but here is an example that looks interesting. This may not be the root cause of the issue.

2021-12-29 16:12:31 {"level":"warn","ts":1640794351.682869,"caller":"messaging/logging_zap.go:67","msg":"[Detected stale connection to broker]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:59382"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6829515,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:59382","error":"write tcp 10.244.1.173:59382->10.244.0.2:6650: use of closed network connection"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6829886,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-P2","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":1,"cnx":"10.244.1.173:59382 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6830094,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-W0","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":8,"cnx":"10.244.1.173:59382 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6830208,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6,"cnx":"10.244.1.173:59382 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6830392,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:59382","error":"write tcp 10.244.1.173:59382->10.244.0.2:6650: use of closed network connection"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6830513,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:59382","error":"write tcp 10.244.1.173:59382->10.244.0.2:6650: use of closed network connection"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.6830642,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:59382","error":"write tcp 10.244.1.173:59382->10.244.0.2:6650: use of closed network connection"}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.6830602,"caller":"messaging/logging_zap.go:62","msg":"[runEventsLoop will reconnect in producer]","topic":"persistent://REDACTED-P2","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":1}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.6830988,"caller":"messaging/logging_zap.go:62","msg":"[Reconnecting to broker in  108.668329ms]","topic":"persistent://REDACTED-P2","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":1}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.6830504,"caller":"messaging/logging_zap.go:62","msg":"[runEventsLoop will reconnect in producer]","topic":"persistent://REDACTED-W0","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":8}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.6831107,"caller":"messaging/logging_zap.go:62","msg":"[Reconnecting to broker in  107.590908ms]","topic":"persistent://REDACTED-W0","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":8}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.68313,"caller":"messaging/logging_zap.go:62","msg":"[runEventsLoop will reconnect in producer]","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.6831586,"caller":"messaging/logging_zap.go:62","msg":"[Reconnecting to broker in  111.142612ms]","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.7922926,"caller":"messaging/logging_zap.go:62","msg":"[Connecting to broker]","remote_addr":"pulsar://10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.7961068,"caller":"messaging/logging_zap.go:62","msg":"[TCP connection established]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:39250"}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.79917,"caller":"messaging/logging_zap.go:62","msg":"[Connection is ready]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:39250"}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.802493,"caller":"messaging/logging_zap.go:82","msg":"Resending [1780] pending batches","topic":"persistent://REDACTED-P2","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":1}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.8025355,"caller":"messaging/logging_zap.go:82","msg":"Resending [680] pending batches","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.8025396,"caller":"messaging/logging_zap.go:82","msg":"Resending [1866] pending batches","topic":"persistent://REDACTED-W0","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":8}
2021-12-29 16:12:31 {"level":"info","ts":1640794351.8164892,"caller":"messaging/logging_zap.go:62","msg":"[Reconnected producer to broker]","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6,"cnx":"10.244.1.173:39250 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.8165426,"caller":"messaging/logging_zap.go:87","msg":"Received ack for [ledgerId:1021695 entryId:47743  739 711] on sequenceId %!v(MISSING) - expected: %!v(MISSING), closing connection","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.8166032,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-P2","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":1,"cnx":"10.244.1.173:39250 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.8166149,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-W0","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":8,"cnx":"10.244.1.173:39250 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.8166232,"caller":"messaging/logging_zap.go:67","msg":"[Connection was closed]","topic":"persistent://REDACTED-J1","producer_name":"REDACTED-76b5b9cbf9-6rs76","producerID":6,"cnx":"10.244.1.173:39250 -> 10.244.0.2:6650"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.8166335,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:39250","error":"write tcp 10.244.1.173:39250->10.244.0.2:6650: use of closed network connection"}
2021-12-29 16:12:31 {"level":"warn","ts":1640794351.816643,"caller":"messaging/logging_zap.go:67","msg":"[Failed to write on connection]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.1.173:39250","error":"write tcp 10.244.1.173:39250->10.244.0.2:6650: use of closed network connection"}

I also see some other errors which might be unrelated problems, e.g.

2021-12-29 16:13:02 {"level":"warn","ts":1640794382.4651952,"caller":"messaging/logging_zap.go:87","msg":"Received send error from server: [[PersistenceError org.apache.bookkeeper.mledger.ManagedLedgerException: Bookie operation timeout]] : [%!s(MISSING)]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.0.113:54102"}

> #689 does not seem to cause the problem described in this issue, because the reconnect gets stuck on the producer side.

I think there is a little miscommunication here: I was trying to suggest that you look at #691, not #689.

In PR #691 (https://github.com/apache/pulsar-client-go/pull/691/files), you changed pulsar/producer_partition.go so that p.reconnectToBroker() is called in a separate goroutine.

My (untested) hypothesis is that one of the other p.*() calls from the original goroutine could perhaps now be happening at the same time as p.reconnectToBroker():

for {
	select {
	case i := <-p.eventsChan:
		switch v := i.(type) {
		case *sendRequest:
			p.internalSend(v)
		case *flushRequest:
			p.internalFlush(v)
		case *closeProducer:
			p.internalClose(v)
			return
		}
	case <-p.batchFlushTicker.C:
		if p.batchBuilder.IsMultiBatches() {
			p.internalFlushCurrentBatches()
		} else {
			p.internalFlushCurrentBatch()
		}
	}
}

I was wondering if this could cause the freeze in p.grabCnx() that you are seeing.
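
To make the hypothesis concrete, here is a toy sketch (all names are invented, it is nothing like the real producer code) of the kind of interleaving I have in mind; running it with `go run -race` flags the unsynchronized access between the event-loop goroutine and the reconnect goroutine:

package main

import (
	"fmt"
	"time"
)

// toyProducer is a stand-in for partitionProducer; the field and method names
// are invented for illustration and do not match the real struct.
type toyProducer struct {
	cnx string // the connection internalSend writes to
}

// internalSend would normally only ever run on the event-loop goroutine.
func (p *toyProducer) internalSend(i int) {
	fmt.Printf("send %d on %s\n", i, p.cnx)
}

// reconnectToBroker now runs on its own goroutine after PR #691.
func (p *toyProducer) reconnectToBroker() {
	time.Sleep(10 * time.Millisecond)
	p.cnx = "new-connection" // unsynchronized write, concurrent with internalSend
}

func main() {
	p := &toyProducer{cnx: "old-connection"}

	go p.reconnectToBroker() // the change introduced in #691

	// Meanwhile the event loop keeps draining eventsChan and sending.
	for i := 0; i < 5; i++ {
		p.internalSend(i)
		time.Sleep(5 * time.Millisecond)
	}
}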

Again, I haven't tested this idea, but thought it could be worth mentioning since I experienced a similar problem but only with PR #691 applied. If this turns out to be something unrelated, apologies!

@wolfstudy
Member Author

wolfstudy commented Jan 6, 2022

@bschofield Thanks for your work on this. The #691 change overlooks some problems: when the producer is closed, the reconnect goroutine may leak. So I submitted a new pull request that listens for the producer-close event through a closeCh notification channel, ensuring that when the producer is closed, the reconnect goroutine can exit normally. The new pull request is #700, PTAL.
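
Roughly, the shape of the change is the following sketch (illustrative names only, not the exact code in #700): the reconnect loop selects on closeCh in addition to its retry timer, so closing the producer makes the goroutine return instead of leaking.

package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnectLoop is only a rough sketch of the idea: the reconnect goroutine
// also listens on a close channel, so closing the producer lets the goroutine
// exit instead of retrying forever.
func reconnectLoop(closeCh <-chan struct{}, grabCnx func() error) {
	backoff := 100 * time.Millisecond
	for {
		select {
		case <-closeCh:
			fmt.Println("producer closed, reconnect goroutine exiting")
			return
		case <-time.After(backoff):
			if err := grabCnx(); err == nil {
				fmt.Println("reconnected")
				return
			}
			backoff *= 2 // keep backing off and retrying
		}
	}
}

func main() {
	closeCh := make(chan struct{})
	alwaysTimeout := func() error { return errors.New("request timed out") }

	go reconnectLoop(closeCh, alwaysTimeout)

	// Closing the producer signals the goroutine to stop retrying and exit.
	time.Sleep(500 * time.Millisecond)
	close(closeCh)
	time.Sleep(100 * time.Millisecond)
}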

In addition, the problem you ran into seems somewhat different from the one I encountered. In my case the broker, for some reason, proactively told the client to close and disconnected the producer, and then every reconnection attempt failed with `request timed out`. Because this is difficult to reproduce, we still cannot determine where p.grabCnx() blocks and causes the reconnection to fail.

> 2021-12-29 16:13:02 {"level":"warn","ts":1640794382.4651952,"caller":"messaging/logging_zap.go:87","msg":"Received send error from server: [[PersistenceError org.apache.bookkeeper.mledger.ManagedLedgerException: Bookie operation timeout]] : [%!s(MISSING)]","remote_addr":"pulsar://10.244.0.2:6650","local_addr":"10.244.0.113:54102"}

This one looks like an explicit error returned by the broker: the broker's operation against the bookie timed out.

@bschofield
Contributor

bschofield commented Jan 6, 2022

Thanks a lot for the reply, @wolfstudy.

I agree that the logs I posted don't seem to show exactly the same issue, but I'm not sure that I got the root-cause logs -- as a broader outline of the problem, several of my producers shut down over a period of an hour or so, and the broker/bookie seemed to get into quite a confused state for the affected topics. I had to shut down the entire system and restart it to get things started again.

I'll put some comments on PR #700.

@wolfstudy
Member Author

Update for my side:

I ran into a similar problem today, so let me provide some further context:

[screenshot: the Go SDK repeatedly sends the cmdProducer command to the broker, but the broker never responds]
As the picture above shows, when this problem occurs the Go SDK keeps sending the cmdProducer command to the broker, and the command does reach the broker, but the broker never responds to it.

Here, Epoch is the reconnect count.

zymap pushed a commit that referenced this issue Jan 12, 2022
Signed-off-by: xiaolongran <rxl@apache.org>

Fixes #697

### Motivation

As #697 describes, in the Go SDK the reconnection logic, once triggered under certain conditions, never succeeds because the request times out.

Comparing with the Java SDK implementation, we can see that each time the reconnection logic is triggered, the original connection is closed and a new connection is created.

![image](https://user-images.githubusercontent.com/20965307/148906906-1cfc5c07-1836-4185-94ec-e43f5565a4a8.png)


So in this PR we introduce a new `reconnectFlag` field in the `connection` struct to mark the reconnection state. When the broker proactively asks the client to close the connection and the reconnection logic is triggered, we remove the old connection object from the `connections` cache of the `connectionPool` and create a new connection to complete the reconnection.

### Modifications

- Add a `reconnectFlag` field to the `connection` struct
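
As a rough sketch of the idea (illustrative types only, not the actual patch): once a connection is flagged for reconnect, it is dropped from the pool so the next GetConnection call dials a fresh one instead of reusing the stale cached entry.

package main

import (
	"fmt"
	"sync"
)

// connection and connectionPool here are simplified stand-ins for the
// client's real types; the actual change differs in detail.
type connection struct {
	addr          string
	reconnectFlag bool
}

type connectionPool struct {
	mu          sync.Mutex
	connections map[string]*connection
}

func (p *connectionPool) GetConnection(addr string) *connection {
	p.mu.Lock()
	defer p.mu.Unlock()

	if c, ok := p.connections[addr]; ok && !c.reconnectFlag {
		return c // healthy cached connection, reuse it
	}
	// No cached connection, or it was flagged for reconnect: remove the
	// stale entry and create a new connection.
	delete(p.connections, addr)
	c := &connection{addr: addr}
	p.connections[addr] = c
	return c
}

func main() {
	pool := &connectionPool{connections: map[string]*connection{}}

	c1 := pool.GetConnection("pulsar://1.1.1.3:6650")
	c1.reconnectFlag = true // broker told the client to close this connection

	c2 := pool.GetConnection("pulsar://1.1.1.3:6650")
	fmt.Println(c1 == c2) // false: the stale connection was replaced
}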