Skip to content

Commit

Permalink
feat(analytics): Add analytics provider for AWS Firehose (#2823)
Browse files Browse the repository at this point in the history
* Add Firehose provider for analytics
* Add KinesisFirehose provider unit test
  • Loading branch information
seang96 authored and iartemiev committed Nov 21, 2019
1 parent 6e7d693 commit 29103ae
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
jest.mock('aws-sdk/clients/firehose', () => {
const Firehose = () => {
const firehose = null;
return firehose;
};

Firehose.prototype.putRecordBatch = (params, callback) => {
callback(null, 'data');
};

return Firehose;
});

import * as Firehose from 'aws-sdk/clients/firehose';
import { Credentials } from '@aws-amplify/core';
import KinesisFirehoseProvider from '../../src/Providers/AWSKinesisFirehoseProvider';

const credentials = {
accessKeyId: 'accessKeyId',
sessionToken: 'sessionToken',
secretAccessKey: 'secretAccessKey',
identityId: 'identityId',
authenticated: true,
};

jest.useFakeTimers();

describe('kinesis firehose provider test', () => {
describe('getCategory test', () => {
test('happy case', () => {
const analytics = new KinesisFirehoseProvider();

expect(analytics.getCategory()).toBe('Analytics');
});
});

describe('getProviderName test', () => {
test('happy case', () => {
const analytics = new KinesisFirehoseProvider();

expect(analytics.getProviderName()).toBe('AWSKinesisFirehose');
});
});

describe('configure test', () => {
test('happy case', () => {
const analytics = new KinesisFirehoseProvider();

expect(analytics.configure({ region: 'region1' })).toEqual({
bufferSize: 1000,
flushInterval: 5000,
flushSize: 100,
region: 'region1',
resendLimit: 5,
});
});
});

describe('record test', () => {
test('record without credentials', async () => {
const analytics = new KinesisFirehoseProvider();

const spyon = jest
.spyOn(Credentials, 'get')
.mockImplementationOnce(() => {
return Promise.reject('err');
});

expect(await analytics.record('params')).toBe(false);
spyon.mockRestore();
});

test('record happy case', async () => {
const analytics = new KinesisFirehoseProvider();
analytics.configure({ region: 'region1' });

const spyon = jest.spyOn(Firehose.prototype, 'putRecordBatch');

jest.spyOn(Credentials, 'get').mockImplementationOnce(() => {
return Promise.resolve(credentials);
});

await analytics.record({
event: {
data: {
data: 'data',
},
streamName: 'stream',
},
config: {},
});

jest.advanceTimersByTime(6000);

expect(spyon).toBeCalled();

spyon.mockRestore();
});
});
});
111 changes: 111 additions & 0 deletions packages/analytics/src/Providers/AWSKinesisFirehoseProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

import { ConsoleLogger as Logger } from '@aws-amplify/core';
import { AWSKinesisProvider } from './index';
import * as Firehose from 'aws-sdk/clients/firehose';

const logger = new Logger('AWSKineisFirehoseProvider');

export default class AWSKinesisFirehoseProvider extends AWSKinesisProvider {
private _kinesisFirehose;

constructor(config?) {
super(config);
}

/**
* get provider name of the plugin
*/
public getProviderName(): string {
return 'AWSKinesisFirehose';
}

protected _sendEvents(group) {
if (group.length === 0) {
return;
}

const { config, credentials } = group[0];

const initClients = this._init(config, credentials);
if (!initClients) return false;

const records = {};

group.map(params => {
// spit by streamName
const evt = params.event;
const { streamName } = evt;
if (records[streamName] === undefined) {
records[streamName] = [];
}

const PartitionKey =
evt.partitionKey || `partition-${credentials.identityId}`;

Object.assign(evt.data, { PartitionKey });
const Data = JSON.stringify(evt.data);
const record = { Data };
records[streamName].push(record);
});

Object.keys(records).map(streamName => {
logger.debug(
'putting records to kinesis',
streamName,
'with records',
records[streamName]
);
this._kinesisFirehose.putRecordBatch(
{
Records: records[streamName],
DeliveryStreamName: streamName,
},
err => {
if (err) logger.debug('Failed to upload records to Kinesis', err);
else logger.debug('Upload records to stream', streamName);
}
);
});
}

protected _init(config, credentials) {
logger.debug('init clients');

if (
this._kinesisFirehose &&
this._config.credentials &&
this._config.credentials.sessionToken === credentials.sessionToken &&
this._config.credentials.identityId === credentials.identityId
) {
logger.debug('no change for analytics config, directly return from init');
return true;
}

this._config.credentials = credentials;
const { region } = config;

return this._initFirehose(region, credentials);
}

private _initFirehose(region, credentials) {
logger.debug('initialize kinesis firehose with credentials', credentials);
this._kinesisFirehose = new Firehose({
apiVersion: '2015-08-04',
region,
credentials,
});
return true;
}
}
30 changes: 13 additions & 17 deletions packages/analytics/src/Providers/AWSKinesisProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
Expand All @@ -13,7 +13,6 @@

import { ConsoleLogger as Logger, Credentials } from '@aws-amplify/core';
import * as Kinesis from 'aws-sdk/clients/kinesis';
import Cache from '@aws-amplify/cache';
import { AnalyticsProvider } from '../types';

const logger = new Logger('AWSKineisProvider');
Expand All @@ -25,23 +24,19 @@ const FLUSH_INTERVAL = 5 * 1000; // 5s
const RESEND_LIMIT = 5;

export default class AWSKinesisProvider implements AnalyticsProvider {
private _config;
protected _config;
private _kinesis;
private _buffer;
private _timer;

constructor(config?) {
this._buffer = [];
this._config = config ? config : {};
this._config = config || {};
this._config.bufferSize = this._config.bufferSize || BUFFER_SIZE;
this._config.flushSize = this._config.flushSize || FLUSH_SIZE;
this._config.flushInterval = this._config.flushInterval || FLUSH_INTERVAL;
this._config.resendLimit = this._config.resendLimit || RESEND_LIMIT;

// events batch
const that = this;

// flush event buffer
this._setupTimer();
}

Expand All @@ -50,7 +45,6 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
clearInterval(this._timer);
}
const { flushSize, flushInterval } = this._config;
const that = this;
this._timer = setInterval(() => {
const size =
this._buffer.length < flushSize ? this._buffer.length : flushSize;
Expand All @@ -59,7 +53,7 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
const params = this._buffer.shift();
events.push(params);
}
that._sendFromBuffer(events);
this._sendFromBuffer(events);
}, flushInterval);
}

