Skip to content

Commit

Permalink
68 subscribe to platform events fix (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
if0s committed Sep 21, 2023
1 parent 37794b9 commit 176defa
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 35 deletions.
8 changes: 4 additions & 4 deletions .grype-ignore.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
ignore:
- vulnerability: CVE-2023-2650
- vulnerability: CVE-2023-4807
package:
name: libssl3
version: 3.1.0-r4
version: 3.1.2-r0

- vulnerability: CVE-2023-2650
- vulnerability: CVE-2023-4807
package:
name: libcrypto3
version: 3.1.0-r4
version: 3.1.2-r0
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.7.1 (September 21, 2023)
* Improvements in `Subscribe to platform events` trigger:
* Added retry on connections lost
* Changed the behavior where new logs would appear in the first execution regardless of which message they belonged to. Now, all messages will be displayed in their appropriate execution
* Logs with `Going to fetch secret` set to debug level

## 2.7.0 (June 29, 2023)
Added support for files attachment by providing a URL in the body for all actions where it is used

Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
"url": "http://www.salesforce.com/",
"version": "2.7.0",
"version": "2.7.1",
"authClientTypes": [
"oauth2"
],
Expand Down
71 changes: 44 additions & 27 deletions lib/triggers/streamPlatformEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,86 @@ const { getSecret, refreshToken } = require('../util');
const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts;

let fayeClient;
let context;
let status;
/**
* This method will be called from elastic.io platform providing following data
*
* @param msg incoming message object that contains ``body`` with payload
* @param configuration configuration that is account information and configuration field values
*/
async function processTrigger(msg, configuration) {
this.logger.info('Starting Subscribe to platform events Trigger');
context = this;
if (status === 'online') {
this.logger.info('Subscription is still running, waiting for new messages');
return;
}
context.logger.info('Starting Subscribe to platform events Trigger');

const { secretId } = configuration;
if (!secretId) {
this.logger.error('secretId is missing in configuration, credentials cannot be fetched');
context.logger.error('secretId is missing in configuration, credentials cannot be fetched');
throw new Error('secretId is missing in configuration, credentials cannot be fetched');
}
this.logger.debug('Fetching credentials by secretId');
context.logger.debug('Fetching credentials by secretId');
const { credentials } = await getSecret(this, secretId, msg.id);
const accessToken = credentials.access_token;
const instanceUrl = credentials.undefined_params.instance_url;
this.logger.debug('Preparing SalesForce connection...');
context.logger.debug('Preparing SalesForce connection...');
const connection = new jsforce.Connection({
instanceUrl,
accessToken,
version: SALESFORCE_API_VERSION,
});
const topic = `/event/${configuration.object}`;
const replayId = -1;
this.logger.debug('Creating streaming client');
if (!fayeClient) {
context.logger.debug('Creating streaming client');
if (!fayeClient || status === 'down') {
fayeClient = connection.streaming.createClient([
new jsforce.StreamingExtension.Replay(topic, replayId),
new jsforce.StreamingExtension.AuthFailure(async (err) => {
this.logger.trace('AuthFailure error occurred');
context.logger.trace('AuthFailure error occurred');
if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid')) {
try {
this.logger.debug('Session is expired, trying to refresh token');
await refreshToken(this, secretId, msg.id);
this.logger.debug('Token is successfully refreshed');
context.logger.debug('Session is expired, trying to refresh token');
await refreshToken(context, secretId, msg.id);
context.logger.debug('Token is successfully refreshed');
} catch (error) {
this.logger.error('Failed to fetch refresh token');
context.logger.error('Failed to fetch refresh token');
throw new Error('Failed to fetch refresh token');
}
fayeClient = undefined;
this.logger.info('Lets call processTrigger one more time');
await processTrigger.call(this, msg, configuration);
context.logger.info('Lets call processTrigger one more time');
await processTrigger.call(context, msg, configuration);
} else {
this.logger.error('AuthFailure extension error occurred');
context.logger.error('AuthFailure extension error occurred');
throw err;
}
}),
]);

fayeClient.subscribe(topic, async (message) => {
this.logger.info('Incoming message found, going to emit...');
await this.emit('data', messages.newMessageWithBody(message));
})
.then(() => {
this.logger.info('Subscribed to PushTopic successfully');
this.logger.trace(`Subscribed to PushTopic: ${topic}`);
},
(err) => {
this.logger.error('Subscriber error occurred');
throw err;
});
this.logger.info('Streaming client created and ready');
fayeClient.on('transport:down', () => {
context.logger.error('Client is offline');
context.emit('error', 'Client is offline');
status = 'down';
context.logger.info('Lets call processTrigger one more time');
processTrigger.call(context, msg, configuration);
});

fayeClient.on('transport:up', () => {
context.logger.info('Client is online');
});

await fayeClient.subscribe(topic, async (message) => {
context.logger.info('Incoming message found, going to emit...');
await context.emit('data', messages.newMessageWithBody(message));
});
status = 'online';

context.logger.info('Subscribed to PushTopic successfully');
context.logger.trace(`Subscribed to PushTopic: ${topic}`);

context.logger.info('Streaming client created and ready');
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function getSecretUri(secretId, isRefresh) {

module.exports.getSecret = async (emitter, secretId, msgId) => {
const secretUri = getSecretUri(secretId);
emitter.logger.info('Going to fetch secret');
emitter.logger.debug('Going to fetch secret');
const ax = axios.create();
addRetryCountInterceptorToAxios(ax);
const secret = await ax.get(secretUri, {
Expand All @@ -62,7 +62,7 @@ module.exports.getSecret = async (emitter, secretId, msgId) => {
},
});
const parsedSecret = secret.data.data.attributes;
emitter.logger.info('Got secret');
emitter.logger.debug('Got secret');
return parsedSecret;
};

Expand Down
2 changes: 1 addition & 1 deletion spec/triggers/streamPlatformEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('streamPlatformEvents trigger', () => {
describe('should emit message', async () => {
before(() => {
sinon.stub(jsforce, 'Connection').callsFake(() => ({
streaming: { createClient: () => ({ subscribe: async (_topic, emit) => { emit('some message'); } }) },
streaming: { createClient: () => ({ subscribe: async (_topic, emit) => { emit('some message'); }, on: () => {} }) },
StreamingExtension: { Replay: () => {}, AuthFailure: () => {} },
}));
});
Expand Down

0 comments on commit 176defa

Please sign in to comment.