Skip to content

Commit

Permalink
Make the user of the reader supply an AWS client (#24)
Browse files Browse the repository at this point in the history
Currently it makes a lot of assumptions about how to connect to AWS. This moves the responsibility for setting up the connection to AWS from the library to the user of the library. As a side benefit, tests get a lot simpler.

Inspired by https://github.com/rclark/kinesis-readable

closes #23 

* greenkeeper

* refactor so client is passed in

* yagni

* cleanup

* doc update

* cleanup package.json
  • Loading branch information
crccheck committed Feb 18, 2017
1 parent 1824ec9 commit 37ec665
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 65 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ Usage as a package
Hey, you can import this module into your own project to use as Kinesis Stream
Reader readable stream too!

const AWS = require('aws-sdk')
const { KinesisStreamReader } = require('kinesis-console-consumer')
const reader = new KinesisStreamReader(streamName, options)
const client = AWS.Kinesis()
const reader = new KinesisStreamReader(client, streamName, options)
reader.pipe(yourDestinationHere)


Expand Down
10 changes: 8 additions & 2 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
'use strict'

const fs = require('fs')

const AWS = require('aws-sdk')
const program = require('commander')
const updateNotifier = require('update-notifier')

const index = require('./')

const pkg = JSON.parse(fs.readFileSync(`${__dirname}/package.json`))
const client = new AWS.Kinesis()

program
.version(pkg.version)
Expand Down Expand Up @@ -43,13 +47,15 @@ program
} else {
options.ShardIteratorType = 'LATEST'
}
const reader = new index.KinesisStreamReader(streamName, options)
const reader = new index.KinesisStreamReader(client, streamName, options)
reader.pipe(process.stdout)
})
.parse(process.argv)

if (!program.args.length) {
index.getStreams().then((data) => console.log(data))
index.getStreams(client)
.then(console.log)
.catch(console.error)
}

updateNotifier({pkg}).notify()
24 changes: 11 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
'use strict'
// http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html
const Readable = require('stream').Readable
const AWS = require('aws-sdk')
const debug = require('debug')('kinesis-console-consumer')

const kinesis = new AWS.Kinesis()


