Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify connection close logic #559

Merged
merged 7 commits into from
Jul 1, 2021

Conversation

cckellogg
Copy link
Contributor

Motivation

Simplify the connection close logic into one function. I thinks this makes the code easier to reason about and maintain.

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also do something like

        var err error
	var host *url.URL
	var rpcResult *RPCResult
	backoff := Backoff{100 * time.Millisecond}
	var timeoutCh <-chan time.Time
	for {
		host, err = c.serviceNameResolver.ResolveHost()
		if err != nil {
			c.log.WithError(err).Errorf("rpc client failed to resolve host")
			return nil, err
		}
		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
		// success we got a response
		if err == nil {
			break
		}

		retryTime := backoff.Next()
		sleepCh := time.After(retryTime)
		if timeoutCh == nil {
			timeoutCh = time.After(c.requestTimeout)
		}
		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
		select {
		case <- sleepCh:
			break
		case <- timeoutCh:
			return rpcResult, err
		}
	}

And we may want to think about adding a context (in another PR) if can we need to cancel this when shutting down producer or consumer on reconnect?

Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great Jobs

@wolfstudy wolfstudy added this to the 0.6.0 milestone Jul 1, 2021
@wolfstudy
Copy link
Member

wolfstudy commented Jul 1, 2021

Hello @cckellogg , In connection start, we first judge whether the TCP connection is established successfully.

if c.connect(){}

If the connection is established successfully but the handshake fails, we will branch else, but here we just change the connection status to connectionClosed without directly closing the TCP connection.

else {
	c.metrics.ConnectionsHandshakeErrors.Inc()
	c.changeState(connectionClosed)
}

But in the Close function, when the connection is in the connectionClosed state, it returns directly and does not perform the TCP close operation

func (c *connection) Close() {
	if c.getState() == connectionClosed {
		return
	}

So here in the reconnect process, it will cause multiple TCP connections to be created locally, resulting in TCP connection resources leakage.

image

Here for the 515 process, only one pulsar Client object is created. In theory, only one TCP connection should be established here, but during the reconnect process, it was found that multiple TCP connections were created locally, and there was a leak of TCP connection resources.

@wolfstudy
Copy link
Member

cc / @freeznet @zymap @merlimat

Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @cckellogg Here I have confusion. When we change the connection status to connectionClosed, when did we actually close the TCP connection?

@wolfstudy
Copy link
Member

For example, in https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L222-L224, When the handshake fails, we have created a new connection, but we directly changed the status to connectionClosed but did not actually close the connection

@cckellogg
Copy link
Contributor Author

For example, in https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L222-L224, When the handshake fails, we have created a new connection, but we directly changed the status to connectionClosed but did not actually close the connection

Changed the code to call Close() instead of setting the state to connectionClosed. This should cleanup and close the connection.

Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cckellogg Great changes LGTM +1

@wolfstudy wolfstudy merged commit 3e091ae into apache:master Jul 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants