Skip to content

Allow sending custom headers and status codes on a SSE controller when an async flow is required #12260

Open
@notaphplover

Description

@notaphplover

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

I've been doing a PoC involving the SSE feature.

The abstraction looks good, working with Observables feel like a nice way to representing an SSE based flow. Having said that, I really don't know how I should implement a non 200 HTTP status code in some scenarios.

After reading the spec, I would say it's perfectly fine to send non 200 status codes, the spec even states that redirections are allowed:

Event stream requests can be redirected using HTTP 301 and 307 redirects as with normal HTTP requests. Clients will reconnect if the connection is closed; a client can be told to stop reconnecting using the HTTP 204 No Content response code.

Having a look to the original issue that let to the SSE feature (#4826), it seems this can be done accesing the response:

@Sse("verify-payment")
	verifyPayment(@Res() response: { raw: ServerResponse }, @Query("tx") tx?: string): Observable<{ data: { status: DTOBookingStage | "close", msg?: string } }> {
	response.raw.setHeader('Access-Control-Allow-Origin', BBConfig.envVars.NODE_ENV === "production" ? "https://www.example.com" : "http://localhost:3000"); //FIXME: It won't be good when i'll have staging
	return this.bookService.verifyClientPayment({}, tx);
}

It seems I could probably set the status code in the same way I set a header, but I would say this is sort of tricky: it only allow us to set headers / status code if we do this in a syncronous flow, before the observable result is returned.

Having a look at the nest repository, it seems the headers are send as soon as the observable object is returned by the controller handler:

Having a look at packages/core/router/router-response-controller.ts:

public sse<
    TInput extends Observable<unknown> = any,
    TResponse extends WritableHeaderStream = any,
    TRequest extends IncomingMessage = any,
  >(
    result: TInput,
    response: TResponse,
    request: TRequest,
    options?: { additionalHeaders: AdditionalHeaders },
  ) {
    // It's possible that we sent headers already so don't use a stream
    if (response.writableEnded) {
      return;
    }

    this.assertObservable(result);

    const stream = new SseStream(request);
    stream.pipe(response, options);

    // Code continues handling the observable result.
}

When piping the SSE stream, it seems headers are flushed whatsoever:

Having a look at packages/core/router/sse-stream.ts

pipe<T extends WritableHeaderStream>(
    destination: T,
    options?: {
      additionalHeaders?: AdditionalHeaders;
      end?: boolean;
    },
  ): T {
    if (destination.writeHead) {
      destination.writeHead(200, {
        ...options?.additionalHeaders,
        // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
        'Content-Type': 'text/event-stream',
        Connection: 'keep-alive',
        // Disable cache, even for old browsers and proxies
        'Cache-Control':
          'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
        Pragma: 'no-cache',
        Expire: '0',
        // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
        'X-Accel-Buffering': 'no',
      });
      destination.flushHeaders();
    }

    destination.write('\n');
    return super.pipe(destination, options);
  }

So, keeping all of this in mind, I cannot see a way of sending a non 200 http status code nor some custom headers if I require an async flow to determine these headers / status code.

A simple example would be a SSE endpoint to subscribe to the events happening on a resource. Let's say I want to implement a GET v1/games/{gameId}/events SSE endpoint. I expect this endpoint to return a 404 status code if the {gameId} game resource does not exist. In order to accomplish that, I would need to search my game resource in my data source, an operation it's very likely to be asyncronous. By the way I would had fetched my data source response, the headers would had been flushed, so it would be too late to send my 404 status code in case no game is found.

Describe the solution you'd like

I would love an abstraction that lets the developer do an async flow in order to determine custom headers / status codes.

Maybe this is a terrible approach, but allowing SSE controller handlers to return a Promise<Observable<MessageEvent>> could be a simple, non breaking change solution. Sending an Observable<MessageEvent> would still be allowed. The idea would be waiting for the observable promise to be fullfilled so the sse handler can set custom headers / custom status codes. Once the promise is fullfilled, headers would be flushed.

Teachability, documentation, adoption, migration strategy

In case of going this way, SSE doc page should be updated to reflect the possibilty of sending a Promise<Observable<MessageEvent>> and some examples should be added.

The migration strategy would be trivial imo due to the fact this is not a breaking change. Sending an Observable<MessageEvent> would still be allowed.

What is the motivation / use case for changing the behavior?

The lack of an approach to set custom headers / status code in the scenario I described in the previous section of the issue.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions