Skip to content

Commit

Permalink
Fix a bug unable to receive Salesforce Platform Events (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
if0s committed Oct 9, 2023
1 parent 176defa commit d5ef807
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 43 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.7.2 (September 28, 2023)
* Improvements in `Subscribe to platform events` trigger:
* fixed duplicates retries on connections lost
* fixed incorrect behavior with AuthFailure

## 2.7.1 (September 21, 2023)
* Improvements in `Subscribe to platform events` trigger:
* Added retry on connections lost
Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
"url": "http://www.salesforce.com/",
"version": "2.7.1",
"version": "2.7.2",
"authClientTypes": [
"oauth2"
],
Expand Down
86 changes: 44 additions & 42 deletions lib/triggers/streamPlatformEvents.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-empty */
const jsforce = require('jsforce');
const { messages } = require('elasticio-node');
const { callJSForceMethod } = require('../helpers/wrapper');
Expand All @@ -6,7 +7,8 @@ const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts;

let fayeClient;
let context;
let status;
let refreshTokenNeeded = false;
let creationInProgress = false;
/**
* This method will be called from elastic.io platform providing following data
*
Expand All @@ -15,17 +17,28 @@ let status;
*/
async function processTrigger(msg, configuration) {
context = this;
if (status === 'online') {
if (fayeClient) {
this.logger.info('Subscription is still running, waiting for new messages');
return;
}
if (creationInProgress) {
this.logger.info('Subscription recreate in progress');
return;
}
if (fayeClient) fayeClient = undefined;
creationInProgress = true;
context.logger.info('Starting Subscribe to platform events Trigger');

const { secretId } = configuration;
if (!secretId) {
context.logger.error('secretId is missing in configuration, credentials cannot be fetched');
creationInProgress = false;
throw new Error('secretId is missing in configuration, credentials cannot be fetched');
}
if (refreshTokenNeeded) {
await refreshToken(context, secretId, msg.id);
refreshTokenNeeded = false;
}
context.logger.debug('Fetching credentials by secretId');
const { credentials } = await getSecret(this, secretId, msg.id);
const accessToken = credentials.access_token;
Expand All @@ -39,53 +52,42 @@ async function processTrigger(msg, configuration) {
const topic = `/event/${configuration.object}`;
const replayId = -1;
context.logger.debug('Creating streaming client');
if (!fayeClient || status === 'down') {
fayeClient = connection.streaming.createClient([
new jsforce.StreamingExtension.Replay(topic, replayId),
new jsforce.StreamingExtension.AuthFailure(async (err) => {
context.logger.trace('AuthFailure error occurred');
if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid')) {
try {
context.logger.debug('Session is expired, trying to refresh token');
await refreshToken(context, secretId, msg.id);
context.logger.debug('Token is successfully refreshed');
} catch (error) {
context.logger.error('Failed to fetch refresh token');
throw new Error('Failed to fetch refresh token');
}
fayeClient = undefined;
context.logger.info('Lets call processTrigger one more time');
await processTrigger.call(context, msg, configuration);
} else {
context.logger.error('AuthFailure extension error occurred');
throw err;
}
}),
]);

fayeClient.on('transport:down', () => {
context.logger.error('Client is offline');
context.emit('error', 'Client is offline');
status = 'down';
fayeClient = connection.streaming.createClient([
new jsforce.StreamingExtension.Replay(topic, replayId),
new jsforce.StreamingExtension.AuthFailure((err) => {
let errMsg = '';
try {
errMsg = JSON.stringify(err);
} catch (e) {
errMsg = err;
}
context.logger.warn(`AuthFailure error occurred ${errMsg}`);
refreshTokenNeeded = true;
fayeClient = undefined;
creationInProgress = false;
context.logger.info('Lets call processTrigger one more time');
processTrigger.call(context, msg, configuration);
});
}),
]);

fayeClient.on('transport:up', () => {
context.logger.info('Client is online');
});
fayeClient.on('transport:down', () => {
context.logger.error('Client is offline');
});

await fayeClient.subscribe(topic, async (message) => {
context.logger.info('Incoming message found, going to emit...');
await context.emit('data', messages.newMessageWithBody(message));
});
status = 'online';
fayeClient.on('transport:up', () => {
context.logger.info('Client is online');
});

context.logger.info('Subscribed to PushTopic successfully');
context.logger.trace(`Subscribed to PushTopic: ${topic}`);
await fayeClient.subscribe(topic, async (message) => {
context.logger.info('Incoming message found, going to emit...');
await context.emit('data', messages.newMessageWithBody(message));
});
creationInProgress = false;
context.logger.info('Subscribed to PushTopic successfully');
context.logger.trace(`Subscribed to PushTopic: ${topic}`);

context.logger.info('Streaming client created and ready');
}
context.logger.info('Streaming client created and ready');
}

/**
Expand Down

0 comments on commit d5ef807

Please sign in to comment.