Skip to content

Commit

Permalink
Added tests for serial streams
Browse files Browse the repository at this point in the history
  • Loading branch information
alexindigo committed Jun 14, 2016
1 parent dac5690 commit 14ee2f1
Show file tree
Hide file tree
Showing 9 changed files with 1,121 additions and 92 deletions.
3 changes: 3 additions & 0 deletions lib/readable_serial_ordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ var serialOrdered = require('../serialOrdered.js');

// API
module.exports = ReadableSerialOrdered;
// expose sort helpers
module.exports.ascending = serialOrdered.ascending;
module.exports.descending = serialOrdered.descending;

/**
* Streaming wrapper to `asynckit.serialOrdered`
Expand Down
2 changes: 1 addition & 1 deletion stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports =
{
parallel : ReadableParallel,
serial : ReadableSerial,
serialOrdered : ReadableSerialOrdered
serialOrdered : ReadableSerialOrdered,
};

inherits(ReadableAsyncKit, Readable);
Expand Down
4 changes: 4 additions & 0 deletions test/lib/stream_assert.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ function Receiver(test, fixture)
{
test.fail('+ do not expect stream to have normal ending');
}
else
{
test.ok(true, '+ stream finished successfully.');
}
});

}
Expand Down
6 changes: 3 additions & 3 deletions test/test-serial-object.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ test('serial: handles sync object iterator asynchronously', function(t)

test('serial: object: longest finishes in order', function(t)
{
var source = { first: 1, one: 1, four: 4, sixteen: 16, sixtyFour: 64, thirtyTwo: 32, eight: 8, two: 2 }
, expected = [ 1, 1, 4, 16, 64, 32, 8, 2 ]
, target = []
var source = { first: 1, one: 1, four: 4, sixteen: 16, sixtyFour: 64, thirtyTwo: 32, eight: 8, two: 2 }
, expected = [ 1, 1, 4, 16, 64, 32, 8, 2 ]
, target = []
;

t.plan(3);
Expand Down
11 changes: 6 additions & 5 deletions test/test-stream-parallel-array.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ test('stream: parallel: iterates over array', function(t)
, stream
;

t.plan(expected.length * 2 + 3);
t.plan(expected.length * 2 + 4);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand Down Expand Up @@ -128,7 +128,7 @@ test('stream: parallel: array: handles unclean callbacks', function(t)
, stream
;

t.plan(expected.length * 2 + 2);
t.plan(expected.length * 2 + 3);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand Down Expand Up @@ -160,7 +160,7 @@ test('stream: parallel: array: destroyed cleanly', function(t)
, stream
;

t.plan(expected.length * 3 + 3);
t.plan(expected.length * 3 + 4);

// destroy stream before element 16 is processed
setTimeout(function()
Expand Down Expand Up @@ -201,7 +201,7 @@ test('stream: parallel: array: destroyed cleanly at start', function(t)
, stream
;

t.plan(2);
t.plan(3);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand Down Expand Up @@ -234,7 +234,7 @@ test('stream: parallel: array: destroyed after finish', function(t)
, stream
;

t.plan(expected.length * 2 + 3);
t.plan(expected.length * 2 + 4);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand All @@ -254,6 +254,7 @@ test('stream: parallel: array: destroyed after finish', function(t)
function(err, result)
{
stream.destroy();
stream.destroy(); // do it couple times to make sure

t.error(err, 'expect no errors');
t.deepEqual(result, salvaged, 'expect result to contain salvaged parts of the source array');
Expand Down
172 changes: 89 additions & 83 deletions test/test-stream-parallel-object.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ test('stream: parallel: iterates over object', function(t)
, stream
;

t.plan(keys.length * 3 + 3);
t.plan(keys.length * 3 + 4);

// supports full value, key, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, key, cb)
Expand Down Expand Up @@ -45,7 +45,7 @@ test('stream: parallel: handles sync object iterator asynchronously', function(t
, stream
;

t.plan(keys.length * 3 + 3);
t.plan(keys.length * 3 + 4);

defer(function(){ isAsync = true; });

Expand Down Expand Up @@ -77,7 +77,7 @@ test('stream: parallel: object: longest finishes last', function(t)
, stream
;

t.plan(keys.length * 2 + 3);
t.plan(keys.length * 2 + 4);

// supports just value, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, cb)
Expand Down Expand Up @@ -159,83 +159,6 @@ test('stream: parallel: object: terminates early', function(t)
streamAssert.failure(t, stream, {elements: salvaged, error: expError});
});

test('stream: parallel: object: handles non terminable iterations', function(t)
{
var source = { first: 1, one: 1, four: 4, sixteen: 16, sixtyFour: 64, thirtyTwo: 32, eight: 8, two: 2 }
, salvaged = { first: 1, one: 1, two: 2, four: 4, eight: 8}
, expected = [ 1, 1, 2, 4, 8 ]
, expError = {item: 16}
, target = []
, previous = 0
, stream
;

t.plan(expected.length * 2 + 3);

// supports just value, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, cb)
{
var id = setTimeout(function()
{
if (item < 10)
{
// expect it to be invoked in order
t.ok(item >= previous, 'expect item (' + item + ') to be equal or greater than previous item (' + previous + ')');
previous = item;

target.push(item);
cb(null, item);
}
// return error on big numbers
else
{
cb({item: item});
}
}, 10 * item);

return (item % 2) ? null : clearTimeout.bind(null, id);
},
function(err)
{
t.equal(err.item, expError.item, 'expect to error out on 16');
t.deepEqual(target, expected, 'expect target to contain passed numbers');
});

streamAssert.failure(t, stream, {elements: salvaged, error: expError});
});

test('stream: parallel: object: handles unclean callbacks', function(t)
{
var source = { first: 1, second: 2, third: 3, fourth: 4, three: 3, two: 2, one: 1 }
, keys = Object.keys(source)
, expected = { first: 2, second: 4, third: 6, fourth: 8, three: 6, two: 4, one: 2 }
, stream
;

t.plan(keys.length * 3 + 2);

// supports full value, key, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, key, cb)
{
setTimeout(function()
{
t.ok(keys.indexOf(key) != -1, 'expect key (' + key + ') to exist in the keys array');
t.equal(item, source[key], 'expect item (' + item + ') to match in same key (' + key + ') element in the source object');

cb(null, item * 2);
cb(null, item * -2);

}, 10 * item);
},
function(err, result)
{
t.error(err, 'expect no errors');
t.deepEqual(result, expected, 'expect result to be an multiplied by two object values');
});

streamAssert.success(t, stream, {elements: expected});
});

test('stream: parallel: object: destroyed cleanly', function(t)
{
var source = { first: 1, one: 1, four: 4, sixteen: 16, sixtyFour: 64, thirtyTwo: 32, eight: 8, two: 2 }
Expand All @@ -246,14 +169,20 @@ test('stream: parallel: object: destroyed cleanly', function(t)
, stream
;

t.plan(expected.length * 3 + 3);
t.plan(expected.length * 3 + 4);

// destroy stream before element 16 is processed
setTimeout(function()
{
stream.destroy();
}, 25 * limitNum);

// do it couple times to make sure
setTimeout(function()
{
stream.destroy();
}, 25 * (limitNum + 1));

stream = asynckitStream.parallel(source, function(item, key, cb)
{
var id = setTimeout(function()
Expand Down Expand Up @@ -287,7 +216,7 @@ test('stream: parallel: object: destroyed cleanly at start', function(t)
, stream
;

t.plan(2);
t.plan(3);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand Down Expand Up @@ -320,7 +249,7 @@ test('stream: parallel: object: destroyed after finish', function(t)
, stream
;

t.plan(expected.length * 2 + 3);
t.plan(expected.length * 2 + 4);

stream = asynckitStream.parallel(source, function(item, cb)
{
Expand Down Expand Up @@ -348,3 +277,80 @@ test('stream: parallel: object: destroyed after finish', function(t)

streamAssert.success(t, stream, {elements: salvaged});
});

test('stream: parallel: object: handles non terminable iterations', function(t)
{
var source = { first: 1, one: 1, four: 4, sixteen: 16, sixtyFour: 64, thirtyTwo: 32, eight: 8, two: 2 }
, salvaged = { first: 1, one: 1, two: 2, four: 4, eight: 8}
, expected = [ 1, 1, 2, 4, 8 ]
, expError = {item: 16}
, target = []
, previous = 0
, stream
;

t.plan(expected.length * 2 + 3);

// supports just value, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, cb)
{
var id = setTimeout(function()
{
if (item < 10)
{
// expect it to be invoked in order
t.ok(item >= previous, 'expect item (' + item + ') to be equal or greater than previous item (' + previous + ')');
previous = item;

target.push(item);
cb(null, item);
}
// return error on big numbers
else
{
cb({item: item});
}
}, 10 * item);

return (item % 2) ? null : clearTimeout.bind(null, id);
},
function(err)
{
t.equal(err.item, expError.item, 'expect to error out on 16');
t.deepEqual(target, expected, 'expect target to contain passed numbers');
});

streamAssert.failure(t, stream, {elements: salvaged, error: expError});
});

test('stream: parallel: object: handles unclean callbacks', function(t)
{
var source = { first: 1, second: 2, third: 3, fourth: 4, three: 3, two: 2, one: 1 }
, keys = Object.keys(source)
, expected = { first: 2, second: 4, third: 6, fourth: 8, three: 6, two: 4, one: 2 }
, stream
;

t.plan(keys.length * 3 + 3);

// supports full value, key, callback (shortcut) interface
stream = asynckitStream.parallel(source, function(item, key, cb)
{
setTimeout(function()
{
t.ok(keys.indexOf(key) != -1, 'expect key (' + key + ') to exist in the keys array');
t.equal(item, source[key], 'expect item (' + item + ') to match in same key (' + key + ') element in the source object');

cb(null, item * 2);
cb(null, item * -2);

}, 10 * item);
},
function(err, result)
{
t.error(err, 'expect no errors');
t.deepEqual(result, expected, 'expect result to be an multiplied by two object values');
});

streamAssert.success(t, stream, {elements: expected});
});
Loading

0 comments on commit 14ee2f1

Please sign in to comment.