Chunked Responses for Batch requests #396

Closed

gauravmk opened this issue May 21, 2017 · 11 comments

gauravmk commented May 21, 2017

Because apollo-client automatically batches requests, you can sometimes get in a situation where a single slow query can block a whole page from loading instead of optimizing for "time to interactive". One way of solving this would be for client programmers to manually adjust the batching behavior. A more automatic and efficient way is for the server to stream results as each request completes.

You could imagine an option to have a "Streaming GraphQL Server" which sends down chunked responses as each query computes. Clients would have to be able to handle results coming in arbitrary orders.
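
For illustration, the chunked body could be newline-delimited JSON, one object per completed query, keyed so the client can match each result back to its request (this mirrors the sketch later in the thread; the payloads here are made up):

{"request":"q1","result":{"data":{"currentUser":{"id":"1"}}}}
{"request":"q2","result":{"data":{"notifications":[]}}}

Each line is flushed as soon as its query resolves, so lines arrive in completion order rather than request order.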

I hacked together something quick in express using our own API and the graphql-server library
(Animated screenshot: chunked responses arriving in completion order)

The second query is much much faster than the first, and they get sent down in the order they finish.

helfer (Contributor) commented Jun 9, 2017

@gauravmk I think that's a great idea. You'd have to make sure it works with Apollo Client's network interface though, because it's no use if the response is chunked but Apollo Client doesn't get the result until the entire response is back.

Maybe you could start with a PR to Apollo Server, and then one to Apollo Client to support the chunking on both ends of the transport? On the server there should be an option to turn it on or off, but on the client it can always be supported, I think.

gauravmk (Author) commented Jun 9, 2017

Yep, I got a reference implementation working in our codebase using a custom network interface. I got our server to respect a streamingKey so the client can piece the query -> result mapping back together.

It uses the new fetch API's readable streams, but support for it is mixed. Chrome supports it natively and there are polyfills for some browsers, but others legitimately have no way of flushing partial responses predictably. I've only tested it with Chrome.
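
For context, a rough way to detect whether partial response bodies can be read at all (an illustrative check, not something from the branch):

// True when fetch exposes the response body as a ReadableStream,
// which is what the streaming network interface below relies on.
const canReadPartialResponses =
  typeof Response !== 'undefined' &&
  typeof ReadableStream !== 'undefined' &&
  'body' in Response.prototype;

Browsers where this is false would need to fall back to the standard batched interface.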

I've had some conversations with other folks at Remind, and a couple of other ideas were thrown around, including using HTTP/2 to support a streaming interface.

Here's the streaming interface I have on scratch branches. I'll clean it up and put out a couple PRs on server and on client.

Client (some copy-paste from BatchNetworkInterface):

import { ExecutionResult } from 'graphql';
import { HTTPBatchedNetworkInterface, Request } from 'apollo-client';
import { map } from 'lodash';
import { print } from 'graphql/language/printer';
import { TextDecoder } from 'text-encoding';
import uuid from 'utils/uuid';
import fetchStream from 'fetch-readablestream';

export interface BatchRequestAndOptions {
  requests: Request[],
  options: RequestInit,
}

export interface BatchResponseAndOptions {
  responses: Response[],
  options: RequestInit,
}

interface QueryFetchRequest {
  request: Request,
  promise?: Promise<ExecutionResult>,
  resolve?: (result: ExecutionResult) => void,
  reject?: (error: Error) => void,
}

export class HTTPStreamingBatchedNetworkInterface extends HTTPBatchedNetworkInterface {
  // Pending requests keyed by a generated streamingKey, so chunked results can be
  // matched back to the query that produced them.
  queuedRequests: { [key: string]: QueryFetchRequest } = {};

  query(request: Request): Promise<ExecutionResult> {
    const fetchRequest: QueryFetchRequest = { request };
    this.queuedRequests[uuid()] = fetchRequest;
    fetchRequest.promise = new Promise((resolve, reject) => {
      fetchRequest.resolve = resolve;
      fetchRequest.reject = reject;
    });

    // The first enqueued request triggers the queue consumption after `batchInterval` milliseconds.
    if (Object.keys(this.queuedRequests).length === 1) {
      setTimeout(() => {
        this.consumeQueue();
      }, this.batchInterval);
    }

    return fetchRequest.promise;
  }

  consumeQueue() {
    const processingRequests = { ...this.queuedRequests };
    this.queuedRequests = {};

    const middlewarePromise: Promise<BatchRequestAndOptions> = this.applyBatchMiddlewares({
      requests: map(processingRequests, (r, key) => ({ ...r.request, streamingKey: key })),
      options: {},
    });

    middlewarePromise
      .then((batchRequestAndOptions: BatchRequestAndOptions) => {
        const { options, requests } = batchRequestAndOptions;

        // Serialize the requests to strings of JSON
        const printedRequests = requests.map(request => ({
          ...request,
          query: print(request.query),
        }));

        const startTime = Date.now();

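        // Each complete "\r\n"-terminated line of the streamed body is one JSON
        // object of the form { request: <streamingKey>, result: <ExecutionResult> },
        // written by the server as soon as that query resolves, so results can
        // arrive in any order.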
        const readAllChunks = readableStream => {
          const reader = readableStream.getReader();
          let chunk = '';

          function readChunk() {
            reader
              .read()
              .then(({ value, done }) => {
                if (!done) {
                  // There's got to be a client library that properly handles chunked responses, but I couldn't find one
                  chunk += new TextDecoder('utf-8').decode(value);
                  if (chunk.endsWith('\r\n')) {
                    chunk.split('\r\n').filter(e => !!e).forEach(resp => {
                      const data = JSON.parse(resp);
                      const request = processingRequests[data.request];
                      console.log(
                        `Sending down chunked response for ${request.request.operationName} after ${Date.now() - startTime}ms`
                      );
                      request.resolve(data.result);
                    });
                    chunk = '';
                  }
                  readChunk();
                }
              })
              .catch(err => {
                console.error('Error reading from a readableStream', err);
              });
          }

          readChunk();
        };
        fetchStream(this._uri, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            ...options.headers,
          },
          credentials: 'include',
          body: JSON.stringify(printedRequests),
        })
          .then(response => readAllChunks(response.body))
          .catch(err => {
            console.error(err);
          });
      })
      .catch(err => {
        console.error(err);
      });
  }
}

export function createStreamingNetworkInterface(options) {
  if (!options) {
    throw new Error('You must pass an options argument to createNetworkInterface.');
  }
  return new HTTPStreamingBatchedNetworkInterface(
    options.uri,
    options.batchInterval,
    options.opts || {}
  );
}
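
For context, a minimal sketch of wiring this into apollo-client 1.x (the constructor still takes a networkInterface; the module path and URI here are assumptions):

import ApolloClient from 'apollo-client';
import { createStreamingNetworkInterface } from './streamingNetworkInterface';

const client = new ApolloClient({
  networkInterface: createStreamingNetworkInterface({
    uri: '/graphql/stream',
    batchInterval: 10,
  }),
});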

Server:

import { runQuery } from 'graphql-server-core';

export default function streamingGraphQLExpress(options) {
  return (req, res) => {
    const graphqlOptions = options(req, res);
    let requestPayload = req.body;

    if (!Array.isArray(requestPayload)) {
      requestPayload = [requestPayload];
    }

    const requests = requestPayload.map(requestParams => {
      const { query, operationName, streamingKey } = requestParams;
      let { variables } = requestParams;

      if (!streamingKey) {
        res.write(`${JSON.stringify({ error: 'Operations need streaming keys' })}\r\n`);
        return Promise.resolve();
      }

      if (typeof variables === 'string') {
        try {
          variables = JSON.parse(variables);
        } catch (error) {
          res.write(`${JSON.stringify({ error: 'Variables not parseable' })}\r\n`);
          return Promise.resolve();
        }
      }

      const context = Object.assign({}, graphqlOptions.context || {});

      let params = {
        ...graphqlOptions,
        query,
        variables,
        context,
        operationName,
      };

      if (graphqlOptions.formatParams) {
        params = graphqlOptions.formatParams(params);
      }

      return runQuery(params).then(queryResult => {
        res.write(`${JSON.stringify({ request: streamingKey, result: queryResult })}\r\n`);
      });
    });

    Promise.all(requests).then(() => {
      res.end();
    });
  };
}

gauravmk (Author) commented Jun 9, 2017

The way I did it for our server: /graphql uses the normal response, and /graphql/stream gives you back the streaming version, because the request and response shapes aren't compatible with each other.
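
A minimal sketch of that routing, assuming Express with body-parser, graphqlExpress from graphql-server-express for the normal endpoint, and the streaming handler above (module paths and the schema import are illustrative):

import express from 'express';
import bodyParser from 'body-parser';
import { graphqlExpress } from 'graphql-server-express';
import streamingGraphQLExpress from './streamingGraphQLExpress';
import { schema } from './schema'; // executable schema built elsewhere

const app = express();
app.use(bodyParser.json());

const options = (req, res) => ({ schema, context: { user: req.user } });

// Normal endpoint: the whole batch comes back as a single JSON array.
app.use('/graphql', graphqlExpress(options));

// Streaming endpoint: newline-delimited JSON, one object per completed query.
app.use('/graphql/stream', streamingGraphQLExpress(options));

app.listen(3000);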

dobesv commented Feb 9, 2018

Could follow the convention of ldjson, e.g.: https://www.npmjs.com/package/ldjson

So instead of returning an array of responses, return responses separated by newlines.
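
Concretely (with made-up payloads), instead of a single array that can only be sent once every query has finished:

[{"data":{"a":1}},{"data":{"b":2}}]

each result would be written and flushed on its own line as soon as it resolves:

{"data":{"a":1}}
{"data":{"b":2}}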

intellix commented Apr 2, 2018

Would this be possible through WebSockets and/or Server-Sent Events as well? I'm already using graphql-subscriptions with sockets and have it all set up.

It would be sweet to request through the socket and receive the results downstream as they resolve.

jpg013 commented Sep 29, 2018

I'm highly interested to know if there has been any traction on this. I have some use cases where I could really take advantage of this idea of streaming results as they come in.

dobesv commented Sep 30, 2018 via email

I suppose as a workaround you can use a graphql subscription to send the data if it is arriving in pieces.

jpg013 commented Sep 30, 2018

Thanks, I'll certainly look into it.

jbaxleyiii (Contributor) commented Jul 8, 2019

@gauravmk this is a really interesting idea! With Apollo Server 3 we are planning to implement a more flexible transport layer and to add support for @defer and @stream. Between those three pieces, this should be possible in the next major release. I'm going to close this issue since it is being worked on as part of that larger effort, but it has been added to the project board so we don't lose track of this feature!

abernix (Member) commented Aug 24, 2019

The proposal for transports is now open on #3184. We still have a way to go before @defer is a stable thing, but I'm just creating appropriate cookie-crumbs to link some issues together. Support for chunked responses is entirely a goal of the new transport design, and @defer an eventual outcome!

Spartano commented Jan 8, 2020

Quoting the emailed reply above: "I suppose as a workaround you can use a graphql subscription to send the data if it is arriving in pieces."

Can you show an example pls.
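
For what it's worth, a minimal sketch of that workaround with graphql-subscriptions, assuming a subscription transport such as subscriptions-transport-ws is already wired up (computeSection and all field names are purely illustrative):

import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

// Stand-in for the real per-slice work; replace with actual data fetching.
const computeSection = (id, section) =>
  Promise.resolve({ id, section, value: null });

const resolvers = {
  Mutation: {
    // Kick off the slow pieces and publish each one as soon as it resolves.
    startReport: (_, { id }) => {
      ['summary', 'details', 'totals'].forEach(section => {
        computeSection(id, section).then(result => {
          pubsub.publish(`REPORT_${id}`, { reportSection: result });
        });
      });
      return true;
    },
  },
  Subscription: {
    // The client subscribes here and receives each piece as it is published.
    reportSection: {
      subscribe: (_, { id }) => pubsub.asyncIterator(`REPORT_${id}`),
    },
  },
};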

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Apr 20, 2023