Skip to content

Commit

Permalink
fix(platform/messaging): assert topic not to contain empty segments
Browse files Browse the repository at this point in the history
We also moved topic validation to the host so that it is effective for older clients as well.
  • Loading branch information
danielwiehl authored and Marcarrian committed Nov 23, 2022
1 parent daff4f0 commit f8c47e3
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1121,41 +1121,6 @@ describe('Messaging', () => {
expect(headersCaptor.getLastValue().get(MessageHeaders.AppSymbolicName)).toEqual('host-app');
});

it('should throw if the topic of a message to publish is empty, `null` or `undefined`, or contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

expect(() => Beans.get(MessageClient).publish('myhome/:room/temperature')).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).publish(null)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).publish(undefined)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).publish('')).toThrowError(/IllegalTopicError/);
});

it('should throw if the topic of a request is empty, `null` or `undefined`, or contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

expect(() => Beans.get(MessageClient).request$('myhome/:room/temperature')).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).request$(null)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).request$(undefined)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).request$('')).toThrowError(/IllegalTopicError/);
});

it('should throw if the topic to observe the subscriber count is empty, `null` or `undefined`, or contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

expect(() => Beans.get(MessageClient).subscriberCount$('myhome/:room/temperature')).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).subscriberCount$(null)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).subscriberCount$(undefined)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).subscriberCount$('')).toThrowError(/IllegalTopicError/);
});

it('should throw if the topic to observe is empty, `null` or `undefined`', async () => {
await MicrofrontendPlatform.startHost({applications: []});

expect(() => Beans.get(MessageClient).observe$(null)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).observe$(undefined)).toThrowError(/IllegalTopicError/);
expect(() => Beans.get(MessageClient).observe$('')).toThrowError(/IllegalTopicError/);
});

it('should throw if the qualifier of an intent contains wildcard characters', async () => {
await MicrofrontendPlatform.startHost({applications: []});

Expand Down Expand Up @@ -1680,13 +1645,199 @@ describe('Messaging', () => {
it('should not error if no subscriber is found', async () => {
await MicrofrontendPlatform.startHost({applications: []});

// Send intent.
// Send message.
const whenPublished = Beans.get(MessageClient).publish('myhome/temperature/kitchen', '20°C');
await expectAsync(whenPublished).toBeResolved();
});

it('should error if publish topic is "empty", "null" or "undefined"', async () => {
await MicrofrontendPlatform.startHost({applications: []});

await expectAsync(Beans.get(MessageClient).publish('')).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish(null)).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish(undefined)).toBeRejectedWithError(/IllegalTopicError/);
});

it('should error if publish topic contains empty segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

await expectAsync(Beans.get(MessageClient).publish('/myhome/kitchen/')).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish('/myhome/kitchen')).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish('myhome/kitchen/')).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish('/myhome//temperature')).toBeRejectedWithError(/IllegalTopicError/);
await expectAsync(Beans.get(MessageClient).publish('/')).toBeRejectedWithError(/IllegalTopicError/);
});

it('should error if publish topic contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

await expectAsync(Beans.get(MessageClient).publish('myhome/:room/temperature')).toBeRejectedWithError(/IllegalTopicError/);
});

it('should error if observe topic is "empty", "null" or "undefined"', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).observe$('').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).observe$(null).subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).observe$(undefined).subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);
});

it('should error if observe topic contains empty segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).observe$('/myhome/kitchen/').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).observe$('/myhome/kitchen').subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).observe$('myhome/kitchen/').subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);

const captor4 = new ObserveCaptor();
Beans.get(MessageClient).observe$('/myhome//temperature').subscribe(captor4);
await captor4.waitUntilCompletedOrErrored();
expect(captor4.getError()).toMatch(/IllegalTopicError/);

const captor5 = new ObserveCaptor();
Beans.get(MessageClient).observe$('/').subscribe(captor5);
await captor5.waitUntilCompletedOrErrored();
expect(captor5.getError()).toMatch(/IllegalTopicError/);
});

it('should error if subscriber count topic is empty', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$(null).subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$(undefined).subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);
});

it('should error if subscriber count topic contains empty segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('/myhome/kitchen/').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('/myhome/kitchen').subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('myhome/kitchen/').subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);

const captor4 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('/myhome//temperature').subscribe(captor4);
await captor4.waitUntilCompletedOrErrored();
expect(captor4.getError()).toMatch(/IllegalTopicError/);

const captor5 = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('/').subscribe(captor5);
await captor5.waitUntilCompletedOrErrored();
expect(captor5.getError()).toMatch(/IllegalTopicError/);
});

