Skip to content

Commit

Permalink
Merge 27604d2 into 3bf9b10
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Nov 26, 2018
2 parents 3bf9b10 + 27604d2 commit c496c81
Show file tree
Hide file tree
Showing 11 changed files with 675 additions and 225 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ script:
jobs:
include:
- stage: after_success
script: yarn run coveralls
script:
- yarn install
- yarn run compile
- yarn run test-coverage
- yarn run send-coveralls
node_js: 9
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/8a89b90858734a6da07570eaf2e89849)](https://www.codacy.com/app/Borewit/then-read-stream?utm_source=github.com&utm_medium=referral&utm_content=Borewit/then-read-stream&utm_campaign=Badge_Grade)
[![Known Vulnerabilities](https://snyk.io/test/github/Borewit/then-read-stream/badge.svg?targetFile=package.json)](https://snyk.io/test/github/Borewit/then-read-stream?targetFile=package.json)

# then-read-stream

A promise based asynchronous stream reader, which makes reading from a stream easy.

Allows to read from a [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams)
Expand Down
21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
"url": "https://github.com/Borewit"
},
"scripts": {
"build": "yarn run compile",
"clean": "del-cli lib/** src/**/*.js src/**/*.js.map src/**/*.d.ts test/**/*.js test/**/*.js.map coverage .nyc_output",
"build": "npm run clean && npm run compile",
"compile-src": "tsc -p src",
"compile-test": "tsc -p test",
"compile": "yarn run compile-src && yarn run compile-test",
"prepare": "yarn run compile",
"compile": "npm run compile-src && yarn run compile-test",
"lint-ts": "tslint 'src/**/*.ts' --exclude 'src/**/*.d.ts' 'test/**/*.ts' --exclude 'test/**/*.d.ts'",
"lint-md": "remark -u preset-lint-recommended .",
"lint": "npm run lint-md && npm run lint-ts",
"test": "mocha --require ts-node/register test",
"cover-test": "nyc npm run test",
"coveralls": "yarn run cover-test && nyc report --reporter=text-lcov | coveralls",
"start": "yarn run compile && yarn run lint && npm run cover-test"
"test-coverage": "nyc npm run test",
"send-coveralls": "nyc report --reporter=text-lcov | coveralls",
"start": "npm run compile && npm run lint && npm run cover-test"
},
"engines": {
"node": ">=0.1.98"
Expand All @@ -28,23 +28,24 @@
"url": "https://github.com/Borewit/then-read-stream"
},
"license": "MIT",
"main": "./lib/index.js",
"main": "./lib/index",
"typings": "lib/index",
"bugs": {
"url": "https://github.com/Borewit/then-read-stream/issues"
},
"devDependencies": {
"@types/mocha": "^5.2.5",
"@types/node": "^10.12.0",
"@types/node": "^10.12.10",
"chai": "^4.2.0",
"coveralls": "^3.0.2",
"del-cli": "^1.1.0",
"mocha": "^5.2.0",
"nyc": "^13.1.0",
"remark-cli": "^6.0.0",
"remark-cli": "^6.0.1",
"remark-preset-lint-recommended": "^3.0.2",
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"typescript": "^3.1.3"
"typescript": "^3.1.6"
},
"keywords": [
"readable",
Expand Down
60 changes: 32 additions & 28 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as assert from "assert";
import * as stream from "stream";
import * as assert from 'assert';
import * as stream from 'stream';

interface IReadRequest {
buffer: Buffer | Uint8Array,
Expand All @@ -26,7 +26,7 @@ class Deferred<T> {
/**
* Error message
*/
export const endOfStream = "End-Of-Stream";
export const endOfStream = 'End-Of-Stream';

export class StreamReader {

Expand All @@ -45,15 +45,11 @@ export class StreamReader {

public constructor(private s: stream.Readable) {
if (!s.read || !s.once) {
throw new Error("Expected an instance of stream.Readable");
throw new Error('Expected an instance of stream.Readable');
}
this.s.once("end", () => {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(new Error(endOfStream));
this.request = null;
}
});
this.s.once('end', () => this.reject(new Error(endOfStream)));
this.s.once('error', err => this.reject(err));
this.s.once('close', () => this.reject(new Error('Stream closed')));
}

/**
Expand All @@ -64,11 +60,10 @@ export class StreamReader {
* @param position Source offset
* @returns {any}
*/
public peek(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
return this.read(buffer, offset, length).then(bytesRead => {
this.peekQueue.unshift(buffer.slice(offset, bytesRead) as Buffer);
return bytesRead;
});
public async peek(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
const bytesRead = await this.read(buffer, offset, length);
this.peekQueue.unshift(buffer.slice(offset, bytesRead) as Buffer);
return bytesRead;
}

/**
Expand All @@ -78,9 +73,9 @@ export class StreamReader {
* @param length Number of bytes to read
* @returns {any}
*/
public read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
public async read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
if (length === 0) {
return Promise.resolve(0);
return 0;
}
if (this.peekQueue.length > 0) {
const peekData = this.peekQueue.shift();
Expand All @@ -89,16 +84,17 @@ export class StreamReader {
if (length < peekData.length) {
this.peekQueue.unshift(peekData.slice(length));
}
return Promise.resolve(length);
return length;
} else {
peekData.copy(buffer as Buffer, offset);
return this.read(buffer, offset + peekData.length, length - peekData.length).then(bytesRead => {
try {
const bytesRead = await this.read(buffer, offset + peekData.length, length - peekData.length);
return peekData.length + bytesRead;
}).catch(err => {
} catch (err) {
if (err.message === endOfStream) {
return peekData.length; // Return partial read
} else throw err;
});
}
}
} else {
return this._read(buffer, offset, length);
Expand All @@ -112,27 +108,27 @@ export class StreamReader {
* @param length Number of bytes to read
* @returns {any}
*/
private _read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {
private async _read(buffer: Buffer | Uint8Array, offset: number, length: number): Promise<number> {

assert.ok(!this.request, "Concurrent read operation?");
assert.ok(!this.request, 'Concurrent read operation?');

if (this.endOfStream) {
return Promise.reject(new Error(endOfStream));
throw new Error(endOfStream);
}

const readBuffer = this.s.read(length);

if (readBuffer) {
readBuffer.copy(buffer, offset);
return Promise.resolve<number>(readBuffer.length);
return readBuffer.length;
} else {
this.request = {
buffer,
offset,
length,
deferred: new Deferred<number>()
};
this.s.once("readable", () => {
this.s.once('readable', () => {
this.tryRead();
});
return this.request.deferred.promise.then(n => {
Expand All @@ -151,9 +147,17 @@ export class StreamReader {
readBuffer.copy(this.request.buffer, this.request.offset);
this.request.deferred.resolve(readBuffer.length);
} else {
this.s.once("readable", () => {
this.s.once('readable', () => {
this.tryRead();
});
}
}

private reject(err: Error) {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(err);
this.request = null;
}
}
}
7 changes: 1 addition & 6 deletions src/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
{
"extends": "../tsconfig.json",
"compilerOptions": {
"sourceMap": false,
"inlineSources": false,
"module": "commonjs",
"moduleResolution": "node",
"declaration": true,
"target": "es6",
"outDir": "../lib"
}
}

Loading

0 comments on commit c496c81

Please sign in to comment.