From 63c3681211ceb0454c4da800c5920822ea9d8b2f Mon Sep 17 00:00:00 2001 From: Sami Turcotte Date: Mon, 3 Dec 2018 09:18:49 -0500 Subject: [PATCH] Add buffer support to split, join, replace and parse streams --- package.json | 2 +- samples/split.js | 12 ++++++++++++ src/index.spec.ts | 24 ++++++++++++++++++++++++ src/index.ts | 24 ++++++++++++++++-------- 4 files changed, 53 insertions(+), 9 deletions(-) create mode 100644 samples/split.js diff --git a/package.json b/package.json index 2b6648d..8c4f583 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mhysa", - "version": "1.0.0", + "version": "1.0.1", "description": "Streams and event emitter utils for Node.js", "keywords": [ "promise", diff --git a/samples/split.js b/samples/split.js new file mode 100644 index 0000000..2bb492c --- /dev/null +++ b/samples/split.js @@ -0,0 +1,12 @@ +const mhysa = require("mhysa"); +let stream = mhysa.split(","); + +const buf = Buffer.from("a,b,c"); +stream.on("data", function(data) { + console.log(data); +}); + +for (let i = 0; i < buf.length; ++i) { + stream.write(buf.slice(i, i + 1)); +} +stream.end(); diff --git a/src/index.spec.ts b/src/index.spec.ts index 26d65ac..6066ae6 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -495,6 +495,30 @@ test.cb("split() splits chunks using the specified separator", t => { source.push(null); }); +test.cb( + "split() splits utf-8 encoded buffers using the specified separator", + t => { + t.plan(3); + const expectedElements = ["a", "b", "c"]; + let i = 0; + const through = split(","); + const buf = Buffer.from("a,b,c"); + through + .on("data", element => { + expect(element).to.equal(expectedElements[i]); + i++; + t.pass(); + }) + .on("error", t.end) + .on("end", t.end); + + for (let j = 0; j < buf.length; ++j) { + through.write(buf.slice(j, j + 1)); + } + through.end(); + }, +); + test.cb("join() joins chunks using the specified separator", t => { t.plan(9); const source = new Readable({ objectMode: true }); diff --git a/src/index.ts b/src/index.ts index 3564966..2147cc3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -208,8 +208,10 @@ export function split( return new Transform({ readableObjectMode: true, writableObjectMode: true, - async transform(chunk: string, encoding, callback) { - const splitted = chunk.split(separator); + transform(chunk: string | Buffer, encoding, callback) { + const asString = + chunk instanceof Buffer ? chunk.toString(encoding) : chunk; + const splitted = asString.split(separator); if (buffered.length > 0 && splitted.length > 1) { splitted[0] = buffered.concat(splitted[0]); buffered = ""; @@ -233,11 +235,13 @@ export function join(separator: string): NodeJS.ReadWriteStream { return new Transform({ readableObjectMode: true, writableObjectMode: true, - async transform(chunk: string, encoding, callback) { + async transform(chunk: string | Buffer, encoding, callback) { + const asString = + chunk instanceof Buffer ? chunk.toString(encoding) : chunk; if (!isFirstChunk) { this.push(separator); } - this.push(chunk); + this.push(asString); isFirstChunk = false; callback(); }, @@ -257,8 +261,10 @@ export function replace( return new Transform({ readableObjectMode: true, writableObjectMode: true, - transform(chunk: string, encoding, callback) { - callback(undefined, chunk.replace(searchValue, replaceValue)); + transform(chunk: string | Buffer, encoding, callback) { + const asString = + chunk instanceof Buffer ? chunk.toString(encoding) : chunk; + callback(undefined, asString.replace(searchValue, replaceValue)); }, }); } @@ -270,10 +276,12 @@ export function parse(): NodeJS.ReadWriteStream { return new Transform({ readableObjectMode: true, writableObjectMode: true, - async transform(chunk: string, encoding, callback) { + async transform(chunk: string | Buffer, encoding, callback) { try { + const asString = + chunk instanceof Buffer ? chunk.toString(encoding) : chunk; // Using await causes parsing errors to be emitted - callback(undefined, await JSON.parse(chunk)); + callback(undefined, await JSON.parse(asString)); } catch (err) { callback(err); }