Skip to content

Commit

Permalink
fix(tests): added tests for erroneous whatwg streams
Browse files Browse the repository at this point in the history
  • Loading branch information
grantila committed Nov 23, 2020
1 parent c7a0e64 commit 7781e79
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 31 deletions.
162 changes: 131 additions & 31 deletions lib/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,43 @@ import "web-streams-polyfill/es2018"
import * as through2 from 'through2'

import streamHead from './'
import { Buffer } from "buffer";


function makeReadableStream( arr: Array< any > ): ReadableStream
{
let index = 0;
return new ReadableStream( {
start( ) { },
pull( controller )
{
if ( index >= arr.length )
controller.close( );
else
{
const chunk = arr[ index++ ];
if ( chunk instanceof Error )
controller.error( chunk );
else
controller.enqueue( chunk );
}
}
} );
}

async function consumeReadableStream( stream: ReadableStream )
: Promise< Array< any > >
{
const reader = stream.getReader( );
const chunks: Array< any > = [ ];
let chunk: ReadableStreamReadResult<any>;
while ( chunk = await reader.read( ) )
{
if ( chunk.done )
return chunks;
chunks.push( chunk.value );
}
}

describe( 'Node.js streams', ( ) =>
{
Expand Down Expand Up @@ -111,34 +148,6 @@ describe( 'Node.js streams', ( ) =>

describe( 'Whatwg streams', ( ) =>
{
function makeReadableStream( arr: Array< any > ): ReadableStream
{
let index = 0;
return new ReadableStream( {
start( ) { },
pull( controller )
{
if ( index >= arr.length )
controller.close( );
else
controller.enqueue( arr[ index++ ] );
}
} );
}
async function consumeReadableStream( stream: ReadableStream )
: Promise< Array< any > >
{
const reader = stream.getReader( );
const chunks: Array< any > = [ ];
let chunk: ReadableStreamReadResult<any>;
while ( chunk = await reader.read( ) )
{
if ( chunk.done )
return chunks;
chunks.push( chunk.value );
}
}

it( 'should be possible to have a zero bytes in an empty stream',
async ( ) =>
{
Expand Down Expand Up @@ -266,19 +275,110 @@ describe( 'Whatwg streams', ( ) =>
.toStrictEqual( Uint8ClampedArray.from( [ 123 ] ) );
} );

it.skip( 'should return rejected promise on erroneous streams',
it( 'should return rejected promise on stream errors before head',
async ( ) =>
{
const notCalled = jest.fn( );
const called = jest.fn( );

const inStream = < NodeJS.ReadableStream >< any >"foobar";
const err = new Error( "efter-head" );

const sh = streamHead( inStream, { bytes: 2 } );
const inStream = makeReadableStream( [
Buffer.from( "foo" ),
err,
Buffer.from( "bar" ),
Buffer.from( "baz" )
] );

const sh = streamHead( inStream, { bytes: 5 } );

await sh.then( notCalled, called );

expect( notCalled ).toHaveBeenCalledTimes( 0 );
expect( called ).toHaveBeenCalledTimes( 1 );
expect( called ).toHaveBeenCalledWith( err );
} );

it( 'should return resolved promise on stream errors after head',
async ( ) =>
{
const notCalled = jest.fn( );
const called = jest.fn( );

const err = new Error( "efter-head" );

const inStream = makeReadableStream( [
Buffer.from( "foo" ),
Buffer.from( "bar" ),
err,
Buffer.from( "baz" )
] );

const { head, stream } = await streamHead( inStream, { bytes: 5 } );

expect( head ).toEqual( new Uint8Array( Buffer.from( "foobar" ) ) );

const reader = stream.getReader( );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: Buffer.from( "foo" ) } );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: Buffer.from( "bar" ) } );

await reader.read( ).then( notCalled, called );
expect( notCalled ).toHaveBeenCalledTimes( 0 );
expect( called ).toHaveBeenCalledTimes( 1 );
expect( called ).toHaveBeenCalledWith( err );
} );

it( 'should fail on non-buffer chunks before head',
async ( ) =>
{
const notCalled = jest.fn( );
const called = jest.fn( );

const inStream = makeReadableStream( [
Buffer.from( "foo" ),
47,
Buffer.from( "bar" ),
Buffer.from( "baz" )
] );

const sh = streamHead( inStream, { bytes: 5 } );

await sh.then( notCalled, called );

expect( notCalled ).toHaveBeenCalledTimes( 0 );
expect( called ).toHaveBeenCalledTimes( 1 );
expect( called.mock.calls[ 0 ][ 0 ] ).toBeInstanceOf( Error );
expect( called.mock.calls[ 0 ][ 0 ].message )
.toMatch( /Expecting chunk to be a typed array/ )
} );

it( 'should allow non-buffer chunks after head',
async ( ) =>
{
const notCalled = jest.fn( );
const called = jest.fn( );

const inStream = makeReadableStream( [
Buffer.from( "foo" ),
Buffer.from( "bar" ),
47,
Buffer.from( "baz" )
] );

const { head, stream } = await streamHead( inStream, { bytes: 5 } );

expect( head ).toEqual( new Uint8Array( Buffer.from( "foobar" ) ) );

const reader = stream.getReader( );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: Buffer.from( "foo" ) } );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: Buffer.from( "bar" ) } );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: 47 } );
expect( await reader.read( ) )
.toStrictEqual( { done: false, value: Buffer.from( "baz" ) } );
} );
} );
1 change: 1 addition & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ function streamHeadWhatwg( readable: ReadableStream, bytes: number )

function complete( )
{
/* istanbul ignore next */
if ( isComplete )
return null;
isComplete = true;
Expand Down

0 comments on commit 7781e79

Please sign in to comment.