function getStreams () {
return kinesis.listStreams({}).promise()
function getStreams (client) {
return client.listStreams({}).promise()
}

function getShardId (streamName) {
function getShardId (client, streamName) {
const params = {
StreamName: streamName,
}
return kinesis.describeStream(params).promise()
return client.describeStream(params).promise()
.then((data) => {
if (!data.StreamDescription.Shards.length) {
throw new Error('No shards!')
Expand All @@ -26,33 +23,34 @@ function getShardId (streamName) {
})
}

function getShardIterator (streamName, shardId, options) {
function getShardIterator (client, streamName, shardId, options) {
const params = Object.assign({
ShardId: shardId,
ShardIteratorType: 'LATEST',
StreamName: streamName,
}, options || {})
return kinesis.getShardIterator(params).promise()
return client.getShardIterator(params).promise()
.then((data) => {
debug('getShardIterator got iterator id: %s', data.ShardIterator)
return data.ShardIterator
})
}

class KinesisStreamReader extends Readable {
constructor (streamName, options) {
constructor (client, streamName, options) {
// is this objectMode since we get whole objects at a time?
super({})
this.client = client
this._started = false // TODO this is probably built into Streams
this._streamName = streamName
this._shardIteratorOptions = options
}

_startKinesis () {
return getShardId(this._streamName)
return getShardId(this.client, this._streamName)
.then((shardIds) => {
const shardIterators = shardIds.map((shardId) =>
getShardIterator(this._streamName, shardId, this._shardIteratorOptions))
getShardIterator(this.client, this._streamName, shardId, this._shardIteratorOptions))
return Promise.all(shardIterators)
})
.then((shardIterators) => {
Expand All @@ -70,7 +68,7 @@ class KinesisStreamReader extends Readable {
Limit: 10000, // https://github.com/awslabs/amazon-kinesis-client/issues/4#issuecomment-56859367
}
// Not written using Promises because they make it harder to keep the program alive here
kinesis.getRecords(params, (err, data) => {
this.client.getRecords(params, (err, data) => {
if (err) {
this.emit('error', err) || console.log(err, err.stack)
return
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"keywords": [
"aws",
"kinesis",
"stream",
"cli"
],
"author": "Chris Chang <c@crccheck.com> (http://crccheck.com/blog)",
Expand All @@ -28,7 +29,7 @@
"aws-sdk": "^2.16.0",
"commander": "^2.9.0",
"debug": "^2.6.1",
"update-notifier": "^2.0.0"
"update-notifier": "^2.1.0"
},
"devDependencies": {
"eslint": "^3.15.0",
Expand All @@ -37,7 +38,6 @@
"eslint-plugin-standard": "^2.0.1",
"istanbul": "^0.4.5",
"mocha": "^3.2.0",
"proxyquire": "^1.7.11",
"sinon": "^1.17.7"
},
"repository": {
Expand Down
77 changes: 30 additions & 47 deletions test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
'use strict'

const assert = require('assert')
const proxyquire = require('proxyquire').noCallThru()
const sinon = require('sinon')

const main = require('../index')

// HELPERS
//////////

Expand All @@ -23,16 +24,12 @@ const AWSPromise = {


describe('main', () => {
let AWS
let client
let sandbox

beforeEach(() => {
client = {}
sandbox = sinon.sandbox.create()
sandbox.stub(console, 'log')
sandbox.stub(console, 'error')
AWS = {
Kinesis: class {},
}
})

afterEach(() => {
Expand All @@ -41,18 +38,16 @@ describe('main', () => {

describe('getStreams', () => {
it('returns data from AWS', () => {
AWS.Kinesis.prototype.listStreams = AWSPromise.resolve('dat data')
const main = proxyquire('../index', {'aws-sdk': AWS})
main.getStreams()
client.listStreams = AWSPromise.resolve('dat data')
main.getStreams(client)
.then((data) => {
assert.strictEqual(data, 'dat data')
})
})

it('handles errors', () => {
AWS.Kinesis.prototype.listStreams = AWSPromise.reject('lol error')
const main = proxyquire('../index', {'aws-sdk': AWS})
return main.getStreams()
client.listStreams = AWSPromise.reject('lol error')
return main.getStreams(client)
.then((data) => {
assert.strictEqual(true, false)
})
Expand All @@ -64,9 +59,8 @@ describe('main', () => {

describe('getShardId', () => {
it('throws when there are no shards', () => {
AWS.Kinesis.prototype.describeStream = AWSPromise.resolve({StreamDescription: {Shards: []}})
const main = proxyquire('../index', {'aws-sdk': AWS})
return main._getShardId()
client.describeStream = AWSPromise.resolve({StreamDescription: {Shards: []}})
return main._getShardId(client)
.then((data) => {
assert.ok(false, 'This should never run')
})
Expand All @@ -76,18 +70,16 @@ describe('main', () => {
})

it('gets shard id', () => {
AWS.Kinesis.prototype.describeStream = AWSPromise.resolve({StreamDescription: {Shards: [{ShardId: 'shard id'}]}})
const main = proxyquire('../index', {'aws-sdk': AWS})
return main._getShardId()
client.describeStream = AWSPromise.resolve({StreamDescription: {Shards: [{ShardId: 'shard id'}]}})
return main._getShardId(client)
.then((data) => {
assert.deepEqual(data, ['shard id'])
})
})

it('handles errors', () => {
AWS.Kinesis.prototype.describeStream = AWSPromise.reject('lol error')
const main = proxyquire('../index', {'aws-sdk': AWS})
return main._getShardId()
client.describeStream = AWSPromise.reject('lol error')
return main._getShardId(client)
.then((data) => {
assert.strictEqual(true, false)
})
Expand All @@ -99,18 +91,16 @@ describe('main', () => {

describe('getShardIterator', () => {
it('gets shard iterator', () => {
AWS.Kinesis.prototype.getShardIterator = AWSPromise.resolve({ShardIterator: 'shard iterator'})
const main = proxyquire('../index', {'aws-sdk': AWS})
return main._getShardIterator()
client.getShardIterator = AWSPromise.resolve({ShardIterator: 'shard iterator'})
return main._getShardIterator(client)
.then((data) => {
assert.strictEqual(data, 'shard iterator')
})
})

it('handles errors', () => {
AWS.Kinesis.prototype.getShardIterator = AWSPromise.reject('lol error')
const main = proxyquire('../index', {'aws-sdk': AWS})
return main._getShardIterator()
client.getShardIterator = AWSPromise.reject('lol error')
return main._getShardIterator(client)
.then((data) => {
assert.strictEqual(true, false)
})
Expand All @@ -122,18 +112,16 @@ describe('main', () => {

describe('KinesisStreamReader', () => {
it('constructor sets arguments', () => {
const KinesisStreamReader = require('../index').KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})
assert.ok(reader)
assert.equal(reader._streamName, 'stream name')
assert.equal(reader._shardIteratorOptions.foo, 'bar')
})

describe('_startKinesis', () => {
it('emits error when there is an error', () => {
AWS.Kinesis.prototype.describeStream = AWSPromise.reject('lol error')
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
client.describeStream = AWSPromise.reject('lol error')
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})

reader.once('error', (err) => {
assert.equal(err, 'lol error')
Expand All @@ -143,9 +131,8 @@ describe('main', () => {
})

xit('logs when there is an error', () => {
AWS.Kinesis.prototype.describeStream = AWSPromise.reject('lol error')
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
client.describeStream = AWSPromise.reject('lol error')
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})

return reader._startKinesis('stream name', {})
.then(() => {
Expand All @@ -156,9 +143,8 @@ describe('main', () => {

describe('readShard', () => {
it('exits when there is an error', () => {
AWS.Kinesis.prototype.getRecords = (params, cb) => cb('mock error')
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
client.getRecords = (params, cb) => cb('mock error')
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})

reader.once('error', (err) => {
assert.equal(err, 'mock error')
Expand All @@ -168,9 +154,8 @@ describe('main', () => {
})

it('exits when shard is closed', () => {
AWS.Kinesis.prototype.getRecords = (params, cb) => cb(undefined, {Records: []})
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
client.getRecords = (params, cb) => cb(undefined, {Records: []})
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})

reader.once('error', () => {
assert.ok(false, 'this should never run')
Expand All @@ -184,10 +169,9 @@ describe('main', () => {
const getNextIterator = sinon.stub()
getNextIterator.onFirstCall().returns('shard iterator')
getNextIterator.onSecondCall().returns(undefined)
AWS.Kinesis.prototype.getRecords = (params, cb) =>
client.getRecords = (params, cb) =>
cb(undefined, {Records: [{Data: ''}], NextShardIterator: getNextIterator()})
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})

reader.once('error', () => {
assert.ok(false, 'this should never run')
Expand All @@ -203,8 +187,7 @@ describe('main', () => {
})

it('_read only calls _startKinesis once', () => {
const KinesisStreamReader = proxyquire('../index', {'aws-sdk': AWS}).KinesisStreamReader
const reader = new KinesisStreamReader('stream name', {foo: 'bar'})
const reader = new main.KinesisStreamReader(client, 'stream name', {foo: 'bar'})
sandbox.stub(reader, '_startKinesis').returns(Promise.resolve())

reader._read()
Expand Down

0 comments on commit 37ec665

Please sign in to comment.