Add declarations for all streams
chbrown committed Nov 12, 2015
1 parent e4f13ab commit f4d12e3
Showing 14 changed files with 298 additions and 0 deletions.
1 change: 1 addition & 0 deletions .npmignore
@@ -1,3 +1,4 @@
tests/
Makefile
shims.d.ts
batcher.ts
16 changes: 16 additions & 0 deletions batcher.d.ts
@@ -0,0 +1,16 @@
import { Transform } from 'stream';
/**
Batcher transforms a stream of <T> into a stream of Array<T>, where each array
has at most `batchSize` elements (the last batch may have fewer, but always more than 0).
*/
export declare class Batcher<T> extends Transform {
protected batchSize: number;
protected batchBuffer: any[];
constructor(batchSize: number);
/**
checkFlush is called by both _transform and _flush, with different `end` values.
*/
protected checkFlush(end: boolean, callback: (error?: Error) => void): void;
_transform(chunk: T, encoding: string, callback: TransformCallback): void;
_flush(callback: FlushCallback): void;
}
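
A minimal usage sketch for the declaration above. The module path streaming/batcher and Batcher enabling objectMode internally are assumptions, and Readable.from requires a modern Node:

import { Readable } from 'stream';
import { Batcher } from 'streaming/batcher';

// Hypothetical object-mode source of 250 numbers.
const source = Readable.from(Array.from({length: 250}, (_, i) => i));

// Re-chunk into arrays of at most 100 elements; the final batch has 50.
source.pipe(new Batcher<number>(100)).on('data', (batch: number[]) => {
  console.log('batch of length', batch.length);
});
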
22 changes: 22 additions & 0 deletions eventsource.d.ts
@@ -0,0 +1,22 @@
import { Transform, TransformOptions } from 'stream';
/**
EventSource is a subclass of stream.Transform for converting a stream of objects
into `text/event-stream` style output.
You should call `res.setHeader('Content-Type', 'text/event-stream');` on your
HTTP response `res` before piping this EventSource stream into the response.
And for this use case, you'll want to listen for onerror on the client side and
close() the EventSource object on the first error; otherwise the client will
try to reconnect immediately when the response stream closes. As far as I know,
there is no special event you can send to trigger a complete stop.
Documentation: http://www.w3.org/TR/eventsource/
MDN is usually great, but their article on EventSource sucks:
https://developer.mozilla.org/en-US/docs/Web/API/EventSource
*/
export declare class EventSource extends Transform {
constructor(options?: TransformOptions);
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
}
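
A sketch of the HTTP use case described above; the module path, port, and payload shape are illustrative, and writableObjectMode is an assumption about how written chunks are accepted:

import * as http from 'http';
import { EventSource } from 'streaming/eventsource';

http.createServer((req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  const events = new EventSource({writableObjectMode: true}); // assumed option
  events.pipe(res);
  // Send a heartbeat object every second; each write becomes a
  // `data: ...` block in the text/event-stream output.
  const timer = setInterval(() => events.write({time: Date.now()}), 1000);
  req.on('close', () => { clearInterval(timer); events.end(); });
}).listen(8080);
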
6 changes: 6 additions & 0 deletions filter.d.ts
@@ -0,0 +1,6 @@
import { Transform } from 'stream';
export declare class Filter<T> extends Transform {
protected testFn: (chunk: T) => boolean;
constructor(testFn: (chunk: T) => boolean);
_transform(chunk: T, encoding: string, callback: TransformCallback): void;
}
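
A quick sketch (module path assumed, objectMode assumed to be enabled internally): keep only the chunks for which the test function returns true.

import { Readable } from 'stream';
import { Filter } from 'streaming/filter';

const source = Readable.from([1, 2, 3, 4, 5, 6]);

// Drop odd numbers; only chunks passing the test are re-emitted.
source.pipe(new Filter<number>(n => n % 2 === 0))
  .on('data', (n: number) => console.log(n)); // 2, 4, 6
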
12 changes: 12 additions & 0 deletions index.d.ts
@@ -0,0 +1,12 @@
/** Read a stream to the end, storing all chunks in an array.
For example, to read all STDIN:
streaming.readToEnd(process.stdin, function(err, chunks) {
if (err) throw err;
var input = chunks.join('');
console.log('Got input of length: ' + input.length);
});
*/
export declare function readToEnd(stream: NodeJS.ReadableStream, callback: (error: Error, chunks?: any[]) => void): NodeJS.EventEmitter;
50 changes: 50 additions & 0 deletions json.d.ts
@@ -0,0 +1,50 @@
import { Transform } from 'stream';
/** streaming.json.ArrayStringifier stringifies all written objects into a
single well-formed JSON array: surrounded by [ and ] delimiters, with elements separated by commas.
* `replacer` Function If a function, transforms values and properties
encountered while stringifying; if an array, specifies the set of
properties included in objects in the final string. Details on
[MDN](https://developer.mozilla.org/En/Using_native_JSON#The_replacer_parameter).
* `space` Number | String Causes the resulting string to be pretty-printed
(by some number of spaces or literal space).
*/
export declare class ArrayStringifier extends Transform {
protected replacer: any;
protected space: string | number;
protected _seen_first_item: boolean;
constructor(replacer?: any, space?: string | number);
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
_flush(callback: any): void;
}
/** streaming.json.Stringifier expects objects and outputs strings / buffers
_writableState.objectMode = true
_readableState.objectMode = false
* `replacer` Function If a function, transforms values and properties
encountered while stringifying; if an array, specifies the set of
properties included in objects in the final string. Details on
[MDN](https://developer.mozilla.org/En/Using_native_JSON#The_replacer_parameter).
* `space` Number | String Causes the resulting string to be pretty-printed
(by some number of spaces or literal space).
*/
export declare class Stringifier extends Transform {
protected replacer: any;
protected space: string | number;
constructor(replacer?: any, space?: string | number);
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
}
/** streaming.json.Parser expects Buffer input with universal newlines
dividing JSON objects.
*/
export declare class Parser extends Transform {
protected replacer: any;
protected space: string | number;
protected _buffer: Buffer;
constructor(replacer?: any, space?: string | number);
_line(buffer: any): void;
_process_buffer(eof: any): void;
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
_flush(callback: FlushCallback): void;
}
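
A sketch of how these classes compose, assuming Parser emits one parsed object per input line and the module resolves as streaming/json: convert newline-delimited JSON on stdin into a single pretty-printed JSON array on stdout.

import { Parser, ArrayStringifier } from 'streaming/json';

// NDJSON in, one JSON array out.
process.stdin
  .pipe(new Parser())
  .pipe(new ArrayStringifier(null, 2)) // replacer = null, space = 2
  .pipe(process.stdout);
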
12 changes: 12 additions & 0 deletions mapper.d.ts
@@ -0,0 +1,12 @@
import { Transform } from 'stream';
/**
Mapper transforms a stream of <T> into a stream of <R>, using a synchronous
transform function.
Use Transformer() instead if you need an asynchronous callback function.
*/
export declare class Mapper<T, R> extends Transform {
protected transformFn: (chunk: T) => R;
constructor(transformFn: (chunk: T) => R);
_transform(chunk: T, encoding: string, callback: TransformCallback): void;
}
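
A minimal sketch (module path assumed, objectMode assumed to be enabled internally): map each string chunk to its length with a synchronous function.

import { Readable } from 'stream';
import { Mapper } from 'streaming/mapper';

const source = Readable.from(['a', 'bb', 'ccc']);

// Each chunk is transformed synchronously; no callback involved.
source.pipe(new Mapper<string, number>(s => s.length))
  .on('data', (n: number) => console.log(n)); // 1, 2, 3
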
13 changes: 13 additions & 0 deletions property.d.ts
@@ -0,0 +1,13 @@
import { Transform } from 'stream';
export declare class Picker extends Transform {
fields: string[];
constructor(fields: string[]);
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
}
export declare class Omitter extends Transform {
fieldsMap: {
[index: string]: number;
};
constructor(fields: string[]);
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
}
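
A sketch covering both classes (module path assumed): Picker keeps only the listed fields, while Omitter drops them.

import { Readable } from 'stream';
import { Picker } from 'streaming/property';

const source = Readable.from([{id: 1, name: 'a', secret: 'hunter2'}]);

// Keep only id and name; new Omitter(['secret']) would be equivalent here.
source.pipe(new Picker(['id', 'name']))
  .on('data', (obj: any) => console.log(obj)); // { id: 1, name: 'a' }
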
48 changes: 48 additions & 0 deletions queue.d.ts
@@ -0,0 +1,48 @@
import { Duplex, DuplexOptions } from 'stream';
/** new streaming.Queue(concurrency: number,
transform: TransformCallback,
options: any)
Queue is much like streaming.Transformer(transform, options), but it applies the
`transform` function to each piece of data, at most `concurrency` at a time.
Except! the `transform` function does not have access to the stream as `this`,
and so it cannot `this.push(...)` to handle data; it must use the `callback`
function to return the result (or error if an error arose).
Output order is not guaranteed, but items shouldn't end up more than
`concurrency` positions out of order.
We use stream.Duplex rather than stream.Transform because output is not
precisely "causally connected" to the input -- there are side-effects (the
duration of the transform function) that complicate the mapping.
Example:
new streaming.Queue(10, function(chunk, encoding, callback) {
setTimeout(function() {
callback(null, {result: 'chunk length is ' + chunk.length + '.'});
}, Math.random() * 500);
}, {objectMode: true});
*/
export declare class Queue extends Duplex {
protected concurrency: number;
protected transformFn: TransformCall<any>;
protected _in_progress: number;
constructor(concurrency: number, transformFn: TransformCall<any>, options?: DuplexOptions);
/** _read is called when the user wants data from this stream.
From the [stream docs](http://nodejs.org/api/stream.html#stream_readable_read_size_1):
> When data is available, put it into the read queue by calling
> readable.push(chunk). If push returns false, then you should stop reading.
> When _read is called again, you should start pushing more data.
Since we are mostly pulling, rather than pushing, this is a no-op.
We might conceivably use it to determine whether we are free to stop processing
incoming tasks; i.e., if no one wants them, we don't need to read them.
*/
_read(size: any): void;
_write(chunk: any, encoding: any, callback: any): void;
}
11 changes: 11 additions & 0 deletions sink.d.ts
@@ -0,0 +1,11 @@
import { Transform, TransformOptions } from 'stream';
/**
Similar to piping to /dev/null at the command line.
This stream can be piped into, but it will never output anything. However, it
will end when the source stream ends.
*/
export declare class Sink<T> extends Transform {
constructor(options?: TransformOptions);
_transform(chunk: T, encoding: string, callback: TransformCallback): void;
}
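
A sketch (module path and filename assumed): consume a stream purely to find out when it ends.

import { createReadStream } from 'fs';
import { Sink } from 'streaming/sink';

// Nothing is ever emitted downstream, but 'finish' fires once the
// source has been fully consumed.
createReadStream('access.log')
  .pipe(new Sink())
  .on('finish', () => console.log('drained'));
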
40 changes: 40 additions & 0 deletions splitter.d.ts
@@ -0,0 +1,40 @@
import { Transform, TransformOptions } from 'stream';
export interface SplitterOptions extends TransformOptions {
split?: string;
}
/**
A splitter stream rechunks its input at every occurrence of the `split` byte, if `split` is defined.
If `split` is not defined, it splits at universal newlines (\r, \r\n, or \n).
_writableState.decodeStrings = true
_writableState.objectMode = false
_readableState.objectMode = true
Node.js 'stream' API calls _transform and _flush:
_transform calls _advance:
_advance calls _chunk (maybe multiple times)
_chunk calls push()
_flush calls _chunk, either once or not at all
_chunk calls push()
*/
export declare class Splitter extends Transform {
protected _buffer: Buffer;
protected _encoding: any;
constructor(options?: SplitterOptions);
/** calling this will call toString on all emitted chunks, instead of
returning buffers. */
setEncoding(encoding: any): this;
/** _chunk handles what we do to each split part */
_chunk(buffer: any): void;
/** _advance handles how we decide where the split points are */
_advance(buffer: any): any;
/**
`encoding` describes the type of `chunk` -- if the decodeStrings option is
true, this will be useful; otherwise, `chunk` will be just a buffer, or if
objectMode is true, it'll be an arbitrary object, and `encoding` will just be
'buffer'.
*/
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
_flush(callback: any): void;
}
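
A sketch (module path and filename assumed): emit one chunk per line of a file, decoded to strings via the setEncoding method declared above.

import { createReadStream } from 'fs';
import { Splitter } from 'streaming/splitter';

// Split on universal newlines; new Splitter({split: '\t'}) would
// split on tab bytes instead.
createReadStream('input.txt')
  .pipe(new Splitter())
  .setEncoding('utf8') // emit strings rather than Buffers
  .on('data', (line: string) => console.log('line:', line));
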
15 changes: 15 additions & 0 deletions timeout.d.ts
@@ -0,0 +1,15 @@
import { Transform, TransformOptions } from 'stream';
/**
Timeout is mostly a PassThrough (identity) stream, but will throw an error if
a period of `timeoutMilliseconds` elapses without seeing any new data.
*/
export declare class Timeout extends Transform {
/** The timestamp epoch in milliseconds when the most recent chunk was received. */
lastChunkReceived: number;
/** The timeout threshold in milliseconds. */
timeoutMilliseconds: number;
constructor(timeoutSeconds: number, options?: TransformOptions);
_check(): void;
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
_flush(callback: any): void;
}
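
A sketch (module path assumed; socket is a hypothetical stream that may stall): pass data through while guarding against silence.

import { Timeout } from 'streaming/timeout';

declare const socket: NodeJS.ReadableStream; // hypothetical slow source

const guarded = socket.pipe(new Timeout(5)); // constructor takes seconds
guarded.on('error', err => console.error('stream stalled:', err));
guarded.pipe(process.stdout);
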
22 changes: 22 additions & 0 deletions transformer.d.ts
@@ -0,0 +1,22 @@
import { Transform, TransformOptions } from 'stream';
/** Like Mapper, but more bare-bones. The transform function has to call the
given callback, which means the processing code can be async.
`this` is bound to the stream object inside the transform function, so you
can use `this.push(...)` to output multiple data per input datum.
Example:
new streaming.Transformer(function(chunk, encoding, callback) {
var self = this;
setTimeout(function() {
self.push('...');
self.push(chunk);
callback();
}, 1000);
}, {objectMode: true});
*/
export declare class Transformer<T> extends Transform {
constructor(transformFn: TransformCall<T>, options?: TransformOptions);
}
30 changes: 30 additions & 0 deletions vm.d.ts
@@ -0,0 +1,30 @@
import { Script } from 'vm';
import { Transform } from 'stream';
/**
code: string
Two important facts about this code:
1. It should process a global variable, `$in`, which represents the
current object in the stream.
2. It should set the global variable `$out`, which represents the
object that will be returned and sent downstream.
context?: any = {}
Results and side effects are tracked in this global context object.
filename?: string = 'streaming.vm'
Used in stack traces.
*/
export declare class VM<T> extends Transform {
context: any;
filename: string;
protected script: Script;
constructor(code: string, context?: any, filename?: string);
/**
each chunk should be a discrete object
encoding should be null
*/
_transform(chunk: any, encoding: string, callback: TransformCallback): void;
/** Run a bit of code once using the streaming.VM's global context.
code: string
*/
run(code: string): void;
}
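
A sketch (module path assumed, objectMode assumed to be enabled internally), following the $in/$out contract described above:

import { Readable } from 'stream';
import { VM } from 'streaming/vm';

const source = Readable.from([{n: 1}, {n: 2}, {n: 3}]);

// Each chunk is exposed to the script as $in; whatever the script
// assigns to $out is pushed downstream.
source.pipe(new VM('$out = {squared: $in.n * $in.n}'))
  .on('data', (obj: any) => console.log(obj)); // {squared: 1}, {squared: 4}, {squared: 9}
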
