Skip to content

Commit

Permalink
feat: added firehose client
Browse files Browse the repository at this point in the history
Closes #27
  • Loading branch information
theburningmonk committed Aug 23, 2019
1 parent ffb4abf commit 535e788
Show file tree
Hide file tree
Showing 5 changed files with 514 additions and 0 deletions.
49 changes: 49 additions & 0 deletions packages/lambda-powertools-firehose-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# `lambda-powertools-firehose-client`

Firehose client wrapper that knows how to forward correlation IDs (captured via `@dazn/lambda-powertools-correlation-ids`).

Main features:

* auto-injects correlation IDs into Firehose records when you call `putRecord` or `putRecordBatch` (only JSON payloads are supported currently)

* direct replacement for `AWS.Firehose` client

## Getting Started

Install from NPM: `npm install @dazn/lambda-powertools-firehose-client`

## API

It's exactly the same as the Kinesis client from the AWS SDK.

```js
const Firehose = require('@dazn/lambda-powertools-firehose-client')

const publishEvent = async () => {
const putRecordReq = {
DeliveryStreamName: 'lambda-powertools-demo',
PartitionKey: uuid(),
Data: JSON.stringify({ message: 'hello firehose' })
}

await Firehose.putRecord(putRecordReq).promise()
}

const publishEvents = async () => {
const putRecordBatchReq = {
DeliveryStreamName: 'lambda-powertools-demo',
Records: [
{
PartitionKey: uuid(),
Data: JSON.stringify({ message: 'hello kinesis' })
},
{
PartitionKey: uuid(),
Data: JSON.stringify({ message: 'hello lambda-powertools' })
}
]
}

await Firehose.putRecordBatch(putRecordBatchReq).promise()
}
```
257 changes: 257 additions & 0 deletions packages/lambda-powertools-firehose-client/__tests__/index.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
const AWS = require('aws-sdk')

const mockPutRecord = jest.fn()
const mockPutRecordBatch = jest.fn()
AWS.Firehose.prototype.putRecord = mockPutRecord
AWS.Firehose.prototype.putRecordBatch = mockPutRecordBatch

global.console.log = jest.fn()

const Firehose = require('../index')
const CorrelationIds = require('@dazn/lambda-powertools-correlation-ids')

beforeEach(() => {
mockPutRecord.mockReturnValueOnce({
promise: async () => Promise.resolve()
})

mockPutRecordBatch.mockReturnValueOnce({
promise: async () => Promise.resolve()
})
})

afterEach(() => {
mockPutRecord.mockClear()
mockPutRecordBatch.mockClear()

CorrelationIds.clearAll()
})

const verifyPutRecordContext = async (f) => {
const data = JSON.stringify({
eventType: 'wrote_test',
username: 'theburningmonk'
})
const params = {
Data: data,
DeliveryStreamName: 'test'
}
await Firehose.putRecord(params).promise()

expect(mockPutRecord).toBeCalled()
const actualParams = mockPutRecord.mock.calls[0][0]
const actualData = JSON.parse(actualParams.Data)
f(actualData.__context__)
}

const verifyPutRecordWithCorrelationIdsContext = async (correlationIds, f) => {
const data = JSON.stringify({
eventType: 'wrote_test',
username: 'theburningmonk'
})
const params = {
Data: data,
DeliveryStreamName: 'test'
}
await Firehose.putRecordWithCorrelationIds(correlationIds, params).promise()

expect(mockPutRecord).toBeCalled()
const actualParams = mockPutRecord.mock.calls[0][0]
const actualData = JSON.parse(actualParams.Data)
f(actualData.__context__)
}

const verifyPutRecordBatchContext = async (f) => {
const eventTypes = [
'wrote_test',
'ran_test',
'pass_test'
]
const records = eventTypes
.map(eventType => {
const data = { eventType, username: 'theburningmonk' }
return {
Data: JSON.stringify(data)
}
})
const params = {
Records: records,
DeliveryStreamName: 'test'
}
await Firehose.putRecordBatch(params).promise()

expect(mockPutRecordBatch).toBeCalled()
const actualParams = mockPutRecordBatch.mock.calls[0][0]
actualParams.Records.forEach(record => {
const actualData = JSON.parse(record.Data)
f(actualData.__context__)
})
}

const verifyPutRecordBatchWithCorrelationIdsContext = async (correlationIds, f) => {
const eventTypes = [
'wrote_test',
'ran_test',
'pass_test'
]
const records = eventTypes
.map(eventType => {
const data = { eventType, username: 'theburningmonk' }
return {
Data: JSON.stringify(data)
}
})
const params = {
Records: records,
DeliveryStreamName: 'test'
}
await Firehose.putRecordBatchWithCorrelationIds(correlationIds, params).promise()

expect(mockPutRecordBatch).toBeCalled()
const actualParams = mockPutRecordBatch.mock.calls[0][0]
actualParams.Records.forEach(record => {
const actualData = JSON.parse(record.Data)
f(actualData.__context__)
})
}

