Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DSC-54] Integration/subscription filters #1306

Merged
merged 2 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,16 @@
modes?: ChannelModes;
}

/**
* Passes additional properties to a {@link RealtimeChannelBase} name to produce a new derived channel
*/
interface DeriveOptions {
/**
* The JMESPath Query filter string to be used to derive new channel.
*/
filter?: string;
}

/**
* The `RestHistoryParams` interface describes the parameters accepted by the following methods:
*
Expand Down Expand Up @@ -2752,6 +2762,18 @@
* @returns A {@link ChannelBase} or {@link RealtimeChannelBase} object.
*/
get(name: string, channelOptions?: ChannelOptions): T;
/**
* @experimental This is a preview feature and may change in a future non-major release.

Check warning on line 2766 in ably.d.ts

View workflow job for this annotation

GitHub Actions / lint

Invalid JSDoc tag name "experimental"
*

Check warning on line 2767 in ably.d.ts

View workflow job for this annotation

GitHub Actions / lint

Expected no lines between tags
* Creates a new {@link ChannelBase} or {@link RealtimeChannelBase} object, with the specified channel {@link DeriveOptions}
* and {@link ChannelOptions}, or returns the existing channel object.
*
* @param name - The channel name.
* @param deriveOptions - A {@link DeriveOptions} object.
* @param channelOptions - A {@link ChannelOptions} object.
* @returns A {@link RealtimeChannelBase} object.
*/
getDerived(name: string, deriveOptions: DeriveOptions, channelOptions?: ChannelOptions): T;
/**
* Releases a {@link ChannelBase} or {@link RealtimeChannelBase} object, deleting it, and enabling it to be garbage collected. It also removes any listeners associated with the channel. To release a channel, the {@link ChannelState} must be `INITIALIZED`, `DETACHED`, or `FAILED`.
*
Expand Down
12 changes: 2 additions & 10 deletions src/common/lib/client/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import Multicaster from '../util/multicaster';
import ErrorInfo, { IPartialErrorInfo } from '../types/errorinfo';
import HmacSHA256 from 'crypto-js/build/hmac-sha256';
import { stringify as stringifyBase64 } from 'crypto-js/build/enc-base64';
import { parse as parseUtf8 } from 'crypto-js/build/enc-utf8';
import { createHmac } from 'crypto';
import { ErrnoException, RequestCallback, RequestParams } from '../../types/http';
import * as API from '../../../../ably';
Expand Down Expand Up @@ -43,13 +42,6 @@ function normaliseAuthcallbackError(err: any) {
return err;
}

let toBase64 = (str: string): string => {
if (Platform.Config.createHmac) {
return Buffer.from(str, 'ascii').toString('base64');
}
return stringifyBase64(parseUtf8(str));
};

let hmac = (text: string, key: string): string => {
if (Platform.Config.createHmac) {
const inst = (Platform.Config.createHmac as typeof createHmac)('SHA256', key);
Expand Down Expand Up @@ -886,7 +878,7 @@ class Auth {
if (!tokenDetails) {
throw new Error('Auth.getAuthParams(): _ensureValidAuthCredentials returned no error or tokenDetails');
}
callback(null, { authorization: 'Bearer ' + toBase64(tokenDetails.token) });
callback(null, { authorization: 'Bearer ' + Utils.toBase64(tokenDetails.token) });
});
}
}
Expand Down Expand Up @@ -916,7 +908,7 @@ class Auth {
_saveBasicOptions(authOptions: API.Types.AuthOptions) {
this.method = 'basic';
this.key = authOptions.key;
this.basicKey = toBase64(authOptions.key as string);
this.basicKey = Utils.toBase64(authOptions.key as string);
this.authOptions = authOptions || {};
if ('clientId' in authOptions) {
this._userSetClientId(authOptions.clientId);
Expand Down
9 changes: 9 additions & 0 deletions src/common/lib/client/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ class Channels extends EventEmitter {
return channel;
}

getDerived(name: string, deriveOptions: API.Types.DeriveOptions, channelOptions?: ChannelOptions) {
if (deriveOptions.filter) {
const filter = Utils.toBase64(deriveOptions.filter);
const match = Utils.matchDerivedChannel(name);
name = `[filter=${filter}${match.qualifierParam}]${match.channelName}`;
}
return this.get(name, channelOptions);
}

/* Included to support certain niche use-cases; most users should ignore this.
* Please do not use this unless you know what you're doing */
release(name: string) {
Expand Down
35 changes: 35 additions & 0 deletions src/common/lib/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import Platform from 'common/platform';
import Defaults, { getAgentString } from './defaults';
import ErrorInfo, { PartialErrorInfo } from 'common/lib/types/errorinfo';
import { NormalisedClientOptions } from 'common/types/ClientOptions';
import { stringify as stringifyBase64 } from 'crypto-js/build/enc-base64';
import { parse as parseUtf8 } from 'crypto-js/build/enc-utf8';

function randomPosn(arrOrStr: Array<unknown> | string) {
return Math.floor(Math.random() * arrOrStr.length);
Expand Down Expand Up @@ -555,3 +557,36 @@ export function shallowEquals(source: Record<string, unknown>, target: Record<st
Object.keys(target).every((key) => target[key] === source[key])
);
}

export function matchDerivedChannel(name: string) {
/**
* This regex check is to retain existing channel params if any e.g [?rewind=1]foo to
* [filter=xyz?rewind=1]foo. This is to keep channel compatibility around use of
* channel params that work with derived channels.
*
* This eslint unsafe regex warning is triggered because the RegExp uses nested quantifiers,
* but it does not create any situation where the regex engine has to
* explore a large number of possible matches so it’s safe to ignore
*/
const regex = /^(\[([^?]*)(?:(.*))\])?(.+)$/; // eslint-disable-line
const match = name.match(regex);
if (!match || !match.length || match.length < 5) {
throw new ErrorInfo('regex match failed', 400, 40010);
}
// Fail if there is already a channel qualifier, eg [meta]foo should fail instead of just overriding with [filter=xyz]foo
if (match![2]) {
throw new ErrorInfo(`cannot use a derived option with a ${match[2]} channel`, 400, 40010);
}
// Return match values to be added to derive channel quantifier.
return {
qualifierParam: match[3] || '',
channelName: match[4],
};
}

export function toBase64(str: string) {
if (Platform.Config.createHmac) {
return Buffer.from(str, 'ascii').toString('base64');
}
return stringifyBase64(parseUtf8(str));
}
2 changes: 1 addition & 1 deletion test/common/ably-common
Submodule ably-common updated 48 files
+19 −0 .editorconfig
+25 −0 .eslintrc.js
+44 −0 .github/workflows/check.yml
+33 −0 .github/workflows/publish-json-schemas.yml
+38 −0 .github/workflows/publish.yml
+4 −0 .gitignore
+9 −0 .markdownlint-cli2.yaml
+2 −0 .tool-versions
+45 −0 CONTRIBUTING.md
+1 −0 COPYRIGHT
+176 −0 LICENSE
+6 −0 MAINTAINERS.md
+34 −0 README.md
+3 −0 go.mod
+110 −0 go/cmd/ablyagent/main.go
+36 −0 json-schemas/README.md
+69 −0 json-schemas/publish.js
+852 −0 json-schemas/src/account-stats.json
+42 −0 json-schemas/src/agents.json
+802 −0 json-schemas/src/app-stats.json
+31 −0 json-schemas/src/channel-lifecycle.json
+81 −0 json-schemas/src/client-events-api-requests.json
+121 −0 json-schemas/src/client-events-connections.json
+8 −0 json-schemas/versions.json
+29 −0 network/README.md
+935 −0 network/aws-edge-locations.json
+8 −0 network/datacenter-locations.csv
+13,776 −0 package-lock.json
+33 −0 package.json
+1 −0 protocol/.ruby-version
+7 −0 protocol/Gemfile
+28 −0 protocol/Gemfile.lock
+146 −1 protocol/README.md
+30 −0 protocol/Rakefile
+257 −0 protocol/agents.json
+169 −162 protocol/errors.json
+59 −34 protocol/errorsHelp.json
+33 −0 scripts/build-go.sh
+55 −0 scripts/publish-go.sh
+6 −0 templates/README.md
+20 −0 templates/sdk-contributing.md
+94 −0 templates/sdk-readme.md
+2 −0 test-resources/README.md
+20 −20 test-resources/messages-encoding.json
+75 −0 test-resources/msgpack_test_fixtures.json
+25 −25 test-resources/presence-messages-encoding.json
+21 −5 test-resources/test-app-setup.json
+18 −0 test/agents.test.js
140 changes: 140 additions & 0 deletions test/realtime/message.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1183,5 +1183,145 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
});
});
}

