Skip to content

Commit

Permalink
Test exception condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Nov 26, 2018
1 parent 3cd7526 commit a95d312
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 180 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ script:
jobs:
include:
- stage: after_success
script: yarn run coveralls
script:
- yarn install
- yarn run compile
- yarn run test-coverage
- yarn run send-coveralls
node_js: 9
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/8a89b90858734a6da07570eaf2e89849)](https://www.codacy.com/app/Borewit/then-read-stream?utm_source=github.com&utm_medium=referral&utm_content=Borewit/then-read-stream&utm_campaign=Badge_Grade)
[![Known Vulnerabilities](https://snyk.io/test/github/Borewit/then-read-stream/badge.svg?targetFile=package.json)](https://snyk.io/test/github/Borewit/then-read-stream?targetFile=package.json)

# then-read-stream

A promise based asynchronous stream reader, which makes reading from a stream easy.

Allows to read from a [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams)
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"url": "https://github.com/Borewit"
},
"scripts": {
"clean": "del-cli lib/** src/**/*.js src/**/*.js.map src/**/*.d.ts test/**/*.js test/**/*.js.map",
"clean": "del-cli lib/** src/**/*.js src/**/*.js.map src/**/*.d.ts test/**/*.js test/**/*.js.map coverage .nyc_output",
"build": "npm run clean && npm run compile",
"compile-src": "tsc -p src",
"compile-test": "tsc -p test",
Expand All @@ -16,8 +16,8 @@
"lint-md": "remark -u preset-lint-recommended .",
"lint": "npm run lint-md && npm run lint-ts",
"test": "mocha --require ts-node/register test",
"cover-test": "nyc npm run test",
"coveralls": "npm run cover-test && nyc report --reporter=text-lcov | coveralls",
"test-coverage": "nyc npm run test",
"send-coveralls": "nyc report --reporter=text-lcov | coveralls",
"start": "npm run compile && npm run lint && npm run cover-test"
},
"engines": {
Expand All @@ -28,7 +28,7 @@
"url": "https://github.com/Borewit/then-read-stream"
},
"license": "MIT",
"main": "./lib/index.js",
"main": "./lib/index",
"typings": "lib/index",
"bugs": {
"url": "https://github.com/Borewit/then-read-stream/issues"
Expand Down
56 changes: 34 additions & 22 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as assert from "assert";
import * as stream from "stream";
import * as assert from 'assert';
import * as stream from 'stream';

interface IReadRequest {
buffer: Buffer | Uint8Array,
Expand All @@ -26,7 +26,7 @@ class Deferred<T> {
/**
* Error message
*/
export const endOfStream = "End-Of-Stream";
export const endOfStream = 'End-Of-Stream';

export class StreamReader {

Expand All @@ -45,15 +45,27 @@ export class StreamReader {

public constructor(private s: stream.Readable) {
if (!s.read || !s.once) {
throw new Error("Expected an instance of stream.Readable");
throw new Error('Expected an instance of stream.Readable');
}
this.s.once("end", () => {
this.s.once('end', () => {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(new Error(endOfStream));
this.request = null;
}
});
this.s.once('error', err => {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(err);
}
});
this.s.once('close', err => {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(new Error('Stream closed'));
}
});
}

/**
Expand All @@ -64,11 +76,10 @@ export class StreamReader {
* @param position Source offset
* @returns {any}
*/
public peek(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
return this.read(buffer, offset, length).then(bytesRead => {
this.peekQueue.unshift(buffer.slice(offset, bytesRead) as Buffer);
return bytesRead;
});
public async peek(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
const bytesRead = await this.read(buffer, offset, length);
this.peekQueue.unshift(buffer.slice(offset, bytesRead) as Buffer);
return bytesRead;
}

/**
Expand All @@ -78,9 +89,9 @@ export class StreamReader {
* @param length Number of bytes to read
* @returns {any}
*/
public read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
public async read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
if (length === 0) {
return Promise.resolve(0);
return 0;
}
if (this.peekQueue.length > 0) {
const peekData = this.peekQueue.shift();
Expand All @@ -89,16 +100,17 @@ export class StreamReader {
if (length < peekData.length) {
this.peekQueue.unshift(peekData.slice(length));
}
return Promise.resolve(length);
return length;
} else {
peekData.copy(buffer as Buffer, offset);
return this.read(buffer, offset + peekData.length, length - peekData.length).then(bytesRead => {
try {
const bytesRead = await this.read(buffer, offset + peekData.length, length - peekData.length);
return peekData.length + bytesRead;
}).catch(err => {
} catch (err) {
if (err.message === endOfStream) {
return peekData.length; // Return partial read
} else throw err;
});
}
}
} else {
return this._read(buffer, offset, length);
Expand All @@ -112,27 +124,27 @@ export class StreamReader {
* @param length Number of bytes to read
* @returns {any}
*/
private _read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
private async _read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {

assert.ok(!this.request, "Concurrent read operation?");
assert.ok(!this.request, 'Concurrent read operation?');

if (this.endOfStream) {
return Promise.reject(new Error(endOfStream));
throw new Error(endOfStream);
}

const readBuffer = this.s.read(length);

if (readBuffer) {
readBuffer.copy(buffer, offset);
return Promise.resolve<number>(readBuffer.length);
return readBuffer.length;
} else {
this.request = {
buffer,
offset,
length,
deferred: new Deferred<number>()
};
this.s.once("readable", () => {
this.s.once('readable', () => {
this.tryRead();
});
return this.request.deferred.promise.then(n => {
Expand All @@ -151,7 +163,7 @@ export class StreamReader {
readBuffer.copy(this.request.buffer, this.request.offset);
this.request.deferred.resolve(readBuffer.length);
} else {
this.s.once("readable", () => {
this.s.once('readable', () => {
this.tryRead();
});
}
Expand Down
Loading

0 comments on commit a95d312

Please sign in to comment.