From 81340785ea6d1ab6b3458fbc39787d998fa0cee3 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 13 Mar 2024 12:06:27 -0400 Subject: [PATCH] add tenant to EventsStream interface subscribe method (#705) Adding `tenant` parameter to the subscribe method in order to better organize the pub/sub. --- src/event-log/event-emitter-stream.ts | 8 ++++---- src/handlers/events-subscribe.ts | 2 +- src/handlers/records-subscribe.ts | 2 +- src/types/subscriptions.ts | 2 +- tests/event-log/event-emitter-stream.spec.ts | 10 +++++----- tests/event-log/event-stream.spec.ts | 8 ++++---- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/event-log/event-emitter-stream.ts b/src/event-log/event-emitter-stream.ts index c180b6dbe..64a943c72 100644 --- a/src/event-log/event-emitter-stream.ts +++ b/src/event-log/event-emitter-stream.ts @@ -39,11 +39,11 @@ export class EventEmitterStream implements EventStream { */ private errorHandler: (error:any) => void = (error) => { console.error('event emitter error', error); }; - async subscribe(id: string, listener: EventListener): Promise { - this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener); + async subscribe(tenant: string, id: string, listener: EventListener): Promise { + this.eventEmitter.on(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, listener); return { id, - close: async (): Promise => { this.eventEmitter.off(EVENTS_LISTENER_CHANNEL, listener); } + close: async (): Promise => { this.eventEmitter.off(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, listener); } }; } @@ -64,6 +64,6 @@ export class EventEmitterStream implements EventStream { )); return; } - this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, event, indexes); + this.eventEmitter.emit(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, tenant, event, indexes); } } \ No newline at end of file diff --git a/src/handlers/events-subscribe.ts b/src/handlers/events-subscribe.ts index 62f47f2b8..e78ca1832 100644 --- a/src/handlers/events-subscribe.ts +++ b/src/handlers/events-subscribe.ts @@ -57,7 +57,7 @@ export class EventsSubscribeHandler implements MethodHandler { } }; - const subscription = await this.eventStream.subscribe(messageCid, listener); + const subscription = await this.eventStream.subscribe(tenant, messageCid, listener); return { status: { code: 200, detail: 'OK' }, diff --git a/src/handlers/records-subscribe.ts b/src/handlers/records-subscribe.ts index 00889c985..827fc8ebe 100644 --- a/src/handlers/records-subscribe.ts +++ b/src/handlers/records-subscribe.ts @@ -77,7 +77,7 @@ export class RecordsSubscribeHandler implements MethodHandler { }; const messageCid = await Message.getCid(message); - const subscription = await this.eventStream.subscribe(messageCid, listener); + const subscription = await this.eventStream.subscribe(tenant, messageCid, listener); return { status: { code: 200, detail: 'OK' }, subscription diff --git a/src/types/subscriptions.ts b/src/types/subscriptions.ts index 3db405a35..1b7b99309 100644 --- a/src/types/subscriptions.ts +++ b/src/types/subscriptions.ts @@ -18,7 +18,7 @@ export type MessageEvent = { * The EventStream interface implements a pub/sub system based on Message filters. */ export interface EventStream { - subscribe(id: string, listener: EventListener): Promise; + subscribe(tenant: string, id: string, listener: EventListener): Promise; emit(tenant: string, event: MessageEvent, indexes: KeyValues): void; open(): Promise; close(): Promise; diff --git a/tests/event-log/event-emitter-stream.spec.ts b/tests/event-log/event-emitter-stream.spec.ts index 75e86f5f9..e1892c45e 100644 --- a/tests/event-log/event-emitter-stream.spec.ts +++ b/tests/event-log/event-emitter-stream.spec.ts @@ -35,14 +35,14 @@ describe('EventEmitterStream', () => { const emitter = eventStream['eventEmitter']; // count the `events` listeners, which represents all listeners - expect(emitter.listenerCount('events')).to.equal(0); + expect(emitter.listenerCount('did:alice_events')).to.equal(0); - const sub = await eventStream.subscribe('id', () => {}); - expect(emitter.listenerCount('events')).to.equal(1); + const sub = await eventStream.subscribe('did:alice', 'id', () => {}); + expect(emitter.listenerCount('did:alice_events')).to.equal(1); // close the subscription, which should remove the listener await sub.close(); - expect(emitter.listenerCount('events')).to.equal(0); + expect(emitter.listenerCount('did:alice_events')).to.equal(0); }); it('logs message when the emitter experiences an error', async () => { @@ -71,7 +71,7 @@ describe('EventEmitterStream', () => { const messageCid = await Message.getCid(message); messageCids.push(messageCid); }; - const subscription = await eventStream.subscribe('sub-1', handler); + const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler); // close eventStream await eventStream.close(); diff --git a/tests/event-log/event-stream.spec.ts b/tests/event-log/event-stream.spec.ts index ad95eafa8..a752f34f8 100644 --- a/tests/event-log/event-stream.spec.ts +++ b/tests/event-log/event-stream.spec.ts @@ -44,8 +44,8 @@ describe('EventStream', () => { messageCids2.push(messageCid); }; - const subscription1 = await eventStream.subscribe('sub-1', handler1); - const subscription2 = await eventStream.subscribe('sub-2', handler2); + const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1); + const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2); const message1 = await TestDataGenerator.generateRecordsWrite({}); const message1Cid = await Message.getCid(message1.message); @@ -73,7 +73,7 @@ describe('EventStream', () => { const messageCid = await Message.getCid(message); messageCids.push(messageCid); }; - const subscription = await eventStream.subscribe('sub-1', handler); + const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler); const message1 = await TestDataGenerator.generateRecordsWrite({}); const message1Cid = await Message.getCid(message1.message); @@ -95,7 +95,7 @@ describe('EventStream', () => { const messageCid = await Message.getCid(message); messageCids.push(messageCid); }; - const subscription = await eventStream.subscribe('sub-1', handler); + const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler); // close eventStream await eventStream.close();