From e648f5dfe43bea7e1d652c67417b328f93b2f1f2 Mon Sep 17 00:00:00 2001 From: moyosore Date: Wed, 12 Apr 2023 14:07:56 +0100 Subject: [PATCH 1/2] realtime: add subscription filters support This commit adds a new getDerived method for obtaining an instance of a "derived channel" which applies a filter to messages in the upstream channel. The channel filter is specified via a base64 encoded JMESPath expression which is given as the value of a new channel qualifier called "filter". This feature is currently only available in preview and a feature flag must be enabled on your Ably account to use it. --- ably.d.ts | 20 +++++ src/common/lib/client/auth.ts | 12 +-- src/common/lib/client/realtime.ts | 9 ++ src/common/lib/util/utils.ts | 35 ++++++++ test/common/ably-common | 2 +- test/realtime/message.test.js | 140 ++++++++++++++++++++++++++++++ 6 files changed, 207 insertions(+), 11 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index 1627a9b540..3f8774070c 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -986,6 +986,16 @@ declare namespace Types { modes?: ChannelModes; } + /** + * Passes additional properties to a {@link RealtimeChannelBase} name to produce a new derived channel + */ + interface DeriveOptions { + /** + * The JMESPath Query filter string to be used to derive new channel. + */ + filter?: string; + } + /** * The `RestHistoryParams` interface describes the parameters accepted by the following methods: * @@ -2752,6 +2762,16 @@ declare namespace Types { * @returns A {@link ChannelBase} or {@link RealtimeChannelBase} object. */ get(name: string, channelOptions?: ChannelOptions): T; + /** + * Creates a new {@link ChannelBase} or {@link RealtimeChannelBase} object, with the specified channel {@link DeriveOptions} + * and {@link ChannelOptions}, or returns the existing channel object. + * + * @param name - The channel name. + * @param deriveOptions - A {@link DeriveOptions} object. + * @param channelOptions - A {@link ChannelOptions} object. + * @returns A {@link RealtimeChannelBase} object. + */ + getDerived(name: string, deriveOptions: DeriveOptions, channelOptions?: ChannelOptions): T; /** * Releases a {@link ChannelBase} or {@link RealtimeChannelBase} object, deleting it, and enabling it to be garbage collected. It also removes any listeners associated with the channel. To release a channel, the {@link ChannelState} must be `INITIALIZED`, `DETACHED`, or `FAILED`. * diff --git a/src/common/lib/client/auth.ts b/src/common/lib/client/auth.ts index 7178d4b6fc..9be9b824a6 100644 --- a/src/common/lib/client/auth.ts +++ b/src/common/lib/client/auth.ts @@ -4,7 +4,6 @@ import Multicaster from '../util/multicaster'; import ErrorInfo, { IPartialErrorInfo } from '../types/errorinfo'; import HmacSHA256 from 'crypto-js/build/hmac-sha256'; import { stringify as stringifyBase64 } from 'crypto-js/build/enc-base64'; -import { parse as parseUtf8 } from 'crypto-js/build/enc-utf8'; import { createHmac } from 'crypto'; import { ErrnoException, RequestCallback, RequestParams } from '../../types/http'; import * as API from '../../../../ably'; @@ -43,13 +42,6 @@ function normaliseAuthcallbackError(err: any) { return err; } -let toBase64 = (str: string): string => { - if (Platform.Config.createHmac) { - return Buffer.from(str, 'ascii').toString('base64'); - } - return stringifyBase64(parseUtf8(str)); -}; - let hmac = (text: string, key: string): string => { if (Platform.Config.createHmac) { const inst = (Platform.Config.createHmac as typeof createHmac)('SHA256', key); @@ -886,7 +878,7 @@ class Auth { if (!tokenDetails) { throw new Error('Auth.getAuthParams(): _ensureValidAuthCredentials returned no error or tokenDetails'); } - callback(null, { authorization: 'Bearer ' + toBase64(tokenDetails.token) }); + callback(null, { authorization: 'Bearer ' + Utils.toBase64(tokenDetails.token) }); }); } } @@ -916,7 +908,7 @@ class Auth { _saveBasicOptions(authOptions: API.Types.AuthOptions) { this.method = 'basic'; this.key = authOptions.key; - this.basicKey = toBase64(authOptions.key as string); + this.basicKey = Utils.toBase64(authOptions.key as string); this.authOptions = authOptions || {}; if ('clientId' in authOptions) { this._userSetClientId(authOptions.clientId); diff --git a/src/common/lib/client/realtime.ts b/src/common/lib/client/realtime.ts index 9387ccfcf5..2098f5643e 100644 --- a/src/common/lib/client/realtime.ts +++ b/src/common/lib/client/realtime.ts @@ -161,6 +161,15 @@ class Channels extends EventEmitter { return channel; } + getDerived(name: string, deriveOptions: API.Types.DeriveOptions, channelOptions?: ChannelOptions) { + if (deriveOptions.filter) { + const filter = Utils.toBase64(deriveOptions.filter); + const match = Utils.matchDerivedChannel(name); + name = `[filter=${filter}${match.qualifierParam}]${match.channelName}`; + } + return this.get(name, channelOptions); + } + /* Included to support certain niche use-cases; most users should ignore this. * Please do not use this unless you know what you're doing */ release(name: string) { diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index eecedc55be..6c76ce7f85 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -2,6 +2,8 @@ import Platform from 'common/platform'; import Defaults, { getAgentString } from './defaults'; import ErrorInfo, { PartialErrorInfo } from 'common/lib/types/errorinfo'; import { NormalisedClientOptions } from 'common/types/ClientOptions'; +import { stringify as stringifyBase64 } from 'crypto-js/build/enc-base64'; +import { parse as parseUtf8 } from 'crypto-js/build/enc-utf8'; function randomPosn(arrOrStr: Array | string) { return Math.floor(Math.random() * arrOrStr.length); @@ -555,3 +557,36 @@ export function shallowEquals(source: Record, target: Record target[key] === source[key]) ); } + +export function matchDerivedChannel(name: string) { + /** + * This regex check is to retain existing channel params if any e.g [?rewind=1]foo to + * [filter=xyz?rewind=1]foo. This is to keep channel compatibility around use of + * channel params that work with derived channels. + * + * This eslint unsafe regex warning is triggered because the RegExp uses nested quantifiers, + * but it does not create any situation where the regex engine has to + * explore a large number of possible matches so it’s safe to ignore + */ + const regex = /^(\[([^?]*)(?:(.*))\])?(.+)$/; // eslint-disable-line + const match = name.match(regex); + if (!match || !match.length || match.length < 5) { + throw new ErrorInfo('regex match failed', 400, 40010); + } + // Fail if there is already a channel qualifier, eg [meta]foo should fail instead of just overriding with [filter=xyz]foo + if (match![2]) { + throw new ErrorInfo(`cannot use a derived option with a ${match[2]} channel`, 400, 40010); + } + // Return match values to be added to derive channel quantifier. + return { + qualifierParam: match[3] || '', + channelName: match[4], + }; +} + +export function toBase64(str: string) { + if (Platform.Config.createHmac) { + return Buffer.from(str, 'ascii').toString('base64'); + } + return stringifyBase64(parseUtf8(str)); +} diff --git a/test/common/ably-common b/test/common/ably-common index ca89c9d8d5..0b6eb25646 160000 --- a/test/common/ably-common +++ b/test/common/ably-common @@ -1 +1 @@ -Subproject commit ca89c9d8d5f2cdef40546cbc704b4e5052c94b06 +Subproject commit 0b6eb25646cf62d999fb42cda1fefb6af533b2e5 diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 3fe47724dc..84e63d2135 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -1183,5 +1183,145 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); }); } + + it('subscribes to filtered channel', function (done) { + var testData = [ + { + name: 'filtered', + data: 'These headers match the JMESPath expression so this message should not be filtered', + extras: { + headers: { + name: 'Lorem Choo', + number: 26095, + bool: true, + }, + }, + }, + { + name: 'filtered', + data: 'Random data with no extras for filter', + }, + { + name: 'filtered', + data: 'This message should also not be filtered', + extras: { + headers: { + name: 'John Bull', + number: 26095, + bool: false, + }, + }, + }, + { + name: 'filtered', + data: 'No header on message', + }, + { + name: 'filtered', + data: 'Can be filtered because it does not meet filter condition on headers.number', + extras: { + headers: { + name: 'John Bull', + number: 12345, + bool: false, + }, + }, + }, + { + name: 'end', + data: 'last message check', + }, + ]; + + var filterOption = { + filter: 'name == `"filtered"` && headers.number == `26095`', + }; + + try { + /* set up realtime */ + var realtime = helper.AblyRealtime({ key: helper.getTestApp().keys[5].keyStr }); + var rest = helper.AblyRest(); + + realtime.connection.on('connected', function () { + var rtFilteredChannel = realtime.channels.getDerived('chan', filterOption); + var rtUnfilteredChannel = realtime.channels.get('chan'); + + var filteredMessages = []; + var unFilteredMessages = []; + /* subscribe to event */ + rtFilteredChannel.attach(function (err) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + rtFilteredChannel.subscribe(function (msg) { + try { + // Push received filtered messages into an array + filteredMessages.push(msg); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + }); + + rtUnfilteredChannel.attach(function (err) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } + + rtUnfilteredChannel.subscribe(function (msg) { + try { + // Push received unfiltered messages into an array + unFilteredMessages.push(msg); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + }); + + // Subscription to check all messages were received as expected + rtUnfilteredChannel.subscribe('end', function (msg) { + try { + expect(msg.data).to.equal(testData[testData.length - 1].data, 'Unexpected msg data received'); + + // Check that we receive expected messages on filtered channel + expect(filteredMessages.length).to.equal(2, 'Expect only two filtered message to be received'); + expect(filteredMessages[0].data).to.equal(testData[0].data, 'Unexpected data received'); + expect(filteredMessages[1].data).to.equal(testData[2].data, 'Unexpected data received'); + expect(filteredMessages[0].extras.headers.name).to.equal( + testData[0].extras.headers.name, + 'Unexpected header value received' + ); + expect(filteredMessages[1].extras.headers.name).to.equal( + testData[2].extras.headers.name, + 'Unexpected header value received' + ); + // Check that message with header that doesn't meet filtering condition is not received. + for (msg of filteredMessages) { + expect(msg.extras.headers.number).to.equal(26095, 'Unexpected header filtering value received'); + } + + // Check that we receive expected messages on unfiltered channel, including the `end` event message + expect(unFilteredMessages.length).to.equal(6, 'Expect only 6 unfiltered message to be received'); + for (var i = 0; i < unFilteredMessages.length; i++) { + expect(unFilteredMessages[i].data).to.equal(testData[i].data, 'Unexpected data received'); + } + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }); + var restChannel = rest.channels.get('chan'); + restChannel.publish(testData); + }); + }); + }); + monitorConnection(done, realtime); + } catch (err) { + closeAndFinish(done, realtime, err); + } + }); }); }); From 6221fca581495dbfe5ed8f449b0b9495d5b7f590 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 26 May 2023 14:03:55 +0100 Subject: [PATCH 2/2] docs: mark getDerived method as experimental --- ably.d.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ably.d.ts b/ably.d.ts index 3f8774070c..7d40a6a19d 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2763,6 +2763,8 @@ declare namespace Types { */ get(name: string, channelOptions?: ChannelOptions): T; /** + * @experimental This is a preview feature and may change in a future non-major release. + * * Creates a new {@link ChannelBase} or {@link RealtimeChannelBase} object, with the specified channel {@link DeriveOptions} * and {@link ChannelOptions}, or returns the existing channel object. *