Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,36 @@ export const encodeMessage = (
export const decodeClientStream = (
dataStream: Readable
): AsyncIterable<ClientMessage> => {
return decoder().decodeStream(dataStream) as AsyncIterable<ClientMessage>;
// This is a hack to avoid messagepack utf8-ization of the msgpack str format, since it mangles data.
// Fluentd, when using the out_forward plugin, will pass the data in PackedForward mode using a str to represent the packed Forward messages.
// This would normally end up getting decoded as utf8 by @msgpack/msgpack, and turned into complete garbage. This short circuits that function in the parser,
const streamDecoder = decoder() as any;
streamDecoder._decodeUtf8String = streamDecoder.decodeUtf8String;
streamDecoder.decodeUtf8String = function (
this: typeof streamDecoder,
byteLength: number,
headerOffset: number
) {
if (this.bytes.byteLength < this.pos + headerOffset + byteLength) {
// Defer to the error handling inside the normal function, if we don't have enough data to parse
return this._decodeUtf8String(byteLength, headerOffset);
}
const offset = this.pos + headerOffset;
// If the first byte is 0x92 (fixarr of size 2), this represents a msgpack str encoded entry
// Also catch 0xdc and 0xdd, which represents arrays. This should never be passed, fixarr is more efficient, but just to cover all bases.
// If the first byte is 0x1f, then assume it is compressed
if (
this.bytes[offset] === 0x92 ||
this.bytes[offset] === 0x1f ||
this.bytes[offset] === 0xdc ||
this.bytes[offset] === 0xdd
) {
return this.decodeBinary(byteLength, headerOffset);
} else {
return this._decodeUtf8String(byteLength, headerOffset);
}
}.bind(streamDecoder);
return streamDecoder.decodeStream(dataStream) as AsyncIterable<ClientMessage>;
};

/**
Expand Down
41 changes: 41 additions & 0 deletions test/test.protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DecodeError,
SharedKeyMismatchError,
} from "../src/error";
import {Readable} from "stream";

describe("Protocol", () => {
describe("isHelo", () => {
Expand Down Expand Up @@ -647,4 +648,44 @@ describe("Protocol", () => {
);
});
});
describe("decodeClientStream", () => {
it("should parse msgpack str encoded records", async () => {
const entries = [
protocol.generateEntry(0, {abc: "def"}),
protocol.generateEntry(1, {ghi: "jkl"}),
];
const packedEntries = entries.map(protocol.packEntry);
const packedEntryLength = packedEntries.reduce((r, v) => r + v.length, 0);
const message = protocol.generatePackedForwardMode(
"test",
packedEntries,
packedEntryLength
);
const msg = protocol.encodeMessage(message);
// 1 byte for the array byte (0x93, 1 byte for the fixstr containing "test"), then one byte for each of the str
// Change the 0xc4 (bin8) representing the message to a 0xd9 (str8), this is the same format as FluentD sends data as
msg[1 + 1 + message[0].length] = 0xd9;

const s = new Readable();
s.push(msg);
s.push(null);

const iterable = protocol.decodeClientStream(s);
let parsedMessage: protocol.ClientMessage | undefined;
for await (const msg of iterable) {
parsedMessage = msg;
}

expect(parsedMessage).to.not.be.undefined;
expect(protocol.isClientTransportMessage(parsedMessage)).to.be.true;
if (!parsedMessage || !protocol.isClientTransportMessage(parsedMessage)) {
return;
}

const originalMessage = protocol.parseTransport(parsedMessage);
console.log(originalMessage);
expect(originalMessage.entries[0][0]).to.equal(entries[0][0]);
expect(originalMessage.entries[1][0]).to.equal(entries[1][0]);
});
});
});