Skip to content

Commit

Permalink
76 pub sub (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
if0s committed Dec 21, 2023
1 parent 61841af commit 507e7b8
Show file tree
Hide file tree
Showing 11 changed files with 1,662 additions and 337 deletions.
10 changes: 5 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2.1
parameters:
node-version:
type: string
default: "16.13.2"
default: "18.13.0"
orbs:
node: circleci/node@5.0.0
slack: circleci/slack@4.5.3
Expand Down Expand Up @@ -72,7 +72,7 @@ commands:
jobs:
test:
docker:
- image: circleci/node:16-stretch
- image: cimg/node:18.13.0
steps:
- checkout
- node/install:
Expand All @@ -82,13 +82,13 @@ jobs:
override-ci-command: npm install
- run:
name: Audit Dependencies
command: npm run audit
command: npm audit --production --audit-level=high
- run:
name: Running Mocha Tests
command: npm test
build:
docker:
- image: circleci/node:16-stretch
- image: cimg/node:18.13.0
user: root
steps:
- checkout
Expand Down Expand Up @@ -124,4 +124,4 @@ workflows:
branches:
ignore: /.*/
tags:
only: /^([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/
only: /^([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2.8.0 (December 21, 2023)
* Added new `Subscribe to PubSub` trigger

## 2.7.3 (November 09, 2023)
* Fixed [issue](https://github.com/elasticio/salesforce-component-v2/issues/72) when real-time flows have authentication errors sometimes

Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [Get Updated Objects Polling](#get-updated-objects-polling)
* [Query Trigger](#query-trigger)
* [Subscribe to platform events (REALTIME FLOWS ONLY)](#subscribe-to-platform-events-realtime-flows-only)
* [Subscribe to PubSub](#subscribe-to-pubsub)
* [Deprecated triggers](#deprecated-triggers)
* [Actions](#actions)
* [Bulk Create/Update/Delete/Upsert](#bulk-createupdatedeleteupsert)
Expand Down Expand Up @@ -122,6 +123,30 @@ You can find more detail information in the [Platform Events Intro Documentation
#### Limitations:
At the moment this trigger can be used only for **"Realtime"** flows.

### Subscribe to PubSub
This trigger will subscribe for any platform Event using [Pub/Sub API](https://developer.salesforce.com/docs/platform/pub-sub-api/overview).

#### Configuration Fields
* **Event object name** - (dropdown, required): Input field where you should select the type of platform event to which you want to subscribe E.g. `My platform event`
* **Pub/Sub API Endpoint** - (string, optional): You can set Pub/Sub API Endpoint manually or leave it blank for default: `api.pubsub.salesforce.com:7443`. Details about Pub/Sub API Endpoints can be found [here](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/pub-sub-endpoints.html)
* **Number of events per request** - (positive integer, optional, defaults to 10, max 100): Salesforce uses batches of events to deliver to the component, the bigger number may increase processing speed, but if the batch size is too big, you can get out of memory error. If there are fewer events ready than the batch size, they will be delivered anyway.
* **Start from Replay Id** - (positive integer, optional): In the Salesforce platform events and change data capture events are retained in the event bus for 3 days and you can subscribe at any position in the stream by providing here Replay Id from the last event. This field is used only for the first execution, following executions will use the Replay Id from the latest event to get a new one.

#### Input Metadata

None.

#### Output Metadata

* **event** - (object, required): Store `replayId` of this message which can be used to retrieve records that were created after (using it as `Start from Replay Id` in configuration)
* **payload** - (object, required): Dynamically generated content of the event

#### Limitations:
* If you use **"Ordinary"** flow:
* Make sure that you execute it at least once per 3 days - according to the [documentation](https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html#replaying-an-event-stream) Salesforce stores events for up to 3 days.
* The component starts tracking changes after the first execution
* To `Retrieve new sample from Salesforce v2` you need to trigger an event on Salesforce side or provide a sample manually

### Deprecated triggers

<details>
Expand Down
69 changes: 67 additions & 2 deletions component.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Customer relationship management (CRM) software & cloud computing from the leader in CRM solutions for businesses large & small.",
"docsUrl": "https://github.com/elasticio/salesforce-component-v2",
"url": "http://www.salesforce.com/",
"version": "2.7.3",
"version": "2.8.0",
"authClientTypes": [
"oauth2"
],
Expand Down Expand Up @@ -253,6 +253,71 @@
"prompt": "Please select a Event object"
}
}
},
"streamPlatformEventsPubSub": {
"title": "Subscribe to PubSub",
"order": 97,
"help": {
"description": "Can be used for subscription to the specified in the configuration Platform Event object",
"link": "/components/salesforce/index.html#subscribe-to-pubsub"
},
"main": "./lib/triggers/streamPlatformEventsPubSub.js",
"type": "polling",
"fields": {
"object": {
"viewClass": "SelectView",
"label": "Event object name",
"required": true,
"model": "objectTypes",
"prompt": "Please select Event object"
},
"pubsubEndpoint": {
"label": "Pub/Sub API Endpoint",
"viewClass": "TextFieldView",
"required": false,
"placeholder": "api.pubsub.salesforce.com:7443",
"help": {
"description": "You can set Pub/Sub API Endpoint manually or leave it blank for default: \"api.pubsub.salesforce.com:7443\""
}
},
"eventCountPerRequest": {
"label": "Number of events per request",
"viewClass": "TextFieldView",
"required": false,
"placeholder": "10",
"help": {
"description": "Salesforce uses batches of events to deliver to the component, the bigger number may increase processing speed, but if the batch size is too big, you can get out of memory error. If there are fewer events ready than the batch size, they will be delivered anyway. Positive integer, defaults to 10, max 100"
}
},
"initialReplayId": {
"label": "Start from Replay Id",
"viewClass": "TextFieldView",
"required": false,
"placeholder": "31142963",
"help": {
"description": "In the Salesforce platform events and change data capture events are retained in the event bus for 3 days and you can subscribe at any position in the stream by providing here Replay Id from the last event. This field is used only for the first execution, following executions will use the Replay Id from the latest event to get a new one. Positive integer"
}
}
},
"metadata": {
"out": {
"type": "object",
"properties": {
"event": {
"type": "object",
"properties": {
"replayId": {
"type": "number"
}
}
},
"payload": {
"type": "object",
"properties": {}
}
}
}
}
}
},
"actions": {
Expand Down Expand Up @@ -638,4 +703,4 @@
}
}
}
}
}
196 changes: 196 additions & 0 deletions lib/PubSubClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/* eslint-disable no-param-reassign, no-nested-ternary, no-undef */
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const certifi = require('certifi');
const avro = require('avro-js');
const { EventEmitter } = require('events');
const fs = require('fs').promises;

const DEFAULT_PUBSUB_API_ENDPOINT = 'api.pubsub.salesforce.com:7443';
const CUSTOM_LONG_AVRO_TYPE = avro.types.LongType.using({
fromBuffer: (buf) => {
const big = buf.readBigInt64LE();
if (big < Number.MIN_SAFE_INTEGER || big > Number.MAX_SAFE_INTEGER) {
return big;
}
return Number(BigInt.asIntN(64, big));
},
toBuffer: (n) => {
const buf = Buffer.allocUnsafe(8);
if (n instanceof BigInt) {
buf.writeBigInt64LE(n);
} else {
buf.writeBigInt64LE(BigInt(n));
}
return buf;
},
fromJSON: BigInt,
toJSON: Number,
isValid: (n) => {
const type = typeof n;
return (type === 'number' && n % 1 === 0) || type === 'bigint';
},
compare: (n1, n2) => (n1 === n2 ? 0 : n1 < n2 ? -1 : 1),
});

class PubSubClient {
logger;

cfg;

topicName;

client;

schema;

lastKeepAlive;

subscription;

eventEmitter;

constructor(logger, cfg) {
this.logger = logger;
this.cfg = cfg;
this.topicName = `/event/${this.cfg.object}`;
}

async connect(accessToken, instanceUrl) {
const rootCert = await fs.readFile(certifi);
const tenantId = accessToken.split('!')[0];
const packageDefinition = await protoLoader.load(`${__dirname}/helpers/pubsub_api.proto`);
const grpcObj = grpc.loadPackageDefinition(packageDefinition);
const sfdcPackage = grpcObj.eventbus.v1;
const metaCallback = (_params, callback) => {
const meta = new grpc.Metadata();
meta.add('accesstoken', accessToken);
meta.add('instanceurl', instanceUrl);
meta.add('tenantid', tenantId);
callback(null, meta);
};
const callCreds = grpc.credentials.createFromMetadataGenerator(metaCallback);
const combCreds = grpc.credentials.combineChannelCredentials(grpc.credentials.createSsl(rootCert), callCreds);
this.client = new sfdcPackage.PubSub(
this.cfg.pubsubEndpoint || DEFAULT_PUBSUB_API_ENDPOINT,
combCreds,
);
this.logger.info('Connected to Pub/Sub API endpoint');
try {
await this.loadSchema();
this.logger.info('Schema loaded');
} catch (err) {
throw new Error(`Failed to load schema: ${err.message}`);
}
}

setLogger(logger) { this.logger = logger; }

async loadSchema() {
if (this.schema) return;
this.schema = await new Promise((resolve, reject) => {
this.client.GetTopic({ topicName: this.topicName }, (topicError, response) => {
if (topicError) {
reject(topicError);
} else {
const { schemaId } = response;
this.client.GetSchema({ schemaId }, (schemaError, res) => {
if (schemaError) {
reject(schemaError);
} else {
const schemaType = avro.parse(res.schemaJson, { registry: { long: CUSTOM_LONG_AVRO_TYPE } });
resolve({
id: schemaId,
type: schemaType,
});
}
});
}
});
});
}

flattenSinglePropertyObjects(theObject) {
Object.entries(theObject).forEach(([key, value]) => {
if (key !== 'ChangeEventHeader' && value && typeof value === 'object') {
const subKeys = Object.keys(value);
if (subKeys.length === 1) {
const subValue = value[subKeys[0]];
theObject[key] = subValue;
if (subValue && typeof subValue === 'object') {
this.flattenSinglePropertyObjects(theObject[key]);
}
}
}
});
}

async subscribe(numRequested, fromReplayId) {
const eventEmitter = new EventEmitter();
this.subscription = this.client.Subscribe();
const writeOptions = { topicName: this.topicName, numRequested };

if (fromReplayId) {
const buf = Buffer.allocUnsafe(8);
buf.writeBigUInt64BE(BigInt(fromReplayId), 0);
writeOptions.replayPreset = 2;
writeOptions.replayId = buf;
}
this.logger.info(`Requesting first ${numRequested} records`);
this.subscription.write(writeOptions);

this.subscription.on('data', (data) => {
const latestReplayId = Number(data.latestReplayId.readBigUInt64BE());
if (data.events) {
for (const event of data.events) {
const replayId = Number(event.replayId.readBigUInt64BE());
const payload = this.schema.type.fromBuffer(event.event.payload);
this.flattenSinglePropertyObjects(payload);
eventEmitter.emit('data', { event: { replayId }, payload });
}
if (!data.pendingNumRequested) {
this.logger.info(`requesting ${numRequested} more records`);
this.subscription.write({ topicName: this.topicName, numRequested });
}
} else {
this.logger.debug(`Received keepalive message. Latest replay ID: ${latestReplayId}`);
data.latestReplayId = latestReplayId;
this.lastKeepAlive = new Date().getTime();
eventEmitter.emit('keepalive', data);
}
});
this.subscription.on('end', () => {
this.logger.warn('gRPC stream ended');
eventEmitter.emit('end');
});
this.subscription.on('error', (error) => {
this.logger.error(`gRPC stream error: ${JSON.stringify(error)}`);
eventEmitter.emit('error', error);
});
this.subscription.on('status', (status) => {
this.logger.warn(`gRPC stream status: ${JSON.stringify(status)}`);
eventEmitter.emit('status', status);
});
this.eventEmitter = eventEmitter;

return this.eventEmitter;
}

getLastKeepAlive() {
return this.lastKeepAlive;
}

close() {
try {
this.eventEmitter.removeAllListeners();
this.eventEmitter = null;
this.subscription.removeAllListeners();
this.subscription = null;
this.client.close();
this.client = null;
// eslint-disable-next-line no-empty
} catch (_e) {}
}
}

module.exports.PubSubClient = PubSubClient;
Loading

0 comments on commit 507e7b8

Please sign in to comment.