From d5ef807f7a133ebd7437679a7d5ae54ee90413fe Mon Sep 17 00:00:00 2001 From: Alex Motsak Date: Mon, 9 Oct 2023 14:23:29 +0300 Subject: [PATCH] Fix a bug unable to receive Salesforce Platform Events (#71) --- CHANGELOG.md | 5 ++ component.json | 2 +- lib/triggers/streamPlatformEvents.js | 86 ++++++++++++++-------------- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df58f54..1782f13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 2.7.2 (September 28, 2023) +* Improvements in `Subscribe to platform events` trigger: + * fixed duplicates retries on connections lost + * fixed incorrect behavior with AuthFailure + ## 2.7.1 (September 21, 2023) * Improvements in `Subscribe to platform events` trigger: * Added retry on connections lost diff --git a/component.json b/component.json index f2f1aa4..a5f226d 100644 --- a/component.json +++ b/component.json @@ -3,7 +3,7 @@ "description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.", "docsUrl": "https://github.com/elasticio/salesforce-component-v2", "url": "http://www.salesforce.com/", - "version": "2.7.1", + "version": "2.7.2", "authClientTypes": [ "oauth2" ], diff --git a/lib/triggers/streamPlatformEvents.js b/lib/triggers/streamPlatformEvents.js index e049836..d38ed05 100644 --- a/lib/triggers/streamPlatformEvents.js +++ b/lib/triggers/streamPlatformEvents.js @@ -1,3 +1,4 @@ +/* eslint-disable no-empty */ const jsforce = require('jsforce'); const { messages } = require('elasticio-node'); const { callJSForceMethod } = require('../helpers/wrapper'); @@ -6,7 +7,8 @@ const { SALESFORCE_API_VERSION } = require('../common.js').globalConsts; let fayeClient; let context; -let status; +let refreshTokenNeeded = false; +let creationInProgress = false; /** * This method will be called from elastic.io platform providing following data * @@ -15,17 +17,28 @@ let status; */ async function processTrigger(msg, configuration) { context = this; - if (status === 'online') { + if (fayeClient) { this.logger.info('Subscription is still running, waiting for new messages'); return; } + if (creationInProgress) { + this.logger.info('Subscription recreate in progress'); + return; + } + if (fayeClient) fayeClient = undefined; + creationInProgress = true; context.logger.info('Starting Subscribe to platform events Trigger'); const { secretId } = configuration; if (!secretId) { context.logger.error('secretId is missing in configuration, credentials cannot be fetched'); + creationInProgress = false; throw new Error('secretId is missing in configuration, credentials cannot be fetched'); } + if (refreshTokenNeeded) { + await refreshToken(context, secretId, msg.id); + refreshTokenNeeded = false; + } context.logger.debug('Fetching credentials by secretId'); const { credentials } = await getSecret(this, secretId, msg.id); const accessToken = credentials.access_token; @@ -39,53 +52,42 @@ async function processTrigger(msg, configuration) { const topic = `/event/${configuration.object}`; const replayId = -1; context.logger.debug('Creating streaming client'); - if (!fayeClient || status === 'down') { - fayeClient = connection.streaming.createClient([ - new jsforce.StreamingExtension.Replay(topic, replayId), - new jsforce.StreamingExtension.AuthFailure(async (err) => { - context.logger.trace('AuthFailure error occurred'); - if (err.ext && err.ext.sfdc && err.ext.sfdc.failureReason && (err.ext.sfdc.failureReason === '401::Authentication invalid')) { - try { - context.logger.debug('Session is expired, trying to refresh token'); - await refreshToken(context, secretId, msg.id); - context.logger.debug('Token is successfully refreshed'); - } catch (error) { - context.logger.error('Failed to fetch refresh token'); - throw new Error('Failed to fetch refresh token'); - } - fayeClient = undefined; - context.logger.info('Lets call processTrigger one more time'); - await processTrigger.call(context, msg, configuration); - } else { - context.logger.error('AuthFailure extension error occurred'); - throw err; - } - }), - ]); - fayeClient.on('transport:down', () => { - context.logger.error('Client is offline'); - context.emit('error', 'Client is offline'); - status = 'down'; + fayeClient = connection.streaming.createClient([ + new jsforce.StreamingExtension.Replay(topic, replayId), + new jsforce.StreamingExtension.AuthFailure((err) => { + let errMsg = ''; + try { + errMsg = JSON.stringify(err); + } catch (e) { + errMsg = err; + } + context.logger.warn(`AuthFailure error occurred ${errMsg}`); + refreshTokenNeeded = true; + fayeClient = undefined; + creationInProgress = false; context.logger.info('Lets call processTrigger one more time'); processTrigger.call(context, msg, configuration); - }); + }), + ]); - fayeClient.on('transport:up', () => { - context.logger.info('Client is online'); - }); + fayeClient.on('transport:down', () => { + context.logger.error('Client is offline'); + }); - await fayeClient.subscribe(topic, async (message) => { - context.logger.info('Incoming message found, going to emit...'); - await context.emit('data', messages.newMessageWithBody(message)); - }); - status = 'online'; + fayeClient.on('transport:up', () => { + context.logger.info('Client is online'); + }); - context.logger.info('Subscribed to PushTopic successfully'); - context.logger.trace(`Subscribed to PushTopic: ${topic}`); + await fayeClient.subscribe(topic, async (message) => { + context.logger.info('Incoming message found, going to emit...'); + await context.emit('data', messages.newMessageWithBody(message)); + }); + creationInProgress = false; + context.logger.info('Subscribed to PushTopic successfully'); + context.logger.trace(`Subscribed to PushTopic: ${topic}`); - context.logger.info('Streaming client created and ready'); - } + context.logger.info('Streaming client created and ready'); } /**