Skip to content

Commit

Permalink
Merge 22985d8 into 34ac7e1
Browse files Browse the repository at this point in the history
  • Loading branch information
crccheck committed Jan 7, 2018
2 parents 34ac7e1 + 22985d8 commit 5b85f27
Show file tree
Hide file tree
Showing 6 changed files with 2,709 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: node_js
node_js:
- "6"
- "5"
- "4"
- "7"
- "8"
before_script:
- npm install coveralls mocha-lcov-reporter
script: npm run travisci
4 changes: 2 additions & 2 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ program
} else {
options.ShardIteratorType = 'LATEST'
}
if(program.newLine) {
if (program.newLine) {
options.NewLine = true
} else {
options.Newline = false
}
if (program.regexFilter) {
options.RegexFilter = program.regexFilter
} else {
options.RegexFilter = ".*"
options.RegexFilter = '.*'
}
const reader = new index.KinesisStreamReader(client, streamName, options)
reader.pipe(process.stdout)
Expand Down
12 changes: 6 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ function getShardIterator (client, streamName, shardId, options) {
class KinesisStreamReader extends Readable {
constructor (client, streamName, options) {
super({
objectMode: !!options.parser, // Should this always be true?
objectMode: !!options.parser, // Should this always be true?
})
this.client = client
this.streamName = streamName
this.options = Object.assign({
interval: 2000,
parser: (x) => x,
filter: new RegExp(options.RegexFilter)
filter: new RegExp(options.RegexFilter),
}, options)
this._started = false // TODO this is probably built into Streams
this._started = false // TODO this is probably built into Streams
this.iterators = new Set()
}

Expand Down Expand Up @@ -76,7 +76,7 @@ class KinesisStreamReader extends Readable {
debug('readShard starting from %s (out of %d)', shardIterator, this.iterators.size)
const params = {
ShardIterator: shardIterator,
Limit: 10000, // https://github.com/awslabs/amazon-kinesis-client/issues/4#issuecomment-56859367
Limit: 10000, // https://github.com/awslabs/amazon-kinesis-client/issues/4#issuecomment-56859367
}
// Not written using Promises because they make it harder to keep the program alive here
this.client.getRecords(params, (err, data) => {
Expand All @@ -90,8 +90,8 @@ class KinesisStreamReader extends Readable {
}
data.Records.forEach((x) => {
var record = this.options.parser(x.Data)
if(this.options.NewLine) record += '\n'
if(this.options.filter.test(record)) this.push(record)
if (this.options.NewLine) record += '\n'
if (this.options.filter.test(record)) this.push(record)
})

if (data.Records.length) {
Expand Down

0 comments on commit 5b85f27

Please sign in to comment.