Skip to content

Commit

Permalink
Implement new StreamHandler with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hershal committed Jun 30, 2017
1 parent 09a6013 commit 1d1b3de
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 39 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"nodemon": "^1.11.0",
"nyc": "^10.0.0",
"rimraf": "^2.0.0",
"stdio-mock": "^1.1.0",
"ts-node": "^3.1.0",
"tslint": "^5.0.0",
"typescript": "^2.4.1"
Expand Down
62 changes: 23 additions & 39 deletions src/StreamHandler.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,38 @@
import Debug from './Debug';
import { IStandardInputStreamHandlerDelegate } from './Interfaces';
import { IStreamHandlerDelegate } from './Interfaces';
import { StreamSerializer } from './StreamSerializer';


class StandardInputStreamHandler {
public delegate: IStandardInputStreamHandlerDelegate;
export class StandardInputStreamHandler {
public delegate: IStreamHandlerDelegate;

private debug = Debug('InputParserReadFilesOptionalStandardInput');
private buffer: string[] = [];

public once(): Promise<string[]> {
const serializer = new StreamSerializer();
return new Promise((resolve, reject) => {
const callback = (serialized: string): string => {
this.debug(`callback called on string of length ${serialized.length}.`);
if (!this.delegate) {
this.debug(`StandardInputStreamHandlerDelegate not specified; ` +
`saving serialized data into internal buffer.`);
this.buffer.push(serialized);
} else {
/* flush the buffer */
this.buffer.forEach((l) => this.delegate.streamDidReceiveChunk(l));

/* flush out the newly serialized data */
this.delegate.streamDidReceiveChunk(serialized);

/* clear out the buffer */
this.buffer = [];
}

return serialized;
};
public once(stream: any): Promise<string[]> {
const chunkString = this.delegate.stdinStreamSerializeCharacter();
const serializer = new StreamSerializer(chunkString);

process.stdin.on('data', (data: Buffer) => {
this.debug(`Stdin sent chunk of size ${data.length}.`);
serializer.serialize(data, (serialized: string) => callback(serialized));
return new Promise((resolve, reject) => {
stream.on('data', (data: Buffer) => {
this.debug(`Stream sent chunk of size ${data.length}.`);
serializer.serialize(data,
(serialized: string) => this.serializationCallback(serialized));
});
this.debug('Installed stream data handler.');

process.stdin.on('end', () => {
this.debug(`Stdin ended.`);
serializer.flush((data: string) => callback(data));

if (this.delegate) {
this.delegate.streamDidEnd([]);
}

resolve(this.buffer);
this.buffer = [];
stream.on('end', () => {
serializer.flush((data: string) => this.serializationCallback(data));
this.delegate.streamDidEnd();
this.debug(`Stream ended. Resolving once() Promise.`);
resolve();
});
this.debug('Installed stream end handler.');
});
}

private serializationCallback(serialized: string): string {
this.debug(`callback called on string of length ${serialized.length}.`);
this.delegate.streamDidReceiveChunk(serialized);
return serialized;
}
}
63 changes: 63 additions & 0 deletions test/StreamHandlerSpec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { StandardInputStreamHandler } from '../src/StreamHandler'
import { IStreamHandlerDelegate } from '../src/Interfaces'

import * as chai from "chai";
const expect = chai.expect;

import { stdio } from 'stdio-mock';

class BasicStreamHandlerDelegate implements IStreamHandlerDelegate {
public streamSerializationString: string;
public buffer: string[];

constructor() {
this.buffer = [];
}

/* Delegate methods */
public stdinStreamSerializeCharacter(): string {
return this.streamSerializationString;
}

public streamDidReceiveChunk(chunk: string) {
this.buffer.push(chunk);
}

public streamDidEnd() { }
}

describe('Stream Handler', function () {
describe('StandardInputStreamHandler', function () {
let handler: StandardInputStreamHandler;
let handlerDelegate: BasicStreamHandlerDelegate;
let stdin: any;

beforeEach(function () {
handler = new StandardInputStreamHandler();
handlerDelegate = new BasicStreamHandlerDelegate();
handler.delegate = handlerDelegate;
handler.delegate
stdin = stdio().stdin;
});

it('Serializes a simple hello world', function (done) {
handlerDelegate.streamSerializationString = '\n';
handler.once(stdin).then(() => {
expect(handlerDelegate.buffer).to.deep.equal(['hello world']);
done();
});
stdin.write('hello world');
stdin.end();
});

it('Serializes a simple hello world with a space chunk string', function (done) {
handlerDelegate.streamSerializationString = ' ';
handler.once(stdin).then(() => {
expect(handlerDelegate.buffer).to.deep.equal(['hello', 'world']);
done();
});
stdin.write('hello world');
stdin.end();
});
});
});

0 comments on commit 1d1b3de

Please sign in to comment.