it('should error if subscriber count topic contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor = new ObserveCaptor();
Beans.get(MessageClient).subscriberCount$('myhome/:room/temperature').subscribe(captor);
await captor.waitUntilCompletedOrErrored();
expect(captor.getError()).toMatch(/IllegalTopicError/);
});

describe('request', () => {

it('should error if request topic is empty', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).request$('').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).request$(null).subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).request$(undefined).subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);
});

it('should error if request topic contains empty segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor1 = new ObserveCaptor();
Beans.get(MessageClient).request$('/myhome/kitchen/').subscribe(captor1);
await captor1.waitUntilCompletedOrErrored();
expect(captor1.getError()).toMatch(/IllegalTopicError/);

const captor2 = new ObserveCaptor();
Beans.get(MessageClient).request$('/myhome/kitchen').subscribe(captor2);
await captor2.waitUntilCompletedOrErrored();
expect(captor2.getError()).toMatch(/IllegalTopicError/);

const captor3 = new ObserveCaptor();
Beans.get(MessageClient).request$('myhome/kitchen/').subscribe(captor3);
await captor3.waitUntilCompletedOrErrored();
expect(captor3.getError()).toMatch(/IllegalTopicError/);

const captor4 = new ObserveCaptor();
Beans.get(MessageClient).request$('/myhome//temperature').subscribe(captor4);
await captor4.waitUntilCompletedOrErrored();
expect(captor4.getError()).toMatch(/IllegalTopicError/);

const captor5 = new ObserveCaptor();
Beans.get(MessageClient).request$('/').subscribe(captor5);
await captor5.waitUntilCompletedOrErrored();
expect(captor5.getError()).toMatch(/IllegalTopicError/);
});

it('should error if request topic contains wildcard segments', async () => {
await MicrofrontendPlatform.startHost({applications: []});

const captor = new ObserveCaptor();
Beans.get(MessageClient).request$('myhome/:room/temperature').subscribe(captor);
await captor.waitUntilCompletedOrErrored();
expect(captor.getError()).toMatch(/IllegalTopicError/);
});

