Skip to content

Commit

Permalink
Merge e781829 into 728fc0a
Browse files Browse the repository at this point in the history
  • Loading branch information
crccheck committed Feb 20, 2017
2 parents 728fc0a + e781829 commit 4a0f3c5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ Reader readable stream too!

### Options

* `interval` (default: `2000`) Milliseconds between each Kinesis read. Remember limit is 5 reads / second / shard
* Any [getShardIterator] param
* `interval: number` (default: `2000`) Milliseconds between each Kinesis read. Remember limit is 5 reads / second / shard
* `parser: Function` If this is set, this function is applied to the data. Example:
const client = AWS.Kinesis()
const reader = new KinesisStreamReader(client, streamName, {parser: JSON.parse})
reader.on('data', console.log(data.id))
* And any [getShardIterator] parameter

### Custom events

Expand Down
14 changes: 8 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ function getShardIterator (client, streamName, shardId, options) {

class KinesisStreamReader extends Readable {
constructor (client, streamName, options) {
// Use objectMode since we get whole objects at a time? Maybe have an
// `options.json` that sets object mode and automagically does JSON.parse
// or `options.parse` that can be set to `JSON.parse`
super({})
super({
objectMode: !!options.parser, // Should this always be true?
})
this.client = client
this.streamName = streamName
this.options = Object.assign({interval: 2000}, options)
this.options = Object.assign({
interval: 2000,
parser: (x) => x,
}, options)
this._started = false // TODO this is probably built into Streams
this.iterators = new Set()
}
Expand Down Expand Up @@ -85,7 +87,7 @@ class KinesisStreamReader extends Readable {
if (data.MillisBehindLatest > 60 * 1000) {
debug('warning: behind by %d milliseconds', data.MillisBehindLatest)
}
data.Records.forEach((x) => this.push(x.Data))
data.Records.forEach((x) => this.push(this.options.parser(x.Data)))
if (data.Records.length) {
this.emit('checkpoint', data.Records[data.Records.length - 1].SequenceNumber)
}
Expand Down
18 changes: 16 additions & 2 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ describe('main', () => {
})

it('continues to read open shard', () => {
const clock = sinon.useFakeTimers()
const clock = sandbox.useFakeTimers()
const getNextIterator = sinon.stub()
const record = {
Data: '',
Expand All @@ -215,7 +215,21 @@ describe('main', () => {
assert.strictEqual(getNextIterator.callCount, 1)
clock.tick(10000) // A number bigger than the idle time
assert.strictEqual(getNextIterator.callCount, 2)
clock.restore()
})

it('parses incoming records', () => {
const record = {
Data: '{}',
SequenceNumber: 'seq-1',
}
const getNextIterator = sinon.stub().returns(undefined)
client.getRecords = (params, cb) =>
cb(undefined, {Records: [record], NextShardIterator: getNextIterator()})
const reader = new main.KinesisStreamReader(client, 'stream name', {
parser: JSON.parse,
})

reader.readShard('shard-iterator-3')
})
})
})
Expand Down

0 comments on commit 4a0f3c5

Please sign in to comment.