Skip to content

Commit 9a1c5cc

Browse files
authored
Feature/node streams (#257)
* update fromNodeStream test, add toNodeStream tests * update fromNodeStream to only add one extra end event handler * add iterable/asynciterable toNodeStream implementations * ugh, update npm because semver * fix lint * update Ix.node AsyncIterable pipe implementation to support piping streams
1 parent 8685f40 commit 9a1c5cc

27 files changed

+850
-130
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ before_install:
2525
- export PATH=$HOME/.yarn/bin:$PATH
2626
- which yarn # for debugging Travis
2727
- echo $PATH # for debugging Travis
28+
- node -v && npm i -g npm && npm -v
2829

2930
script:
3031
- |

.vscode/launch.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"request": "launch",
2121
"name": "Debug Unit Tests",
2222
"cwd": "${workspaceRoot}",
23+
"console": "integratedTerminal",
2324
"program": "${workspaceFolder}/node_modules/.bin/jest",
2425
"skipFiles": [
2526
"<node_internals>/**/*.js",

gulp/closure-task.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ const closureTask = ((cache) => memoizeTask(cache, async function closure(target
4545
await mkdirp(out);
4646

4747
await Promise.all([
48-
`${mainExport}`,
49-
`${mainExport}.internal`
48+
`${mainExport}.dom`,
49+
`${mainExport}.dom.internal`
5050
].map(closureCompile));
5151

5252
async function closureCompile(entry) {

gulp/minify-task.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const minifyTask = ((cache, commonConfig) => memoizeTask(cache, function minifyJ
4343
output: { ...commonConfig.output,
4444
path: path.resolve(`./${out}`) } };
4545

46-
const webpackConfigs = [mainExport, `${mainExport}.internal`].map((entry) => ({
46+
const webpackConfigs = [`${mainExport}.dom`, `${mainExport}.dom.internal`].map((entry) => ({
4747
...targetConfig,
4848
name: entry,
4949
entry: { [entry]: path.resolve(`${src}/${entry}.js`) },

gulp/package-task.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ const createMainPackageJson = (target, format) => (orig) => ({
4646
...createTypeScriptPackageJson(target, format)(orig),
4747
bin: orig.bin,
4848
name: npmPkgName,
49-
browser: `${mainExport}`,
5049
main: `${mainExport}.node`,
50+
browser: `${mainExport}.dom`,
5151
types: `${mainExport}.node.d.ts`,
5252
unpkg: `${mainExport}.es5.min.js`,
5353
[`esm`]: { mode: `all`, sourceMap: true }
@@ -56,9 +56,9 @@ const createMainPackageJson = (target, format) => (orig) => ({
5656
const createTypeScriptPackageJson = (target, format) => (orig) => ({
5757
...createScopedPackageJSON(target, format)(orig),
5858
bin: undefined,
59-
browser: `${mainExport}.ts`,
6059
main: `${mainExport}.node.ts`,
6160
types: `${mainExport}.node.ts`,
61+
browser: `${mainExport}.dom.ts`,
6262
dependencies: {
6363
'@types/node': '*',
6464
...orig.dependencies
@@ -71,9 +71,9 @@ const createScopedPackageJSON = (target, format) => (({ name, ...orig }) =>
7171
(xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
7272
{
7373
name: `${npmOrgName}/${packageName(target, format)}`,
74-
browser: format === 'umd' ? undefined : `${mainExport}`,
75-
main: format === 'umd' ? `${mainExport}` : `${mainExport}.node`,
76-
types: format === 'umd' ? undefined : `${mainExport}.node.d.ts`,
74+
browser: format === 'umd' ? undefined : `${mainExport}.dom`,
75+
main: format === 'umd' ? `${mainExport}.dom` : `${mainExport}.node`,
76+
types: format === 'umd' ? `${mainExport}.d.ts` : `${mainExport}.node.d.ts`,
7777
version: undefined, unpkg: undefined, module: undefined, [`esm`]: undefined,
7878
}
7979
)

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
],
4646
"dependencies": {
4747
"@types/node": "10.12.18",
48+
"is-stream": "1.1.0",
4849
"tslib": "^1.9.3"
4950
},
5051
"devDependencies": {

spec/Ix.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* tslint:disable */
22

3-
(<any> global).window = (<any> global).window || global;
3+
(<any>global).window = (<any>global).window || global;
44

55
// these are duplicated in the gulpfile :<
66
const targets = [`es5`, `es2015`, `esnext`];
@@ -20,7 +20,7 @@ else if (target === `ts` || target === `apache-arrow`) modulePath = target;
2020
else modulePath = path.join(target, format);
2121

2222
modulePath = path.resolve(`./targets`, modulePath);
23-
const IxPath = path.join(modulePath, `Ix${format === 'umd' ? '' : '.node'}.internal`);
23+
const IxPath = path.join(modulePath, `Ix${format === 'umd' ? '.dom' : '.node'}.internal`);
2424
const IxInternal: typeof import('../src/Ix.node.internal') = require(IxPath);
2525

2626
export = IxInternal;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import '../asynciterablehelpers';
2+
import { AsyncIterable, AsyncIterableReadable } from '../Ix';
3+
4+
(() => {
5+
if (!AsyncIterableReadable || process.env.TEST_NODE_STREAMS !== 'true') {
6+
return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {
7+
/**/
8+
});
9+
}
10+
11+
const stringsItr = () => AsyncIterable.from([1, 2, 3]).map(i => `${i}`);
12+
const buffersItr = () => stringsItr().map(val => Buffer.from(val));
13+
const objectsItr = () => stringsItr().map(val => ({ val }));
14+
const compare = <T>(a: T, b: T) => {
15+
let aVal = ArrayBuffer.isView(a) ? `${Buffer.from(a.buffer, a.byteOffset, a.byteLength)}` : a;
16+
let bVal = ArrayBuffer.isView(b) ? `${Buffer.from(b.buffer, b.byteOffset, b.byteLength)}` : b;
17+
// poor man's deep-equals
18+
try {
19+
expect(aVal).toEqual(bVal);
20+
} catch (e) {
21+
return false;
22+
}
23+
return true;
24+
};
25+
26+
describe(`AsyncIterable#toNodeStream`, () => {
27+
describe(`objectMode: true`, () => {
28+
const expectedStrings = ['1', '2', '3'];
29+
const expectedObjects = expectedStrings.map(val => ({ val }));
30+
const expectedBuffers = expectedStrings.map(x => Buffer.from(x));
31+
test(`yields Strings`, async () => {
32+
await expect(stringsItr().toNodeStream({ objectMode: true })).toEqualStream(
33+
AsyncIterable.from(expectedStrings),
34+
compare
35+
);
36+
});
37+
test(`yields Buffers`, async () => {
38+
await expect(buffersItr().toNodeStream({ objectMode: true })).toEqualStream(
39+
AsyncIterable.from(expectedBuffers),
40+
compare
41+
);
42+
});
43+
test(`yields Objects`, async () => {
44+
await expect(objectsItr().toNodeStream({ objectMode: true })).toEqualStream(
45+
AsyncIterable.from(expectedObjects),
46+
compare
47+
);
48+
});
49+
});
50+
51+
describe(`objectMode: false`, () => {
52+
const expectedStrings = ['123'];
53+
const expectedBuffers = expectedStrings.map(x => Buffer.from(x));
54+
test(`yields Strings`, async () => {
55+
await expect(stringsItr().toNodeStream({ objectMode: false })).toEqualStream(
56+
AsyncIterable.from(expectedStrings),
57+
compare
58+
);
59+
});
60+
test(`yields Buffers`, async () => {
61+
await expect(buffersItr().toNodeStream({ objectMode: false })).toEqualStream(
62+
AsyncIterable.from(expectedBuffers),
63+
compare
64+
);
65+
});
66+
});
67+
});
68+
})();
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import * as Ix from '../Ix';
2+
import { PassThrough } from 'stream';
3+
4+
(() => {
5+
if (process.env.TEST_NODE_STREAMS !== 'true') {
6+
return test('not testing node streams because process.env.TEST_NODE_STREAMS !== "true"', () => {
7+
/**/
8+
});
9+
}
10+
11+
const { of } = Ix.AsyncIterable;
12+
const { empty, map } = Ix.asynciterable;
13+
const { sequenceEqual } = Ix.asynciterable;
14+
const through = () => {
15+
return new PassThrough({
16+
objectMode: true,
17+
readableObjectMode: true,
18+
writableObjectMode: true
19+
});
20+
};
21+
22+
test('AsyncIterable#pipe writable-stream single element', async () => {
23+
const source = of({ name: 'Frank', custId: 98088 });
24+
const expected = of('Frank');
25+
26+
expect(await sequenceEqual(expected, map(source.pipe(through()), x => x.name))).toBeTruthy();
27+
});
28+
29+
test('AsyncIterable#pipe writable-stream maps property', async () => {
30+
const source = of<any>(
31+
{ name: 'Frank', custId: 98088 },
32+
{ name: 'Bob', custId: 29099 },
33+
{ name: 'Chris', custId: 39033 },
34+
{ name: null, custId: 30349 },
35+
{ name: 'Frank', custId: 39030 }
36+
);
37+
const expected = of('Frank', 'Bob', 'Chris', null, 'Frank');
38+
39+
expect(await sequenceEqual(expected, map(source.pipe(through()), x => x.name))).toBeTruthy();
40+
});
41+
42+
test('AsyncIterable#pipe writable-stream empty', async () => {
43+
expect(
44+
await sequenceEqual(empty<number>(), map(empty<string>(), (s, i) => s.length + i))
45+
).toBeTruthy();
46+
});
47+
48+
test('AsyncIterable#pipe writable-stream map property using index', async () => {
49+
const source = of(
50+
{ name: 'Frank', custId: 98088 },
51+
{ name: 'Bob', custId: 29099 },
52+
{ name: 'Chris', custId: 39033 }
53+
);
54+
const expected = of('Frank', null, null);
55+
56+
expect(
57+
await sequenceEqual(
58+
expected,
59+
map(source.pipe(through()), (x, i) => (i === 0 ? x.name : null))
60+
)
61+
).toBeTruthy();
62+
});
63+
64+
test('AsyncIterable#pipe writable-stream map property using index on last', async () => {
65+
const source = of(
66+
{ name: 'Frank', custId: 98088 },
67+
{ name: 'Bob', custId: 29099 },
68+
{ name: 'Chris', custId: 39033 },
69+
{ name: 'Bill', custId: 30349 },
70+
{ name: 'Frank', custId: 39030 }
71+
);
72+
const expected = of(null, null, null, null, 'Frank');
73+
74+
expect(
75+
await sequenceEqual(
76+
expected,
77+
map(source.pipe(through()), (x, i) => (i === 4 ? x.name : null))
78+
)
79+
).toBeTruthy();
80+
});
81+
82+
test('AsyncIterable#pipe writable-stream execution is deferred', async () => {
83+
let fnCalled = false;
84+
const source = of(() => {
85+
fnCalled = true;
86+
return 1;
87+
});
88+
89+
map(source.pipe(through()), x => x());
90+
91+
expect(fnCalled).toBeFalsy();
92+
});
93+
})();
Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { fromNodeStream } from '../Ix';
1+
import '../asynciterablehelpers';
22
import { Readable, ReadableOptions } from 'stream';
3+
import { fromNodeStream, AsyncIterable } from '../Ix';
34

45
(() => {
56
if (!fromNodeStream || process.env.TEST_NODE_STREAMS !== 'true') {
@@ -19,34 +20,25 @@ import { Readable, ReadableOptions } from 'stream';
1920
}
2021

2122
_read() {
22-
const i = ++this._index;
23-
if (i > this._max) {
24-
this.push(null);
25-
} else {
26-
const buf = Buffer.from(`${i}`, 'utf8');
27-
this.push(buf);
28-
}
23+
this.push(++this._index > this._max ? null : `${this._index}`);
2924
}
3025
}
3126

32-
test('AsyncIterable#fromNodeStream with readable', async () => {
33-
const c = new Counter({ objectMode: true });
34-
const xs = fromNodeStream(c);
27+
const compare = (a: string, b: string) => Buffer.from(a).compare(Buffer.from(b)) === 0;
3528

36-
const it = xs[Symbol.asyncIterator]();
37-
let next = await it.next();
38-
expect(next.done).toBeFalsy();
39-
expect((next.value as Buffer).compare(Buffer.from('1', 'utf8'))).toBe(0);
40-
41-
next = await it.next();
42-
expect(next.done).toBeFalsy();
43-
expect((next.value as Buffer).compare(Buffer.from('2', 'utf8'))).toBe(0);
44-
45-
next = await it.next();
46-
expect(next.done).toBeFalsy();
47-
expect((next.value as Buffer).compare(Buffer.from('3', 'utf8'))).toBe(0);
29+
describe(`AsyncIterable#fromNodeStream`, () => {
30+
test('objectMode: true', async () => {
31+
const c = new Counter({ objectMode: true });
32+
const xs = fromNodeStream(c) as AsyncIterable<string>;
33+
const expected = AsyncIterable.from(['1', '2', '3']);
34+
await expect(xs).toEqualStream(expected, compare);
35+
});
4836

49-
next = await it.next();
50-
expect(next.done).toBeTruthy();
37+
test('objectMode: false', async () => {
38+
const c = new Counter({ objectMode: false });
39+
const xs = fromNodeStream(c) as AsyncIterable<string>;
40+
const expected = AsyncIterable.from(['123']);
41+
await expect(xs).toEqualStream(expected, compare);
42+
});
5143
});
5244
})();

0 commit comments

Comments
 (0)