Expand All @@ -83,7 +77,7 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
*/
public configure(config): object {
logger.debug('configure Analytics', config);
const conf = config ? config : {};
const conf = config || {};
this._config = Object.assign({}, this._config, conf);

this._setupTimer();
Expand Down Expand Up @@ -156,9 +150,8 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
});
}

private _sendEvents(group) {
protected _sendEvents(group) {
if (group.length === 0) {
// logger.debug('events array is empty, directly return');
return;
}

Expand Down Expand Up @@ -202,7 +195,7 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
});
}

private _init(config, credentials) {
protected _init(config, credentials) {
logger.debug('init clients');

if (
Expand All @@ -217,13 +210,17 @@ export default class AWSKinesisProvider implements AnalyticsProvider {

this._config.credentials = credentials;
const { region } = config;

return this._initKinesis(region, credentials);
}

private _initKinesis(region, credentials) {
logger.debug('initialize kinesis with credentials', credentials);
this._kinesis = new Kinesis({
apiVersion: '2013-12-02',
region,
credentials,
});

return true;
}

Expand All @@ -232,11 +229,10 @@ export default class AWSKinesisProvider implements AnalyticsProvider {
* check if current credentials exists
*/
private _getCredentials() {
const that = this;
return Credentials.get()
.then(credentials => {
if (!credentials) return null;
logger.debug('set credentials for analytics', that._config.credentials);
logger.debug('set credentials for analytics', this._config.credentials);
return Credentials.shear(credentials);
})
.catch(err => {
Expand Down
3 changes: 2 additions & 1 deletion packages/analytics/src/Providers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import AWSPinpointProvider from './AWSPinpointProvider';
import AWSKinesisProvider from './AWSKinesisProvider';
import AWSKinesisFirehoseProvider from './AWSKinesisFirehoseProvider';
import AmazonPersonalizeProvider from './AmazonPersonalizeProvider';

export { AWSPinpointProvider, AWSKinesisProvider, AmazonPersonalizeProvider };
export { AWSPinpointProvider, AWSKinesisProvider, AWSKinesisFirehoseProvider, AmazonPersonalizeProvider };
2 changes: 2 additions & 0 deletions packages/aws-amplify/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Analytics, {
AnalyticsProvider,
AWSPinpointProvider,
AWSKinesisProvider,
AWSKinesisFirehoseProvider,
AmazonPersonalizeProvider,
} from '@aws-amplify/analytics';
import Auth, { AuthClass } from '@aws-amplify/auth';
Expand Down Expand Up @@ -87,6 +88,7 @@ export {
AnalyticsProvider,
AWSPinpointProvider,
AWSKinesisProvider,
AWSKinesisFirehoseProvider,
AmazonPersonalizeProvider,
};
export { graphqlOperation };

0 comments on commit 29103ae

Please sign in to comment.