
Data streaming interfaces #56

Closed
Tracked by #49
denis-ilchishin opened this issue Dec 11, 2023 · 3 comments · Fixed by #63

Comments

@denis-ilchishin
Owner

denis-ilchishin commented Dec 11, 2023

In addition to #55, it would also be useful to come up with interfaces for streams.

Server-to-Client:

Basically, there will be two major types of data streaming:

  1. binary data - just a bunch of bytes split up and sent in chunks
  2. JSON data - structured data

Server-side streams API:
class BaseStreamResponse extends Writable {}

class BinaryStreamResponse extends BaseStreamResponse {}

class JsonStreamResponse extends BaseStreamResponse {}

// example of a binary data stream
declareProcedure(() => {
  const stream = new BinaryStreamResponse()
  fs.createReadStream('/path/to/static/file').pipe(stream)
  return stream
})

// example of a json data stream
declareProcedure(() => {
  const stream = new JsonStreamResponse()
  async function getDataFromDb() {
    for await (const rows of cursor) {
      stream.write(rows)
    }
    stream.end()
  }
  getDataFromDb()
  return stream
})

Then the client-side stream API could be something like this, where client.rpc returns an instance of Readable (which can be used as an async iterable):

const stream = await client.rpc(`v1/binary-stream`, someInput)

for await (const arrayBuffer of stream) {
  // do something with buffer
}
const items = []
async function loadItems() {
  const stream = await client.rpc(`v1/json-stream`, someInput)
  
  for await (const rows of stream) {
    items.push(...rows)
  }
}

loadItems()

Concerns and important things to take care of:

  1. A clean API for stream abortion, on both the client and server side (see the sketch after this list)
  2. Backpressure
  3. Consistency across different transports and environments (e.g. browsers will require polyfills if Node's streams are used, which will noticeably increase bundle size)
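
A minimal sketch of what the abortion API from point 1 could look like, assuming an AbortController/AbortSignal-based design; the signal property on the response stream and the options argument of client.rpc are hypothetical, not a committed API:

// server-side: stop producing data when the client aborts the stream
declareProcedure(() => {
  const source = fs.createReadStream('/path/to/static/file')
  const stream = new BinaryStreamResponse()
  // hypothetical: the response stream exposes an AbortSignal
  stream.signal.addEventListener('abort', () => source.destroy())
  source.pipe(stream)
  return stream
})

// client-side: pass an AbortSignal as a hypothetical rpc() option
const controller = new AbortController()
const stream = await client.rpc('v1/binary-stream', someInput, {
  signal: controller.signal,
})
// ...later, e.g. when the user navigates away
controller.abort()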

Client-to-Server:

When it comes to uploading data to the server, there's no point in having JSON streams, since any input from the client should be validated, so there will only be binary data streaming. In the end it should be easy and convenient, something like:

// client-side
const data = {
  name: 'Den',
  occupation: 'se',
  profilePhoto: client.createStream(imageBlob),
}
client.rpc('create-user', data)

And on the server side:

// server-side `create-user` procedure
declareProcedure({
  handle: async (ctx, data) => {
    const {
      name,  // string
      occupation,  // string
      profilePhoto // Readable somehow magically appears here, but at this point in time it only awaits to be consumed
    } = data

    // do some things, e.g. validations, db calls, etc...

    // this signals to finally start uploading stuff and pipe it to the storage,
    // since all the validations or other async operations are done,
    // and it's actually ready to process the data
    await profilePhoto.pipe(storage.saveTo(savePath))

  }
})

Underlying implementations need to solve the following problems:

  1. Inject a stream as a "part" of an actual RPC payload, which will be very convenient and straightforward for both operations: sending data (client-side) and consuming it (server-side) - a rough serialization sketch follows after this list
  2. Allow performing async operations without building an internal buffer - not starting to push data to the server before it is actually ready to consume it - which means that the server needs to signal back to the client that it's ready. This particular requirement could be quite a challenge for some transports, like HTTP.
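
A minimal sketch of how problem 1 could be approached, assuming streams in the payload are replaced with ID placeholders before the RPC is serialized; ClientStream, serializePayload and the { __stream: id } shape are all hypothetical names used only for illustration:

// hypothetical wrapper returned by client.createStream(blob)
class ClientStream {
  constructor(readonly source: Blob) {}
}

type StreamPlaceholder = { __stream: number }

// walk the RPC payload and swap every ClientStream for a placeholder,
// remembering which blob belongs to which ID so it can be uploaded later
function serializePayload(value: unknown, streams: Map<number, Blob>): unknown {
  if (value instanceof ClientStream) {
    const id = streams.size + 1
    streams.set(id, value.source)
    return { __stream: id } as StreamPlaceholder
  }
  if (Array.isArray(value)) return value.map((v) => serializePayload(v, streams))
  if (value && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value).map(([k, v]) => [k, serializePayload(v, streams)])
    )
  }
  return value
}

// on the server, the same placeholders would be swapped back for Readable
// instances keyed by the same IDs before the procedure handler is called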

A possible solution to problem №2 with the HTTP transport is to use multiple connections for an RPC that includes streams:

  1. The Client makes a request to the Server to initialize a Stream. The Server replies with headers containing the Stream ID, but does not close the connection (aka long-polling) and awaits further signals.
  2. The Client makes the RPC. The Server calls the procedure handler. The handler then has two options:
     - Accept the stream -> signal the awaiting connection to close with an OK status in the body.
     - Reject the stream -> signal the awaiting connection to close with a non-OK status in the body.
  3. On the first long-polling response the Client closes the stream if rejected, or starts uploading data if accepted.

But this approach comes with one limitation: all requests must be sent to the same API worker, which could require proper load balancing configuration.
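
A rough client-side sketch of that handshake, assuming hypothetical /stream/init and /stream/:id endpoints, an x-stream-id header, and the placeholder shape from above; none of these names are part of the actual design:

// rough client-side sketch of the long-polling handshake described above
async function rpcWithStream(procedure: string, payload: object, blob: Blob) {
  // 1. initialize the stream; fetch() resolves as soon as the headers arrive,
  //    while the body (the long-poll) stays open until accept/reject
  const initRes = await fetch('/stream/init', { method: 'POST' })
  const streamId = initRes.headers.get('x-stream-id')

  // 2. make the RPC, referencing the stream by ID inside the payload
  const rpcPromise = fetch(`/rpc/${procedure}`, {
    method: 'POST',
    body: JSON.stringify({ ...payload, profilePhoto: { __stream: streamId } }),
  })

  // 3-5. wait for the long-poll body: an OK body means the handler accepted
  //      the stream, so start uploading; anything else means it was rejected
  const decision = await initRes.text()
  if (decision === 'OK') {
    await fetch(`/stream/${streamId}`, { method: 'POST', body: blob })
  }
  return (await rpcPromise).json()
}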

UPD:

After some thought, the best option for client-side streams so far seems to be just the Streams API. It is the most portable solution for now, supported pretty much everywhere: Web, Node, Bun and Deno.

If the client is being used in a Node.js or Bun environment, the stream can easily be transformed into Node's Readable with Readable.fromWeb().
Also, async iteration over ReadableStream is not widely supported in browsers yet, so it might be useful to provide some toAsyncIterable() method.
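
A minimal sketch of both conversions, assuming client.rpc resolves to a web ReadableStream; the toAsyncIterable() helper here is only an illustration of what such a method could do internally:

import { Readable } from 'node:stream'

// Node.js / Bun: convert the web stream returned by the client into a Node Readable
const webStream = await client.rpc('v1/binary-stream', someInput)
const nodeStream = Readable.fromWeb(webStream)

// browsers without async-iterable ReadableStream (e.g. Safari):
// drive the reader manually instead of relying on for await...of
async function* toAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
  const reader = stream.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    reader.releaseLock()
  }
}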

UPD 2:

There's also a quite common use case where for some RPC the server not only "responds" with a stream, but also with some payload. Therefore:

// server-side
// example of a binary data stream
declareProcedure(() => {
  const payload = { some: 'thing' }
  const stream = createBinaryStreamResponse(payload)
  fs.createReadStream('/path/to/file').pipe(stream)
  return stream
})

// example of a json data stream
declareProcedure(() => {
  const payload = { some: 'thing' }
  const stream = createJsonStreamResponse(payload).with<{ data: string }[]>() // helper for static typing
  async function getDataFromDb() {
    for await (const rows of cursor) {
      stream.write(rows) // .write typing expects { data: string }[]
    }
    stream.end()
  }
  getDataFromDb()
  return stream
})

Then the client-side API will be the following:

const {  
  payload,  // { some: string }
  stream, // ReadableStream<{ data: string }>
} = await client.rpc(`v1/json-stream`, someInput)
// or
const {  
  payload,  // { some: string }
  stream, // ReadableStream<ArrayBuffer>
} = await client.rpc(`v1/binary-stream`, someInput)
@denis-ilchishin
Owner Author

Reopening as it is not finished yet. There's also client-to-server streaming.

@denis-ilchishin
Owner Author

Added with #66

@denis-ilchishin
Owner Author

Finalized in #67
