Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add send error logic for connection #566

Merged
merged 9 commits into from
Jul 16, 2021

Conversation

wolfstudy
Copy link
Member

Signed-off-by: xiaolongran xiaolongran@tencent.com

Fixes #564

Motivation

Add command of SendError logic for connection

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy wolfstudy added this to the 0.6.0 milestone Jul 13, 2021
@wolfstudy wolfstudy self-assigned this Jul 13, 2021
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy
Copy link
Member Author

ping @zymap PTAL

if ok {
producer.ConnectionClosed()
} else {
c.log.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong indentation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is automatically formatted by golanglint, probably for aesthetic reasons

c.listenersLock.RUnlock()

if ok {
producer.ConnectionClosed()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Java implementation, we change the state to terminate and send the error to all pending messages, does the producer connection closed will do the same thing with that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called and the connection is not closed? What should the behavior be when this error happens? What do the other clients do? Calling producer.ConnectionClosed() when we don't close connection make the code confusing in my opinion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing here is to send a connection closed signal to runEventsLoop, try to trigger the logic of reconnection

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java client closes the connection in the default case.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L649

Should we do the same. What should the expected client behavior be here?

producerID := sendError.GetProducerId()

SendError:
switch *sendError.Error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also handle the ChecksumError in the java implementation, do we need to implement that?

Copy link
Contributor

@cckellogg cckellogg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do the other clients handle this and is the correct behavior (send signal to producer, close producer, close connection, etc..)?

c.listenersLock.RUnlock()

if ok {
producer.ConnectionClosed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called and the connection is not closed? What should the behavior be when this error happens? What do the other clients do? Calling producer.ConnectionClosed() when we don't close connection make the code confusing in my opinion.


errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
request.callback(nil, errors.New(errMsg))
break SendError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following why there needs to be a break to SendError?

Warn("[HandleSendError] connection closed")
}
break SendError
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the pending request callback not needed in the default case?

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy
Copy link
Member Author

@zymap @cckellogg

There is currently no good idea to deal with TopicTerminatedError, so it is temporarily not supported.

@@ -33,7 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *puls
buildAndInjectSpan(message, producer).Finish()
}

func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert these formatting changes since they have nothing to do with this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause Action CI to fail:

Error: pulsar/internal/pulsartracing/producer_interceptor.go:36: line is 136 characters (lll)
func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
Error: pulsar/internal/pulsartracing/producer_interceptor_test.go:58: line is 132 characters (lll)
func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage, func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
Error: Process completed with exit code 1.

c.listenersLock.RUnlock()

if ok {
producer.ConnectionClosed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java client closes the connection in the default case.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L649

Should we do the same. What should the expected client behavior be here?


switch *sendError.Error {
case pb.ServerError_NotAllowedError:
c.pendingLock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of code is used in a couple of places now maybe we should add a function? Something like this?

func deletePendingRequest(requestID) (request, bool) {
   c.pendingLock.Lock()
   defer c.pendingLock.Unlock()
   request, ok := c.pendingReqs[requestID]
    if ok {
	delete(c.pendingReqs, requestID)
   }
   return request, ok
		
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good ideas for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java client closes the connection in the default case.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L649

Should we do the same. What should the expected client behavior be here?

About this, Look at the comments like reconnect, look at the code like close the current connection:

            // By default, for transient error, let the **reconnection** logic
            // to take place and re-establish the produce again
            ctx.close();

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy wolfstudy merged commit f193975 into apache:master Jul 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support handleSendError command in Go SDK
3 participants