
Counterpart to writeDelimitedTo/parseDelimitedFrom #115

Closed

sorccu opened this issue Mar 28, 2014 · 9 comments

sorccu commented Mar 28, 2014

I often find myself missing the built-in parseDelimitedFrom and writeDelimitedTo from the official Java API.

You can kind of replicate the functionality with this:

var ByteBuffer = require('protobufjs/node_modules/bytebuffer')

// SomeMessage is a message class produced by the ProtoBuf.js builder
var messageBuffer = new SomeMessage(muchData).encodeNB() // a Node Buffer

// Delimiter: the message length as a base-128 varint
var lengthBuffer = new ByteBuffer()
lengthBuffer.writeVarint32(messageBuffer.length)
lengthBuffer.flip() // reset the offset so toBuffer() returns the written bytes

socket.write(lengthBuffer.toBuffer())
socket.write(messageBuffer)

... but it's obviously a bit of a pain. I know that the C++ API doesn't have these methods either, but then again it's C++, where few things are convenient.

I may have missed an easier way to achieve the same result (if so, please let me know), but it would be great to get encodeDelimited and decodeDelimited methods for convenience. Or, at the least, a one-liner to create the delimiter without having to require ByteBuffer.
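For reference, the delimiter is just the message length encoded as a base-128 varint, so it can be produced without ByteBuffer at all. A sketch (encodeVarint32 is a hypothetical helper, not part of any library):

function encodeVarint32(value) {
  var bytes = [];
  while (value > 127) {
    bytes.push((value & 0x7f) | 0x80); // low 7 bits, continuation bit set
    value >>>= 7;
  }
  bytes.push(value); // last byte has the continuation bit clear
  return Buffer.from(bytes);
}

socket.write(encodeVarint32(messageBuffer.length));
socket.write(messageBuffer);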

sorccu commented May 1, 2014

FWIW I ended up just implementing two stream transforms. One accepts a delimited stream, reads the length, and emits full message chunks, which can then be decoded very easily. The other automatically prepends every write with a delimiter.
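The reading half of such a pair can be sketched as follows (this uses through2 purely for illustration and is not the actual implementation, which is shared in a gist further down the thread):

var through2 = require('through2');

// Accumulates incoming bytes, reads the varint length prefix, and emits
// one Buffer per complete message, ready to be decoded.
function deframer() {
  var buffered = Buffer.alloc(0);
  return through2(function (chunk, enc, cb) {
    buffered = Buffer.concat([buffered, chunk]);
    for (;;) {
      // Decode the varint length prefix.
      var length = 0, shift = 0, pos = 0;
      while (pos < buffered.length && (buffered[pos] & 0x80)) {
        length |= (buffered[pos++] & 0x7f) << shift;
        shift += 7;
      }
      if (pos >= buffered.length) break;         // prefix incomplete
      length |= buffered[pos++] << shift;        // final varint byte
      if (buffered.length - pos < length) break; // body incomplete
      this.push(buffered.slice(pos, pos + length));
      buffered = buffered.slice(pos + length);
    }
    cb();
  });
}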

dcodeIO commented May 7, 2014

joliss commented Jul 27, 2014

Hm, I'm missing this too - I'd like to read from a stream (stdin in particular) and emit messages as soon as they are available. The new decodeDelimited function doesn't quite seem to do it for me, because it requires that I buffer up lots of bytes in advance, rather than just the bytes necessary to decode the current message.

@sorccu, do you have any code you can share?

Or is there any cute idiom that makes this easy?

sorccu commented Jul 28, 2014

@joliss See https://gist.github.com/sorccu/63cdb5cb55df46e26a17. It could be made a little cleaner and more efficient, but it works the way you want.

joliss commented Jul 28, 2014

Thanks @sorccu!

dcodeIO commented Jul 28, 2014

I've adapted Message#decodeDelimited a bit to make it more useful when streaming data: https://github.com/dcodeIO/ProtoBuf.js/blob/master/src/ProtoBuf/Builder/Message.js#L473

It now reads the preceding varint length and simply returns null if not enough data is available yet to parse the entire message. Thus, using a ByteBuffer (available as ProtoBuf.ByteBuffer) to buffer incoming data and successively calling decodeDelimited on it will yield messages as they become fully available. To prevent memory leaks, it should be good practice to call ByteBuffer#compact every Nth time a message has been returned (or, alternatively, when the buffer grows larger than, say, 8 KB).

Hope this helps. If you have any additional suggestions, let me know.

jfromaniello commented

@dcodeIO Can you show me an example of this? I'm looking for a way to buffer a stream from a socket. It would be nice to have an API like this:

socket.pipe(ByteStream)
    .pipe(Message.decodeDelimited)
    .on('data', function (message) {
      console.log('do something with my message', message);
    });

jfromaniello commented

This is what I ended up doing:

var net = require('net');
var through = require('through');
var ByteBuffer = require('protobufjs').ByteBuffer;
// Request is a message class produced by the ProtoBuf.js builder

var server = net.createServer(function (socket) {
  var buffer;

  socket.pipe(through(function (chunk) {
    chunk = ByteBuffer.wrap(chunk);
    buffer = buffer ? ByteBuffer.concat([buffer, chunk]) : chunk;

    var decoded;

    // decodeDelimited returns null until a complete message is buffered
    while (buffer.remaining() > 0 && (decoded = Request.decodeDelimited(buffer))) {
      buffer.compact(); // discard the consumed bytes
      this.queue(decoded);
    }

  })).on('data', function (m) {
    console.log(m);
  });

});
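For completeness, the writing side of the same exchange can be sketched like this (the port and the Request payload are illustrative; Message#encodeDelimited prepends the varint length):

var net = require('net');

var client = net.connect(3000, function () {
  // encodeDelimited writes "varint length + payload" in one go
  client.write(new Request({ id: 1 }).encodeDelimited().toBuffer());
});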

kirillgroshkov commented

Spent a few hours looking for a solution, and here's my working version:

import { Transform } from 'stream'
import * as protobuf from 'protobufjs'

// Turns a length-delimited protobuf byte stream into a stream of decoded messages.
function transformFromProtobuf(type: protobuf.Type): Transform {
  let remainder = Buffer.alloc(0)

  return new Transform({
    objectMode: false,
    readableObjectMode: true,
    transform(chunk: Buffer, _encoding, cb) {
      const buf = Buffer.concat([remainder, chunk])

      let lastPos = 0
      const reader = protobuf.Reader.create(buf)
      try {
        while (reader.pos < reader.len) {
          lastPos = reader.pos
          // Reads the varint length prefix followed by the message body.
          const frame = type.decodeDelimited(reader)
          this.push(frame)
        }

        // Everything decoded cleanly; nothing to carry over.
        remainder = Buffer.alloc(0)
      } catch (err) {
        // The last frame was incomplete: keep its bytes for the next chunk.
        remainder = buf.slice(lastPos)
      }
      cb() // finished processing this chunk
    },
  })
}
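Hooked up to a socket, usage then looks roughly like this (the schema path and type name are illustrative):

const protobuf = require('protobufjs');

protobuf.load('schema.proto').then((root) => {
  const Message = root.lookupType('my.Message');
  socket.pipe(transformFromProtobuf(Message)).on('data', (msg) => {
    console.log('decoded message', msg);
  });
});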
