Skip to content

Commit

Permalink
Merge 2136ead into 728fc0a
Browse files Browse the repository at this point in the history
  • Loading branch information
crccheck committed Feb 20, 2017
2 parents 728fc0a + 2136ead commit ec0c6d2
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,21 @@ Reader readable stream too!

### Options

* `interval` (default: `2000`) Milliseconds between each Kinesis read. Remember limit is 5 reads / second / shard
* Any [getShardIterator] param
* `interval: number` (default: `2000`) Milliseconds between each Kinesis read. Remember limit is 5 reads / second / shard
* `parser: Function` If this is set, this function is applied to the data. Example:

const client = AWS.Kinesis()
const reader = new KinesisStreamReader(client, streamName, {parser: JSON.parse})
reader.on('data', console.log(data.id))

* And any [getShardIterator] parameter

### Custom events

These are the WIP events you can attach to the reader:

* `checkpoint` Inspired by [kinesis-readable], this fires when data is received so you can keep track of the last successful sequence read

reader.on('checkpoint', (sequenceNumber: string) => {})


Expand Down
14 changes: 8 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ function getShardIterator (client, streamName, shardId, options) {

class KinesisStreamReader extends Readable {
constructor (client, streamName, options) {
// Use objectMode since we get whole objects at a time? Maybe have an
// `options.json` that sets object mode and automagically does JSON.parse
// or `options.parse` that can be set to `JSON.parse`
super({})
super({
objectMode: !!options.parser, // Should this always be true?
})
this.client = client
this.streamName = streamName
this.options = Object.assign({interval: 2000}, options)
this.options = Object.assign({
interval: 2000,
parser: (x) => x,
}, options)
this._started = false // TODO this is probably built into Streams
this.iterators = new Set()
}
Expand Down Expand Up @@ -85,7 +87,7 @@ class KinesisStreamReader extends Readable {
if (data.MillisBehindLatest > 60 * 1000) {
debug('warning: behind by %d milliseconds', data.MillisBehindLatest)
}
data.Records.forEach((x) => this.push(x.Data))
data.Records.forEach((x) => this.push(this.options.parser(x.Data)))
if (data.Records.length) {
this.emit('checkpoint', data.Records[data.Records.length - 1].SequenceNumber)
}
Expand Down
47 changes: 45 additions & 2 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ describe('main', () => {
})

it('continues to read open shard', () => {
const clock = sinon.useFakeTimers()
const clock = sandbox.useFakeTimers()
const getNextIterator = sinon.stub()
const record = {
Data: '',
Expand All @@ -215,7 +215,50 @@ describe('main', () => {
assert.strictEqual(getNextIterator.callCount, 1)
clock.tick(10000) // A number bigger than the idle time
assert.strictEqual(getNextIterator.callCount, 2)
clock.restore()
})

it('parses incoming records', () => {
const record = {
Data: '{"foo":"bar"}',
SequenceNumber: 'seq-1',
}
const getNextIterator = sinon.stub().returns(undefined)
client.getRecords = (params, cb) =>
cb(undefined, {Records: [record], NextShardIterator: getNextIterator()})
const reader = new main.KinesisStreamReader(client, 'stream name', {
parser: JSON.parse,
})

reader.readShard('shard-iterator-5')

assert.ok(reader._readableState.objectMode)
assert.equal(reader._readableState.buffer.length, 1)
if (reader._readableState.buffer.head) {
assert.deepEqual(reader._readableState.buffer.head.data, {foo: 'bar'})
} else {
// NODE4
assert.deepEqual(reader._readableState.buffer[0], {foo: 'bar'})
}
})

it('parser exceptions are passed through', () => {
const record = {
Data: '{"foo":"bar"}',
SequenceNumber: 'seq-1',
}
const getNextIterator = sinon.stub().returns(undefined)
client.getRecords = (params, cb) =>
cb(undefined, {Records: [record], NextShardIterator: getNextIterator()})
const reader = new main.KinesisStreamReader(client, 'stream name', {
parser: () => { throw new Error('lolwut') },
})

try {
reader.readShard('shard-iterator-6')
assert(false, 'reader should have thrown')
} catch (err) {
assert.equal(err.message, 'lolwut')
}
})
})
})
Expand Down

0 comments on commit ec0c6d2

Please sign in to comment.