From df2a6d3684d26a7a5df8b1afaf40736fe75c53ea Mon Sep 17 00:00:00 2001 From: James Elias Sigurdarson Date: Mon, 6 Jun 2022 15:43:19 +0000 Subject: [PATCH] fix issue with msgpack str Signed-off-by: James Elias Sigurdarson --- src/protocol.ts | 31 ++++++++++++++++++++++++++++++- test/test.protocol.ts | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/protocol.ts b/src/protocol.ts index 35d47fe..8302c3c 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -716,7 +716,36 @@ export const encodeMessage = ( export const decodeClientStream = ( dataStream: Readable ): AsyncIterable => { - return decoder().decodeStream(dataStream) as AsyncIterable; + // This is a hack to avoid messagepack utf8-ization of the msgpack str format, since it mangles data. + // Fluentd, when using the out_forward plugin, will pass the data in PackedForward mode using a str to represent the packed Forward messages. + // This would normally end up getting decoded as utf8 by @msgpack/msgpack, and turned into complete garbage. This short circuits that function in the parser, + const streamDecoder = decoder() as any; + streamDecoder._decodeUtf8String = streamDecoder.decodeUtf8String; + streamDecoder.decodeUtf8String = function ( + this: typeof streamDecoder, + byteLength: number, + headerOffset: number + ) { + if (this.bytes.byteLength < this.pos + headerOffset + byteLength) { + // Defer to the error handling inside the normal function, if we don't have enough data to parse + return this._decodeUtf8String(byteLength, headerOffset); + } + const offset = this.pos + headerOffset; + // If the first byte is 0x92 (fixarr of size 2), this represents a msgpack str encoded entry + // Also catch 0xdc and 0xdd, which represents arrays. This should never be passed, fixarr is more efficient, but just to cover all bases. + // If the first byte is 0x1f, then assume it is compressed + if ( + this.bytes[offset] === 0x92 || + this.bytes[offset] === 0x1f || + this.bytes[offset] === 0xdc || + this.bytes[offset] === 0xdd + ) { + return this.decodeBinary(byteLength, headerOffset); + } else { + return this._decodeUtf8String(byteLength, headerOffset); + } + }.bind(streamDecoder); + return streamDecoder.decodeStream(dataStream) as AsyncIterable; }; /** diff --git a/test/test.protocol.ts b/test/test.protocol.ts index 60b7421..027f276 100644 --- a/test/test.protocol.ts +++ b/test/test.protocol.ts @@ -7,6 +7,7 @@ import { DecodeError, SharedKeyMismatchError, } from "../src/error"; +import {Readable} from "stream"; describe("Protocol", () => { describe("isHelo", () => { @@ -647,4 +648,44 @@ describe("Protocol", () => { ); }); }); + describe("decodeClientStream", () => { + it("should parse msgpack str encoded records", async () => { + const entries = [ + protocol.generateEntry(0, {abc: "def"}), + protocol.generateEntry(1, {ghi: "jkl"}), + ]; + const packedEntries = entries.map(protocol.packEntry); + const packedEntryLength = packedEntries.reduce((r, v) => r + v.length, 0); + const message = protocol.generatePackedForwardMode( + "test", + packedEntries, + packedEntryLength + ); + const msg = protocol.encodeMessage(message); + // 1 byte for the array byte (0x93, 1 byte for the fixstr containing "test"), then one byte for each of the str + // Change the 0xc4 (bin8) representing the message to a 0xd9 (str8), this is the same format as FluentD sends data as + msg[1 + 1 + message[0].length] = 0xd9; + + const s = new Readable(); + s.push(msg); + s.push(null); + + const iterable = protocol.decodeClientStream(s); + let parsedMessage: protocol.ClientMessage | undefined; + for await (const msg of iterable) { + parsedMessage = msg; + } + + expect(parsedMessage).to.not.be.undefined; + expect(protocol.isClientTransportMessage(parsedMessage)).to.be.true; + if (!parsedMessage || !protocol.isClientTransportMessage(parsedMessage)) { + return; + } + + const originalMessage = protocol.parseTransport(parsedMessage); + console.log(originalMessage); + expect(originalMessage.entries[0][0]).to.equal(entries[0][0]); + expect(originalMessage.entries[1][0]).to.equal(entries[1][0]); + }); + }); });