Skip to content
This repository has been archived by the owner on Apr 17, 2023. It is now read-only.

Commit

Permalink
Merge pull request #5 from eranor/master
Browse files Browse the repository at this point in the history
Matching for wildcards in topics
  • Loading branch information
davidyaha committed Nov 1, 2018
2 parents c5cbdf4 + 163a7f0 commit a8a8bd4
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 25 deletions.
64 changes: 46 additions & 18 deletions src/mqtt-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,40 @@ export interface PubSubMQTTOptions {

export class MQTTPubSub implements PubSubEngine {

private triggerTransform: TriggerTransform;
private onMQTTSubscribe: SubscribeHandler;
private subscribeOptionsResolver: SubscribeOptionsResolver;
private publishOptionsResolver: PublishOptionsResolver;
private mqttConnection: Client;
private subscriptionMap: { [subId: number]: [string, Function] };
private subsRefsMap: { [trigger: string]: Array<number> };
private currentSubscriptionId: number;
private parseMessageWithEncoding: string;

private static matches(pattern: string, topic: string) {
const patternSegments = pattern.split('/');
const topicSegments = topic.split('/');
const patternLength = patternSegments.length;

for (let i = 0; i < patternLength; i++) {
const currentPattern = patternSegments[i];
const currentTopic = topicSegments[i];
if (!currentTopic && !currentPattern) {
continue;
}
if (!currentTopic && currentPattern !== '#') {
return false;
}
if (currentPattern[0] === '#') {
return i === (patternLength - 1);
}
if (currentPattern[0] !== '+' && currentPattern !== currentTopic) {
return false;
}
}
return patternLength === (topicSegments.length);
}

constructor(options: PubSubMQTTOptions = {}) {
this.triggerTransform = options.triggerTransform || (trigger => trigger as string);

Expand All @@ -38,8 +72,8 @@ export class MQTTPubSub implements PubSubEngine {
this.subsRefsMap = {};
this.currentSubscriptionId = 0;
this.onMQTTSubscribe = options.onMQTTSubscribe || (() => null);
this.publishOptionsResolver = options.publishOptions || (() => Promise.resolve({}));
this.subscribeOptionsResolver = options.subscribeOptions || (() => Promise.resolve({}));
this.publishOptionsResolver = options.publishOptions || (() => Promise.resolve({} as IClientPublishOptions));
this.subscribeOptionsResolver = options.subscribeOptions || (() => Promise.resolve({} as IClientSubscribeOptions));
this.parseMessageWithEncoding = options.parseMessageWithEncoding;
}

Expand Down Expand Up @@ -94,8 +128,9 @@ export class MQTTPubSub implements PubSubEngine {
const [triggerName = null] = this.subscriptionMap[subId] || [];
const refs = this.subsRefsMap[triggerName];

if (!refs)
if (!refs) {
throw new Error(`There is no subscription of id "${subId}"`);
}

let newRefs;
if (refs.length === 1) {
Expand All @@ -104,7 +139,7 @@ export class MQTTPubSub implements PubSubEngine {

} else {
const index = refs.indexOf(subId);
if (index != -1) {
if (index > -1) {
newRefs = [...refs.slice(0, index), ...refs.slice(index + 1)];
}
}
Expand All @@ -118,12 +153,16 @@ export class MQTTPubSub implements PubSubEngine {
}

private onMessage(topic: string, message: Buffer) {
const subscribers = this.subsRefsMap[topic];
const subscribers = [].concat(
...Object.keys(this.subsRefsMap)
.filter((key) => MQTTPubSub.matches(key, topic))
.map((key) => this.subsRefsMap[key]),
);

// Don't work for nothing..
if (!subscribers || !subscribers.length)
if (!subscribers || !subscribers.length) {
return;

}
const messageString = message.toString(this.parseMessageWithEncoding);
let parsedMessage;
try {
Expand All @@ -137,17 +176,6 @@ export class MQTTPubSub implements PubSubEngine {
listener(parsedMessage);
}
}

private triggerTransform: TriggerTransform;
private onMQTTSubscribe: SubscribeHandler;
private subscribeOptionsResolver: SubscribeOptionsResolver;
private publishOptionsResolver: PublishOptionsResolver;
private mqttConnection: Client;

private subscriptionMap: { [subId: number]: [string, Function] };
private subsRefsMap: { [trigger: string]: Array<number> };
private currentSubscriptionId: number;
private parseMessageWithEncoding: string;
}

export type Path = Array<string | number>;
Expand Down
14 changes: 7 additions & 7 deletions src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ import { PubSubEngine } from 'graphql-subscriptions/dist/pubsub-engine';
*/
export class PubSubAsyncIterator<T> implements AsyncIterator<T> {

private pullQueue: Function[];
private pushQueue: any[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private listening: boolean;
private pubsub: PubSubEngine;

constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
this.pubsub = pubsub;
this.pullQueue = [];
Expand Down Expand Up @@ -60,13 +67,6 @@ export class PubSubAsyncIterator<T> implements AsyncIterator<T> {
return this;
}

private pullQueue: Function[];
private pushQueue: any[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private listening: boolean;
private pubsub: PubSubEngine;

private async pushValue(event) {
await this.allSubscribed;
if (this.pullQueue.length !== 0) {
Expand Down
103 changes: 103 additions & 0 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,106 @@ describe('PubSubAsyncIterator', function () {
});

});

describe('Wildcards in subscription topic', function () {

it('should receive message while subscribing to topic containing wildcard', done => {
const pubSub = new MQTTPubSub();
let unSubIds = [];
let callCount = 0;
const onMessageSpy = spy(() => {
callCount++;
if (callCount === 3) {
pubSub.unsubscribe(unSubIds[0]);
pubSub.unsubscribe(unSubIds[1]);

expect(onMessageSpy.callCount).to.equals(3);
onMessageSpy.calls.forEach(call => {
expect(call.args).to.have.members(['test']);
});

done();
}
});
const subscriptionPromises = [
pubSub.subscribe('Posts/#', onMessageSpy as Function),
pubSub.subscribe('Posts/CategoryA', onMessageSpy as Function),
];

Promise.all(subscriptionPromises).then(subIds => {
try {
expect(subIds.length).to.equals(2);
pubSub.publish('Posts/CategoryA', 'test');
pubSub.publish('Posts/CategoryB', 'test');
unSubIds = subIds;
} catch (e) {
done(e);
}
});
});

it('can subscribe to everything with "#" topic', function (done) {
const pubSub = new MQTTPubSub();
let sub;
let expectedMessages = ['test0', 'test1', 'test2', 'test3'];
let messages = [];
const onMessage = message => {
try {
if (messages.length === 3) {
messages.push(message);
expect(messages).to.deep.equal(expectedMessages);
pubSub.unsubscribe(sub);
done();
} else {
messages.push(message);
}
} catch (e) {
done(e);
}
};

pubSub.subscribe('#', onMessage).then(subId => {
expect(subId).to.be.a('number');
pubSub.publish('Posts', 'test0');
pubSub.publish('Posts/A', 'test1');
pubSub.publish('Posts/A/B', 'test2');
pubSub.publish('Posts/A/D/C', 'test3');
sub = subId;
}).catch(err => done(err));
});

it('can subscribe to only specific subset', function (done) {
const pubSub = new MQTTPubSub();
let sub;
let expectedMessages = ['test2', 'test3', 'test4'];
let messages = [];
const onMessage = message => {
try {
if (expectedMessages.indexOf(message) > -1) {
if (messages.length === 2) {
messages.push(message);
expect(messages).to.deep.equal(expectedMessages);
pubSub.unsubscribe(sub);
done();
} else {
messages.push(message);
}
}
} catch (e) {
done(e);
}
};

pubSub.subscribe('Posts/+/D', onMessage).then(subId => {
expect(subId).to.be.a('number');
pubSub.publish('Posts/A', 'test1');
pubSub.publish('Posts/B/D', 'test2');
pubSub.publish('Posts/C/D', 'test3');
pubSub.publish('Posts/E/D', 'test4');
pubSub.publish('Posts/F/G', 'test5');
pubSub.publish('Posts/H/D/I', 'test6');
pubSub.publish('Posts', 'test7');
sub = subId;
}).catch(err => done(err));
});
});

0 comments on commit a8a8bd4

Please sign in to comment.