it('should error if no replier is found', async () => {
await MicrofrontendPlatform.startHost({applications: []});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {defer, noop, Observable, Subject, Subscription} from 'rxjs';
import {IntentMessage, mapToBody, throwOnErrorStatus, TopicMessage} from '../../messaging.model';
import {BrokerGateway} from './broker-gateway';
import {MessagingChannel, PlatformTopics, TopicSubscribeCommand} from '../../ɵmessaging.model';
import {Topics} from '../../topics.util';
import {MessageClient} from './message-client';
import {Beans} from '@scion/toolkit/bean-manager';
import {MessageHandler} from './message-handler';
Expand All @@ -23,7 +22,6 @@ export class ɵMessageClient implements MessageClient {
private readonly _brokerGateway = Beans.get(BrokerGateway);

public publish<T = any>(topic: string, message?: T, options?: PublishOptions): Promise<void> {
assertTopic(topic, {allowWildcardSegments: false});
const topicMessage: TopicMessage = {
topic,
retain: options?.retain ?? false,
Expand All @@ -34,7 +32,6 @@ export class ɵMessageClient implements MessageClient {
}

public request$<T>(topic: string, request?: any, options?: RequestOptions): Observable<TopicMessage<T>> {
assertTopic(topic, {allowWildcardSegments: false});
// IMPORTANT:
// When sending a request, the platform adds various headers to the message. Therefore, to support multiple subscriptions
// to the returned Observable, each subscription must have its individual message instance and headers map.
Expand All @@ -52,7 +49,6 @@ export class ɵMessageClient implements MessageClient {
}

public observe$<T>(topic: string): Observable<TopicMessage<T>> {
assertTopic(topic, {allowWildcardSegments: true});
return this._brokerGateway.subscribe$({
messageChannel: MessagingChannel.Topic,
subscribeChannel: MessagingChannel.TopicSubscribe,
Expand All @@ -66,7 +62,6 @@ export class ɵMessageClient implements MessageClient {
}

public subscriberCount$(topic: string): Observable<number> {
assertTopic(topic, {allowWildcardSegments: false});
return new Observable<number>(observer => {
const unsubscribe$ = new Subject<void>();
this.request$<number>(PlatformTopics.RequestSubscriberCount, topic)
Expand All @@ -84,16 +79,6 @@ export class ɵMessageClient implements MessageClient {
}
}

function assertTopic(topic: string, options: {allowWildcardSegments: boolean}): void {
if (topic === undefined || topic === null || topic.length === 0) {
throw Error('[IllegalTopicError] Topic must not be `null`, `undefined` or empty');
}

if (!options.allowWildcardSegments && Topics.containsWildcardSegments(topic)) {
throw Error(`[IllegalTopicError] Topic not allowed to contain wildcard segments. [topic='${topic}']`);
}
}

function setBodyIfDefined<T>(message: TopicMessage<T> | IntentMessage<T>, body?: T): void {
if (body !== undefined) {
message.body = body;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
import {EMPTY, from, fromEvent, merge, MonoTypeOperatorFunction, Observable, of, Subject} from 'rxjs';
import {catchError, filter, mergeMap, share, takeUntil} from 'rxjs/operators';
import {IntentMessage, Message, MessageHeaders, TopicMessage} from '../../messaging.model';
import {IntentMessage, Message, MessageHeaders, ResponseStatusCodes, TopicMessage} from '../../messaging.model';
import {ConnackMessage, IntentSubscribeCommand, MessageDeliveryStatus, MessageEnvelope, MessagingChannel, MessagingTransport, PlatformTopics, TopicSubscribeCommand, UnsubscribeCommand} from '../../ɵmessaging.model';
import {ApplicationRegistry} from '../application-registry';
import {ManifestRegistry} from '../manifest-registry/manifest-registry';
Expand All @@ -33,6 +33,7 @@ import {TopicMatcher} from '../../topic-matcher.util';
import {Defined, Maps} from '@scion/toolkit/util';
import {MessageClient} from '../../client/messaging/message-client';
import {Predicates} from './predicates.util';
import {Topics} from '../../topics.util';

/**
* The broker is responsible for receiving all messages, filtering the messages, determining who is
Expand Down Expand Up @@ -221,15 +222,18 @@ export class MessageBroker implements Initializer, PreDestroy {
private installTopicSubscriberCountObserver(): void {
Beans.get(MessageClient).observe$<string>(PlatformTopics.RequestSubscriberCount)
.pipe(takeUntil(this._destroy$))
.subscribe(request => {
.subscribe(request => runSafe(() => {
const topic = request.body!;
const replyTo = request.headers.get(MessageHeaders.ReplyTo);
const unsubscribe$ = this._topicSubscriptionRegistry.subscriptionCount$(replyTo).pipe(filter(count => count === 0));

this._topicSubscriptionRegistry.subscriptionCount$(topic)
.pipe(takeUntil(merge(this._destroy$, unsubscribe$)))
.subscribe(count => Beans.get(MessageClient).publish(replyTo, count)); // eslint-disable-line rxjs/no-nested-subscribe
});
.subscribe({ // eslint-disable-line rxjs/no-nested-subscribe
next: count => Beans.get(MessageClient).publish(replyTo, count),
error: error => Beans.get(MessageClient).publish(replyTo, stringifyError(error), {headers: new Map().set(MessageHeaders.Status, ResponseStatusCodes.ERROR)}),
});
}));
}

/**
Expand All @@ -246,9 +250,9 @@ export class MessageBroker implements Initializer, PreDestroy {
const message = event.data.message;
const messageId = message.headers.get(MessageHeaders.MessageId);

if (!message.topic) {
const error = '[MessagingError] Missing message property: topic';
sendDeliveryStatusError(client, messageId, error);
const illegalTopicError = Topics.validateTopic(message.topic, {exactTopic: true});
if (illegalTopicError) {
sendDeliveryStatusError(client, messageId, illegalTopicError);
return;
}

Expand Down Expand Up @@ -407,10 +411,16 @@ export class MessageBroker implements Initializer, PreDestroy {
const client = getSendingClient(event);
const envelope = event.data;
const messageId = envelope.message.headers.get(MessageHeaders.MessageId);
const topic = envelope.message.topic;

const illegalTopicError = Topics.validateTopic(topic, {exactTopic: false});
if (illegalTopicError) {
sendDeliveryStatusError(client, messageId, illegalTopicError);
return;
}

try {
const subscriberId = Defined.orElseThrow(envelope.message.subscriberId, () => Error('[TopicSubscribeError] Missing property: subscriberId'));
const topic = Defined.orElseThrow(envelope.message.topic, () => Error('[TopicSubscribeError] Missing property: topic'));
this._topicSubscriptionRegistry.register(new TopicSubscription(topic, subscriberId, client));
sendDeliveryStatusSuccess(client, messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ describe('TopicSubscriptionRegistry', () => {
await expectEmissions(subscriptionCountCaptor).toEqual([0, 1, 2, 3, 2, 1, 0]);
});

it('should throw if trying to observe a non-exact topic', async () => {
const testee = Beans.get(TopicSubscriptionRegistry);
await expect(() => testee.subscriptionCount$('myhome/livingroom/:measurement')).toThrowError(/TopicObserveError/);
});

it('should allow multiple subscriptions on different topics of the same client', async () => {
const testee = Beans.get(TopicSubscriptionRegistry);
const client = newClient({id: 'client'});
Expand Down
Loading

0 comments on commit f8c47e3

Please sign in to comment.