Skip to content

Commit

Permalink
add tenant to EventsStream interface subscribe method (#705)
Browse files Browse the repository at this point in the history
Adding `tenant` parameter to the subscribe method in order to better
organize the pub/sub.
  • Loading branch information
LiranCohen committed Mar 13, 2024
1 parent aa9a246 commit 8134078
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 16 deletions.
8 changes: 4 additions & 4 deletions src/event-log/event-emitter-stream.ts
Expand Up @@ -39,11 +39,11 @@ export class EventEmitterStream implements EventStream {
*/
private errorHandler: (error:any) => void = (error) => { console.error('event emitter error', error); };

async subscribe(id: string, listener: EventListener): Promise<EventSubscription> {
this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener);
async subscribe(tenant: string, id: string, listener: EventListener): Promise<EventSubscription> {
this.eventEmitter.on(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, listener);
return {
id,
close: async (): Promise<void> => { this.eventEmitter.off(EVENTS_LISTENER_CHANNEL, listener); }
close: async (): Promise<void> => { this.eventEmitter.off(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, listener); }
};
}

Expand All @@ -64,6 +64,6 @@ export class EventEmitterStream implements EventStream {
));
return;
}
this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, event, indexes);
this.eventEmitter.emit(`${tenant}_${EVENTS_LISTENER_CHANNEL}`, tenant, event, indexes);
}
}
2 changes: 1 addition & 1 deletion src/handlers/events-subscribe.ts
Expand Up @@ -57,7 +57,7 @@ export class EventsSubscribeHandler implements MethodHandler {
}
};

const subscription = await this.eventStream.subscribe(messageCid, listener);
const subscription = await this.eventStream.subscribe(tenant, messageCid, listener);

return {
status: { code: 200, detail: 'OK' },
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/records-subscribe.ts
Expand Up @@ -77,7 +77,7 @@ export class RecordsSubscribeHandler implements MethodHandler {
};

const messageCid = await Message.getCid(message);
const subscription = await this.eventStream.subscribe(messageCid, listener);
const subscription = await this.eventStream.subscribe(tenant, messageCid, listener);
return {
status: { code: 200, detail: 'OK' },
subscription
Expand Down
2 changes: 1 addition & 1 deletion src/types/subscriptions.ts
Expand Up @@ -18,7 +18,7 @@ export type MessageEvent = {
* The EventStream interface implements a pub/sub system based on Message filters.
*/
export interface EventStream {
subscribe(id: string, listener: EventListener): Promise<EventSubscription>;
subscribe(tenant: string, id: string, listener: EventListener): Promise<EventSubscription>;
emit(tenant: string, event: MessageEvent, indexes: KeyValues): void;
open(): Promise<void>;
close(): Promise<void>;
Expand Down
10 changes: 5 additions & 5 deletions tests/event-log/event-emitter-stream.spec.ts
Expand Up @@ -35,14 +35,14 @@ describe('EventEmitterStream', () => {
const emitter = eventStream['eventEmitter'];

// count the `events` listeners, which represents all listeners
expect(emitter.listenerCount('events')).to.equal(0);
expect(emitter.listenerCount('did:alice_events')).to.equal(0);

const sub = await eventStream.subscribe('id', () => {});
expect(emitter.listenerCount('events')).to.equal(1);
const sub = await eventStream.subscribe('did:alice', 'id', () => {});
expect(emitter.listenerCount('did:alice_events')).to.equal(1);

// close the subscription, which should remove the listener
await sub.close();
expect(emitter.listenerCount('events')).to.equal(0);
expect(emitter.listenerCount('did:alice_events')).to.equal(0);
});

it('logs message when the emitter experiences an error', async () => {
Expand Down Expand Up @@ -71,7 +71,7 @@ describe('EventEmitterStream', () => {
const messageCid = await Message.getCid(message);
messageCids.push(messageCid);
};
const subscription = await eventStream.subscribe('sub-1', handler);
const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler);

// close eventStream
await eventStream.close();
Expand Down
8 changes: 4 additions & 4 deletions tests/event-log/event-stream.spec.ts
Expand Up @@ -44,8 +44,8 @@ describe('EventStream', () => {
messageCids2.push(messageCid);
};

const subscription1 = await eventStream.subscribe('sub-1', handler1);
const subscription2 = await eventStream.subscribe('sub-2', handler2);
const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1);
const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
Expand Down Expand Up @@ -73,7 +73,7 @@ describe('EventStream', () => {
const messageCid = await Message.getCid(message);
messageCids.push(messageCid);
};
const subscription = await eventStream.subscribe('sub-1', handler);
const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
Expand All @@ -95,7 +95,7 @@ describe('EventStream', () => {
const messageCid = await Message.getCid(message);
messageCids.push(messageCid);
};
const subscription = await eventStream.subscribe('sub-1', handler);
const subscription = await eventStream.subscribe('did:alice', 'sub-1', handler);

// close eventStream
await eventStream.close();
Expand Down

0 comments on commit 8134078

Please sign in to comment.