it('subscribes to filtered channel', function (done) {
var testData = [
{
name: 'filtered',
data: 'These headers match the JMESPath expression so this message should not be filtered',
extras: {
headers: {
name: 'Lorem Choo',
number: 26095,
bool: true,
},
},
},
{
name: 'filtered',
data: 'Random data with no extras for filter',
},
{
name: 'filtered',
data: 'This message should also not be filtered',
extras: {
headers: {
name: 'John Bull',
number: 26095,
bool: false,
},
},
},
{
name: 'filtered',
data: 'No header on message',
},
{
name: 'filtered',
data: 'Can be filtered because it does not meet filter condition on headers.number',
extras: {
headers: {
name: 'John Bull',
number: 12345,
bool: false,
},
},
},
{
name: 'end',
data: 'last message check',
},
];

var filterOption = {
filter: 'name == `"filtered"` && headers.number == `26095`',
};

try {
/* set up realtime */
var realtime = helper.AblyRealtime({ key: helper.getTestApp().keys[5].keyStr });
var rest = helper.AblyRest();

realtime.connection.on('connected', function () {
var rtFilteredChannel = realtime.channels.getDerived('chan', filterOption);
var rtUnfilteredChannel = realtime.channels.get('chan');

var filteredMessages = [];
var unFilteredMessages = [];
/* subscribe to event */
rtFilteredChannel.attach(function (err) {
if (err) {
closeAndFinish(done, realtime, err);
return;
}
rtFilteredChannel.subscribe(function (msg) {
try {
// Push received filtered messages into an array
filteredMessages.push(msg);
} catch (err) {
closeAndFinish(done, realtime, err);
return;
}
});

rtUnfilteredChannel.attach(function (err) {
if (err) {
closeAndFinish(done, realtime, err);
return;
}

rtUnfilteredChannel.subscribe(function (msg) {
try {
// Push received unfiltered messages into an array
unFilteredMessages.push(msg);
} catch (err) {
closeAndFinish(done, realtime, err);
return;
}
});

// Subscription to check all messages were received as expected
rtUnfilteredChannel.subscribe('end', function (msg) {
try {
expect(msg.data).to.equal(testData[testData.length - 1].data, 'Unexpected msg data received');

// Check that we receive expected messages on filtered channel
expect(filteredMessages.length).to.equal(2, 'Expect only two filtered message to be received');
expect(filteredMessages[0].data).to.equal(testData[0].data, 'Unexpected data received');
expect(filteredMessages[1].data).to.equal(testData[2].data, 'Unexpected data received');
expect(filteredMessages[0].extras.headers.name).to.equal(
testData[0].extras.headers.name,
'Unexpected header value received'
);
expect(filteredMessages[1].extras.headers.name).to.equal(
testData[2].extras.headers.name,
'Unexpected header value received'
);
// Check that message with header that doesn't meet filtering condition is not received.
for (msg of filteredMessages) {
expect(msg.extras.headers.number).to.equal(26095, 'Unexpected header filtering value received');
}

// Check that we receive expected messages on unfiltered channel, including the `end` event message
expect(unFilteredMessages.length).to.equal(6, 'Expect only 6 unfiltered message to be received');
for (var i = 0; i < unFilteredMessages.length; i++) {
expect(unFilteredMessages[i].data).to.equal(testData[i].data, 'Unexpected data received');
}
} catch (err) {
closeAndFinish(done, realtime, err);
return;
}
closeAndFinish(done, realtime);
});
var restChannel = rest.channels.get('chan');
restChannel.publish(testData);
});
});
});
monitorConnection(done, realtime);
} catch (err) {
closeAndFinish(done, realtime, err);
}
});
});
});