describe('Firehose client', () => {
describe('.putRecord', () => {
describe('when there are no correlation IDs', () => {
it('sends empty __context__ ', async () => {
await verifyPutRecordContext(x => expect(x).toEqual({}))
})
})

describe('when there are global correlationIds', () => {
it('forwards them in __context__', async () => {
const correlationIds = {
'x-correlation-id': 'id',
'debug-log-enabled': 'true'
}
CorrelationIds.replaceAllWith(correlationIds)

await verifyPutRecordContext(x => {
expect(x).toEqual({
'x-correlation-id': 'id',
'debug-log-enabled': 'true'
})
})
})
})

describe('when payload is not JSON', () => {
it('does not modify the request', async () => {
const params = {
Data: 'dGhpcyBpcyBub3QgSlNPTg==',
DeliveryStreamName: 'test'
}
await Firehose.putRecord(params).promise()

expect(mockPutRecord).toBeCalledWith(params)
})
})

describe('when payload is binary', () => {
it('does not modify the request', async () => {
const params = {
DeliveryStreamName: 'test',
Data: Buffer.from('dGhpcyBpcyBub3QgSlNPTg==', 'base64')
}

await Firehose.putRecord(params).promise()

expect(mockPutRecord).toBeCalledWith(params)
})
})
})

describe('.putRecordWithCorrelationIds', () => {
it('forwards given correlationIds in __context__ field', async () => {
const correlationIds = new CorrelationIds({
'x-correlation-id': 'child-id',
'debug-log-enabled': 'true'
})

await verifyPutRecordWithCorrelationIdsContext(correlationIds, x => {
expect(x).toEqual({
'x-correlation-id': 'child-id',
'debug-log-enabled': 'true'
})
})
})
})

describe('.putRecordBatch', () => {
describe('when there are no correlation IDs', () => {
it('sends empty __context__ ', async () => {
await verifyPutRecordBatchContext(x => expect(x).toEqual({}))
})
})

describe('when there are global correlationIds', () => {
it('forwards them in __context__', async () => {
const correlationIds = {
'x-correlation-id': 'id',
'debug-log-enabled': 'true'
}
CorrelationIds.replaceAllWith(correlationIds)

await verifyPutRecordBatchContext(x => {
expect(x).toEqual({
'x-correlation-id': 'id',
'debug-log-enabled': 'true'
})
})
})
})

describe('when payload is not JSON', () => {
it('does not modify the request', async () => {
const params = {
Records: [
{ Data: 'dGhpcyBpcyBub3QgSlNPTg==' },
{ Data: 'dGhpcyBpcyBhbHNvIG5vdCBKU09O' },
{ Data: 'c29ycnksIHN0aWxsIG5vdCBKU09O' }
],
DeliveryStreamName: 'test'
}
await Firehose.putRecordBatch(params).promise()

expect(mockPutRecordBatch).toBeCalledWith(params)
})
})

describe('when payload is binary', () => {
it('does not modify the request', async () => {
const params = {
Records: [
{ Data: Buffer.from('dGhpcyBpcyBub3QgSlNPTg==', 'base64') },
{ Data: Buffer.from('dGhpcyBpcyBhbHNvIG5vdCBKU09O', 'base64') },
{ Data: Buffer.from('c29ycnksIHN0aWxsIG5vdCBKU09O', 'base64') }
],
DeliveryStreamName: 'test'
}
await Firehose.putRecordBatch(params).promise()

expect(mockPutRecordBatch).toBeCalledWith(params)
})
})
})

describe('.putRecordBatchWithCorrelationIds', () => {
it('forwards given correlationIds in __context__ field', async () => {
const correlationIds = new CorrelationIds({
'x-correlation-id': 'child-id',
'debug-log-enabled': 'true'
})

await verifyPutRecordBatchWithCorrelationIdsContext(correlationIds, x => {
expect(x).toEqual({
'x-correlation-id': 'child-id',
'debug-log-enabled': 'true'
})
})
})
})
})
87 changes: 87 additions & 0 deletions packages/lambda-powertools-firehose-client/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const AWS = require('aws-sdk')
const https = require('https')
const sslAgent = new https.Agent({
keepAlive: true,
maxSockets: 50,
rejectUnauthorized: true
})
sslAgent.setMaxListeners(0)

AWS.config.update({
httpOptions: {
agent: sslAgent
}
})

const client = new AWS.Firehose()
const Log = require('@dazn/lambda-powertools-logger')
const CorrelationIds = require('@dazn/lambda-powertools-correlation-ids')

function tryJsonParse (data) {
if (typeof data !== 'string') {
return null
}

try {
return JSON.parse(data)
} catch (err) {
Log.warn('only JSON string data can be modified to insert correlation IDs', null, err)
return null
}
}

function addCorrelationIds (correlationIds, data) {
// only do this with JSON string data
const payload = tryJsonParse(data)
if (!payload) {
return data
}

const ids = correlationIds.get()
const newData = {
__context__: ids,
...payload
}
return JSON.stringify(newData)
}

client._putRecord = client.putRecord

client.putRecord = (...args) => {
return client.putRecordWithCorrelationIds(CorrelationIds, ...args)
}

client.putRecordWithCorrelationIds = (correlationIds, params, ...args) => {
const newData = addCorrelationIds(correlationIds, params.Data)
const extendedParams = {
...params,
Data: newData
}

return client._putRecord(extendedParams, ...args)
}

client._putRecordBatch = client.putRecordBatch

client.putRecordBatch = (...args) => {
return client.putRecordBatchWithCorrelationIds(CorrelationIds, ...args)
}

client.putRecordBatchWithCorrelationIds = (correlationIds, params, ...args) => {
const newRecords = params.Records.map(record => {
const newData = addCorrelationIds(correlationIds, record.Data)
return {
...record,
Data: newData
}
})

const extendedParams = {
...params,
Records: newRecords
}

return client._putRecordBatch(extendedParams, ...args)
}

module.exports = client
Loading

0 comments on commit 535e788

Please sign in to comment.