Skip to content

Commit

Permalink
perf(platform): increase broker gateway message throughput
Browse files Browse the repository at this point in the history
Previously, performance degraded significantly when sending more than 1000 messages simultaneously,
since filtering of message acknowledgements per-subscriber scaled worse than linearly. This change
introduces a message selector to quickly filter many messages from many subscribers. Instead of a predicate,
a key is used to dispatch messages with O(1) complexity to the subscribers. With this optimization
the broker gateway is capable of handling 10000 messages under 5 seconds.

closes #90
  • Loading branch information
k-genov authored and danielwiehl committed Mar 9, 2023
1 parent 2f6766e commit 579c125
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import {AsyncSubject, EMPTY, firstValueFrom, fromEvent, lastValueFrom, merge, MonoTypeOperatorFunction, NEVER, noop, Observable, Observer, of, ReplaySubject, Subject, TeardownLogic, throwError, timeout, timer} from 'rxjs';
import {ConnackMessage, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, SubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model';
import {finalize, map, mergeMap, take, takeUntil, tap} from 'rxjs/operators';
import {filterByChannel, filterByMessageHeader, filterByOrigin, filterByTopicChannel, filterByTransport, filterByWindow, pluckMessage} from '../../operators';
import {filterByChannel, filterByOrigin, filterByTopicChannel, filterByTransport, filterByWindow, pluckMessage} from '../../operators';
import {UUID} from '@scion/toolkit/uuid';
import {IntentMessage, Message, MessageHeaders, ResponseStatusCodes, TopicMessage} from '../../messaging.model';
import {Logger, NULL_LOGGER} from '../../logger';
Expand All @@ -21,10 +21,11 @@ import {ɵVERSION, ɵWINDOW_TOP} from '../../ɵplatform.model';
import {PlatformState} from '../../platform-state';
import {ConnectOptions} from '../connect-options';
import {MicrofrontendPlatform} from '../../microfrontend-platform';
import {MessageClient} from '../../client/messaging/message-client';
import {MessageClient} from './message-client';
import {runSafe} from '../../safe-runner';
import {stringifyError} from '../../error.util';
import {decorateObservable} from '../../observable-decorator';
import {MessageSelector} from './message-selector';

/**
* The gateway is responsible for dispatching messages between the client and the broker.
Expand Down Expand Up @@ -113,6 +114,15 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
private _session$ = new AsyncSubject<Session>();
private _message$ = new Subject<MessageEvent<MessageEnvelope>>();

private _selectMessagesByTopic = new MessageSelector({
source$: this._message$.pipe(filterByChannel<TopicMessage>(MessagingChannel.Topic)),
keySelector: event => event.data.message.topic,
});
private _selectMessagesBySubscriberIdHeader = new MessageSelector({
source$: this._message$,
keySelector: event => event.data.message.headers.get(MessageHeaders.ɵSubscriberId),
});

constructor(connectOptions?: ConnectOptions) {
this._appSymbolicName = Beans.get<string>(APP_IDENTITY);
this._brokerDiscoverTimeout = connectOptions?.brokerDiscoverTimeout ?? 10_000;
Expand Down Expand Up @@ -166,9 +176,8 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
// Install Promise that resolves once the broker has acknowledged the message, or that rejects otherwise.
const postError$ = new Subject<never>();
const whenPosted = new Promise<void>((resolve, reject) => {
merge(this._message$, postError$)
merge(this._selectMessagesByTopic.select$<MessageEvent<MessageEnvelope<TopicMessage<MessageDeliveryStatus>>>>(messageId), postError$)
.pipe(
filterByTopicChannel<MessageDeliveryStatus>(messageId),
take(1),
pluckMessage(),
timeout({first: this._messageDeliveryTimeout, with: () => throwError(() => GatewayErrors.MESSAGE_DISPATCH_ERROR(this._messageDeliveryTimeout, envelope))}),
Expand Down Expand Up @@ -208,10 +217,9 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
.set(MessageHeaders.ɵSubscriberId, subscriberId); // message header to subscribe for replies

// Receive replies sent to the reply topic.
merge(this._message$, requestError$)
merge(this._selectMessagesBySubscriberIdHeader.select$<MessageEvent<MessageEnvelope<TopicMessage>>>(subscriberId), requestError$)
.pipe(
filterByChannel<TopicMessage<T>>(MessagingChannel.Topic),
filterByMessageHeader({name: MessageHeaders.ɵSubscriberId, value: subscriberId}),
pluckMessage(),
decorateObservable(),
takeUntil(merge(this._platformStopping$, unsubscribe$)),
Expand Down Expand Up @@ -245,10 +253,9 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
const subscribeError$ = new Subject<never>();

// Receive messages of given subscription.
merge(this._message$, subscribeError$)
merge(this._selectMessagesBySubscriberIdHeader.select$<MessageEvent<MessageEnvelope<T>>>(subscriberId), subscribeError$)
.pipe(
filterByChannel<T>(messageChannel),
filterByMessageHeader({name: MessageHeaders.ɵSubscriberId, value: subscriberId}),
pluckMessage(),
decorateObservable(),
takeUntil(merge(this._platformStopping$, unsubscribe$)),
Expand Down Expand Up @@ -442,6 +449,8 @@ export class ɵBrokerGateway implements BrokerGateway, PreDestroy, Initializer {
public preDestroy(): void {
this.disconnectFromBroker();
this._platformStopping$.next();
this._selectMessagesByTopic.disconnect();
this._selectMessagesBySubscriberIdHeader.disconnect();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2018-2023 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {identity, NEVER, Subject} from 'rxjs';
import {ObserveCaptor} from '@scion/toolkit/testing';
import {MessageSelector} from './message-selector';

describe('Message Selector', () => {

it('should select message with id `2`', async () => {
// GIVEN
const messages$ = new Subject<{id: string; text: string}>();
const captor = new ObserveCaptor();

// WHEN
const selectMessagesById = new MessageSelector({source$: messages$, keySelector: message => message.id});
selectMessagesById.select$('2').subscribe(captor);
messages$.next({id: '1', text: 'foo'});
messages$.next({id: '2', text: 'bar'});
messages$.next({id: '3', text: 'foobar'});

// THEN
expect(captor.getValues()).toEqual([{id: '2', text: 'bar'}]);
expect(captor.hasCompleted()).toBeFalse();
});

it('should select messages with id `2`', async () => {
// GIVEN
const messages$ = new Subject<{id: string; text: string}>();
const captor = new ObserveCaptor();

// WHEN
const selectMessagesById = new MessageSelector({source$: messages$, keySelector: message => message.id});
selectMessagesById.select$('2').subscribe(captor);
messages$.next({id: '1', text: 'msg-1'});
messages$.next({id: '2', text: 'msg-2'});
messages$.next({id: '1', text: 'msg-3'});
messages$.next({id: '2', text: 'msg-4'});

// THEN
expect(captor.getValues()).toEqual([{id: '2', text: 'msg-2'}, {id: '2', text: 'msg-4'}]);
expect(captor.hasCompleted()).toBeFalse();
});

it('should support multiple subscribers on the same key', async () => {
// GIVEN
const messages$ = new Subject<{id: string; text: string}>();
const captor1 = new ObserveCaptor();
const captor2 = new ObserveCaptor();

// WHEN
const selectMessagesById = new MessageSelector({source$: messages$, keySelector: message => message.id});
selectMessagesById.select$('1').subscribe(captor1);
selectMessagesById.select$('1').subscribe(captor2);

messages$.next({id: '1', text: 'foo'});
messages$.next({id: '2', text: 'bar'});

// THEN
expect(captor1.getValues()).toEqual([{id: '1', text: 'foo'}]);
expect(captor1.hasCompleted()).toBeFalse();

expect(captor2.getValues()).toEqual([{id: '1', text: 'foo'}]);
expect(captor2.hasCompleted()).toBeFalse();
});

it('should not buffer messages sent before subscription', async () => {
// GIVEN
const messages$ = new Subject<{id: string; text: string}>();
const captor = new ObserveCaptor();

// WHEN
const selectMessagesById = new MessageSelector({source$: messages$, keySelector: message => message.id});
messages$.next({id: '1', text: 'foo'});
selectMessagesById.select$('1').subscribe(captor);

// THEN
expect(captor.getValues()).toEqual([]);
expect(captor.hasCompleted()).toBeFalse();
});

it('should complete when disconnected', async () => {
// GIVEN
const messages$ = new Subject<string>();
const captor = new ObserveCaptor();

// WHEN
const selectMessagesById = new MessageSelector({source$: messages$, keySelector: identity});
selectMessagesById.select$('foo').subscribe(captor);
selectMessagesById.disconnect();
messages$.next('foo');

// THEN
expect(captor.getValues()).toEqual([]);
expect(captor.hasCompleted()).toBeTrue();
expect(selectMessagesById.ɵsubscriberCount()).toBe(0);
});

it('should complete when the source completes', async () => {
// GIVEN
const source$ = new Subject<string>();
const captor = new ObserveCaptor();

const selectMessagesById = new MessageSelector({source$, keySelector: identity});
selectMessagesById.select$('1').subscribe(captor);
expect(captor.hasCompleted()).toBeFalse();

// WHEN completing the source
source$.complete();

// THEN expect source to be completed
expect(captor.hasCompleted()).toBeTrue();
expect(selectMessagesById.ɵsubscriberCount()).toBe(0);
});

it('should error when the source errors', async () => {
// GIVEN
const source$ = new Subject<string>();
const captor = new ObserveCaptor();
const selectMessagesById = new MessageSelector({source$, keySelector: identity});
selectMessagesById.select$('1').subscribe(captor);

// WHEN erroring the source
source$.error('error');

// THEN expect source to be errored
expect(captor.hasErrored()).toBeTrue();
expect(selectMessagesById.ɵsubscriberCount()).toBe(0);
});

it('should unsubscribe when unsubscribing a subscriber', async () => {
// GIVEN
const messageSelector = new MessageSelector({source$: NEVER, keySelector: identity});

// WHEN subscribing subscriber 1
const subscription1 = messageSelector.select$('1').subscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(1);

// WHEN subscribing subscriber 2
const subscription2 = messageSelector.select$('1').subscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(2);

// WHEN subscribing subscriber 3
const subscription3 = messageSelector.select$('2').subscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(3);

// WHEN unsubscribing subscriber 1
subscription1.unsubscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(2);

// WHEN unsubscribing subscriber 2
subscription2.unsubscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(1);

// WHEN unsubscribing subscriber 3
subscription3.unsubscribe();
// THEN
expect(messageSelector.ɵsubscriberCount()).toBe(0);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2018-2023 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {merge, Observable, Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';
import {Maps} from '@scion/toolkit/util';

/**
* Selects items emitted by an Observable according to a static criterion.
*
* This selector was introduced to quickly filter many messages from many subscribers.
* Instead of a predicate, a key is used to dispatch messages with O(1) complexity to the subscribers.
*
* Prior to this selector, performance degraded significantly when sending more than 1000 messages
* simultaneously, since filtering of message acknowledgements per-subscriber scaled worse than linearly.
*
* ---
* ### Usage
*
* ```ts
* // Create message source.
* const messages$ = new Subject<{id: string; text: string}>();
*
* // Create selector to filter messages by id.
* const selectMessagesById = new MessageSelector({source$: messages$, keySelector: message => message.id});
*
* // Receive only messages with id '1'.
* selectMessagesById.select$('1').subscribe(msg => {
* // do something
* });
*
* // Emit messages.
* messages$.next({id: '1', text: 'foo'});
* messages$.next({id: '2', text: 'bar'});
* ```
*
* @internal
*/
export class MessageSelector<T> {

private _selectors = new Map<string, Array<Subject<T>>>;
private _sourceError$ = new Subject<never>();
private _sourceComplete$ = new Subject<void>();
private _destroy$ = new Subject<void>();

/**
* @param config - Controls how to select messages.
*/
constructor(config: SelectorConfig<T>) {
const {source$, keySelector} = config;

source$
.pipe(takeUntil(this._destroy$))
.subscribe({
next: item => {
const key = keySelector(item);
this._selectors.get(key)?.forEach(selector => selector.next(item));
},
error: error => this._sourceError$.error(error),
complete: () => this._sourceComplete$.next(),
});
}

/**
* Selects items emitted by the source Observable that match the given key.
*
* @param key - Specifies the key to select items.
*/
public select$<R extends T>(key: string): Observable<R> {
return new Observable(observer => {
const selector$ = new Subject<any>();
Maps.addListValue(this._selectors, key, selector$);
const subscription = merge(selector$, this._sourceError$)
.pipe(takeUntil(this._sourceComplete$))
.subscribe(observer);

return () => {
Maps.removeListValue(this._selectors, key, selector$);
subscription.unsubscribe();
};
});
}

/**
* Returns the current subscriber count.
*/
public ɵsubscriberCount(): number {
return Array.from(this._selectors.values()).reduce((count, selectors) => count + selectors.length, 0);
}

/**
* Disconnects this selector from the source Observable.
*/
public disconnect(): void {
this._destroy$.next();
}
}

/**
* Controls how to select items.
*
* @internal
*/
export interface SelectorConfig<T> {
/**
* Specifies the Observable to select items from.
*/
source$: Observable<T>;
/**
* Specifies the function to compute the key of an item.
*
* A selector can then subscribe for items matching the key.
*/
keySelector: (item: T) => string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,17 @@ describe('Messaging', () => {
expect(headersCaptor.getLastValue().get(MessageHeaders.AppSymbolicName)).not.toEqual('should-not-be-set');
});

it('should process 10_000 messages under 5 seconds', async () => {
await MicrofrontendPlatformHost.start({applications: []});
const startTime = Date.now();
const acknowledgments = [];
for (let i = 0; i < 10_000; i++) {
acknowledgments.push(Beans.get(MessageClient).publish('topic'));
}
await Promise.all(acknowledgments);
expect(Date.now() - startTime).toBeLessThan(5000);
});

describe('takeUntilUnsubscribe operator', () => {

it('should complete the source observable when all subscribers unsubscribed', async () => {
Expand Down
Loading

0 comments on commit 579c125

Please sign in to comment.