Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client streaming retry interceptor #2115

Closed
Arun-KumarH opened this issue May 6, 2022 · 1 comment
Closed

Client streaming retry interceptor #2115

Arun-KumarH opened this issue May 6, 2022 · 1 comment

Comments

@Arun-KumarH
Copy link

We are implementing a retry Intercepting call, so that in case of connection reset error the request is made again (ECONNRESET [{"message":"14 UNAVAILABLE: read ECONNRESET"}]).

This retry interceptor works fine for Unary requests but for client streaming request we observe that after the first chunk of data is sent then we receive a status OK and connection is closed. (although still other chunks are not being sent).
From the retry block if we remove the line newCall.halfClose(); then the connection hangs after sending the first chunk of data and connection times out.

Is there an example for client / server stream retry mechanism ?

  retry_interceptor(options: InterceptorOptions, nextCall: NextCall): any {
    let savedMetadata: Metadata;
    let savedSendMessage: any;
    let savedReceiveMessage: any;
    let savedMessageNext: any;
    let maxRetries = 50;
    let requester = (new RequesterBuilder())
      .withStart(function (metadata: Metadata, listener: InterceptingListener,
        next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void {
        savedMetadata = metadata;
        let new_listener = (new ListenerBuilder())
          .withOnReceiveMessage(function (message: any, next: any) {
            savedReceiveMessage = message;
            savedMessageNext = next;
            // for response streaming
            if (options?.method_definition?.responseStream === true) {
              savedMessageNext(savedReceiveMessage);
            }
          })
          .withOnReceiveStatus(function (status: any, next: any) {
            let retries = 0;
            let retry = function (message: any, metadata: any) {
              retries++;
              let newCall = nextCall(options);
              let receivedMessage: any;
              newCall.start(metadata, {
                onReceiveMessage: function (message: any) {
                  receivedMessage = message;
                },
                onReceiveStatus: async function (status: StatusObject) {
                  if (status.code !== grpcStatus.OK) {
                    if (retries <= maxRetries) {
                      console.log('Retrying request', { retries, options });
                      retry(message, metadata);
                    } else if (savedMessageNext) {
                      savedMessageNext(receivedMessage);
                      next(status);
                    } else {
                      next(status);
                    }
                  } else {
                    let new_status: any = (new StatusBuilder())
                      .withCode(grpcStatus.OK).build();
                    savedMessageNext(receivedMessage);
                    next(new_status);
                  }
                }
              });
              newCall.sendMessage(message);
              newCall.halfClose();
            };
            if (status.code !== grpcStatus.OK) {
              console.log('Retrying due to status', status);
              retry(savedSendMessage, savedMetadata);
            } else {
              // not to send last chunk twice for response stream
              if (options?.method_definition?.responseStream === false) {
                savedMessageNext(savedReceiveMessage);
              }
              next(status);
            }
          }
          ).build();
        next(metadata, new_listener);
      })
      .withSendMessage(function (message, next) {
        savedSendMessage = message;
        next(message);
      }).build();
    return new InterceptingCall(nextCall(options), requester);
  };
@Arun-KumarH Arun-KumarH changed the title Client / server streaming retry interceptor Client streaming retry interceptor May 6, 2022
@murgatroid99
Copy link
Member

Now that version 1.8.x is out, I recommend instead using the built-in retry support, which is implemented according to this specification. It handles streaming automatically.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants