From caee563dd69a34944c9b175654e24946b264c2f9 Mon Sep 17 00:00:00 2001 From: Joachim Van Herwegen Date: Tue, 24 Jan 2023 13:44:49 +0100 Subject: [PATCH] feat: Replace expiration feature with startAt and endAt --- .../KeyValueSubscriptionStorage.ts | 5 +++-- .../notifications/ListeningActivityHandler.ts | 6 ++++++ .../notifications/NotificationSubscriber.ts | 4 ++-- src/server/notifications/Subscription.ts | 5 ++++- .../notifications/SubscriptionStorage.ts | 3 ++- .../KeyValueSubscriptionStorage.test.ts | 2 +- .../ListeningActivityHandler.test.ts | 11 +++++++++++ .../NotificationSubscriber.test.ts | 10 +++++----- .../server/notifications/Subscription.test.ts | 19 ++++++++++++++++--- .../WebHookSubscription2021.test.ts | 3 ++- .../WebSocketSubscription2021.test.ts | 3 ++- 11 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/server/notifications/KeyValueSubscriptionStorage.ts b/src/server/notifications/KeyValueSubscriptionStorage.ts index 59b9cf9c9e..b6badb479c 100644 --- a/src/server/notifications/KeyValueSubscriptionStorage.ts +++ b/src/server/notifications/KeyValueSubscriptionStorage.ts @@ -31,7 +31,8 @@ export class KeyValueSubscriptionStorage> impl topic: subscription.topic, type: subscription.type, lastEmit: 0, - expiration: subscription.expiration, + startAt: subscription.startAt, + endAt: subscription.endAt, accept: subscription.accept, rate: subscription.rate, state: subscription.state, @@ -42,7 +43,7 @@ export class KeyValueSubscriptionStorage> impl public async get(id: string): Promise | undefined> { const info = await this.storage.get(id); if (info && this.isSubscriptionInfo(info)) { - if (typeof info.expiration === 'number' && info.expiration < Date.now()) { + if (typeof info.endAt === 'number' && info.endAt < Date.now()) { this.logger.info(`Subscription ${id} has expired.`); await this.locker.withWriteLock(this.getLockKey(id), async(): Promise => { await this.deleteInfo(info); diff --git a/src/server/notifications/ListeningActivityHandler.ts b/src/server/notifications/ListeningActivityHandler.ts index 772e767614..80a47bda23 100644 --- a/src/server/notifications/ListeningActivityHandler.ts +++ b/src/server/notifications/ListeningActivityHandler.ts @@ -45,10 +45,16 @@ export class ListeningActivityHandler extends StaticHandler { continue; } + // Don't emit if the previous notification was too recent according to the requested rate if (info.rate && info.rate > Date.now() - info.lastEmit) { continue; } + // Don't emit if we have not yet reached the requested starting time + if (info.startAt && info.startAt > Date.now()) { + continue; + } + // No need to wait on this to resolve before going to the next subscription. // Prevent failed notification from blocking other notifications. this.handler.handleSafe({ info, activity, topic }).catch((error): void => { diff --git a/src/server/notifications/NotificationSubscriber.ts b/src/server/notifications/NotificationSubscriber.ts index cd85322afe..054e5a8f57 100644 --- a/src/server/notifications/NotificationSubscriber.ts +++ b/src/server/notifications/NotificationSubscriber.ts @@ -79,9 +79,9 @@ export class NotificationSubscriber extends OperationHttpHandler { } if (this.maxDuration) { - const duration = (subscription.expiration ?? Number.POSITIVE_INFINITY) - Date.now(); + const duration = (subscription.endAt ?? Number.POSITIVE_INFINITY) - Date.now(); if (duration > this.maxDuration) { - subscription.expiration = Date.now() + this.maxDuration; + subscription.endAt = Date.now() + this.maxDuration; } } diff --git a/src/server/notifications/Subscription.ts b/src/server/notifications/Subscription.ts index 390def8a3f..ab60119153 100644 --- a/src/server/notifications/Subscription.ts +++ b/src/server/notifications/Subscription.ts @@ -16,7 +16,10 @@ export const SUBSCRIBE_SCHEMA = object({ type: string().required(), topic: string().required(), state: string().optional(), - expiration: number().transform((value, original): number | undefined => + startAt: number().transform((value, original): number | undefined => + // Convert the date string to milliseconds + Date.parse(original)).optional(), + endAt: number().transform((value, original): number | undefined => // Convert the date string to milliseconds Date.parse(original)).optional(), rate: number().transform((value, original): number | undefined => diff --git a/src/server/notifications/SubscriptionStorage.ts b/src/server/notifications/SubscriptionStorage.ts index 2b1c48887f..f27e17fa14 100644 --- a/src/server/notifications/SubscriptionStorage.ts +++ b/src/server/notifications/SubscriptionStorage.ts @@ -9,7 +9,8 @@ export type SubscriptionInfo> = { id: string; topic: string; type: string; - expiration?: number; + startAt?: number; + endAt?: number; accept?: string; rate?: number; state?: string; diff --git a/test/unit/server/notifications/KeyValueSubscriptionStorage.test.ts b/test/unit/server/notifications/KeyValueSubscriptionStorage.test.ts index 0fb7e0b658..1b20bc2003 100644 --- a/test/unit/server/notifications/KeyValueSubscriptionStorage.test.ts +++ b/test/unit/server/notifications/KeyValueSubscriptionStorage.test.ts @@ -70,7 +70,7 @@ describe('A KeyValueSubscriptionStorage', (): void => { }); it('deletes expired info.', async(): Promise => { - info.expiration = 0; + info.endAt = 0; await storage.add(info); await expect(storage.get(info.id)).resolves.toBeUndefined(); expect(internalMap.size).toBe(0); diff --git a/test/unit/server/notifications/ListeningActivityHandler.test.ts b/test/unit/server/notifications/ListeningActivityHandler.test.ts index c1502e22f8..7d3a5e87dc 100644 --- a/test/unit/server/notifications/ListeningActivityHandler.test.ts +++ b/test/unit/server/notifications/ListeningActivityHandler.test.ts @@ -71,6 +71,17 @@ describe('A ListeningActivityHandler', (): void => { expect(logger.error).toHaveBeenCalledTimes(0); }); + it('does not emit an event on subscriptions if their start time has not been reached.', async(): Promise => { + info.startAt = Date.now() + 100000; + + emitter.emit('changed', topic, activity); + + await flushPromises(); + + expect(notificationHandler.handleSafe).toHaveBeenCalledTimes(0); + expect(logger.error).toHaveBeenCalledTimes(0); + }); + it('does not stop if one subscription causes an error.', async(): Promise => { storage.getAll.mockResolvedValue([ info.id, info.id ]); notificationHandler.handleSafe.mockRejectedValueOnce(new Error('bad input')); diff --git a/test/unit/server/notifications/NotificationSubscriber.test.ts b/test/unit/server/notifications/NotificationSubscriber.test.ts index f0730462ed..b95bb84be6 100644 --- a/test/unit/server/notifications/NotificationSubscriber.test.ts +++ b/test/unit/server/notifications/NotificationSubscriber.test.ts @@ -110,25 +110,25 @@ describe('A NotificationSubscriber', (): void => { await subscriber.handle({ operation, request, response }); expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ - expiration: Date.now() + (60 * 60 * 1000), + endAt: Date.now() + (60 * 60 * 1000), }), { public: {}}); operation.body.data = guardedStreamFrom(JSON.stringify({ ...subscriptionBody, - expiration: new Date(Date.now() + 99999999999999).toISOString(), + endAt: new Date(Date.now() + 99999999999999).toISOString(), })); await subscriber.handle({ operation, request, response }); expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ - expiration: Date.now() + (60 * 60 * 1000), + endAt: Date.now() + (60 * 60 * 1000), }), { public: {}}); operation.body.data = guardedStreamFrom(JSON.stringify({ ...subscriptionBody, - expiration: new Date(Date.now() + 5).toISOString(), + endAt: new Date(Date.now() + 5).toISOString(), })); await subscriber.handle({ operation, request, response }); expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({ - expiration: Date.now() + 5, + endAt: Date.now() + 5, }), { public: {}}); jest.useRealTimers(); diff --git a/test/unit/server/notifications/Subscription.test.ts b/test/unit/server/notifications/Subscription.test.ts index 992c731bbe..c768c13acf 100644 --- a/test/unit/server/notifications/Subscription.test.ts +++ b/test/unit/server/notifications/Subscription.test.ts @@ -40,16 +40,29 @@ describe('A Subscription', (): void => { await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true); }); - it('converts the expiration date to a number.', async(): Promise => { + it('converts the start date to a number.', async(): Promise => { const date = '1988-03-09T14:48:00.000Z'; const ms = Date.parse(date); const subscription: unknown = { ...validSubscription, - expiration: date, + startAt: date, }; await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({ - expiration: ms, + startAt: ms, + })); + }); + + it('converts the end date to a number.', async(): Promise => { + const date = '1988-03-09T14:48:00.000Z'; + const ms = Date.parse(date); + + const subscription: unknown = { + ...validSubscription, + endAt: date, + }; + await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({ + endAt: ms, })); }); diff --git a/test/unit/server/notifications/WebHookSubscription2021/WebHookSubscription2021.test.ts b/test/unit/server/notifications/WebHookSubscription2021/WebHookSubscription2021.test.ts index 9b44b4e192..27cc6f1e6e 100644 --- a/test/unit/server/notifications/WebHookSubscription2021/WebHookSubscription2021.test.ts +++ b/test/unit/server/notifications/WebHookSubscription2021/WebHookSubscription2021.test.ts @@ -44,7 +44,8 @@ describe('A WebHookSubscription2021', (): void => { topic: 'https://storage.example/resource', target, state: undefined, - expiration: undefined, + startAt: undefined, + endAt: undefined, accept: undefined, rate: undefined, }; diff --git a/test/unit/server/notifications/WebSocketSubscription2021/WebSocketSubscription2021.test.ts b/test/unit/server/notifications/WebSocketSubscription2021/WebSocketSubscription2021.test.ts index 659303ac8e..f37e89f416 100644 --- a/test/unit/server/notifications/WebSocketSubscription2021/WebSocketSubscription2021.test.ts +++ b/test/unit/server/notifications/WebSocketSubscription2021/WebSocketSubscription2021.test.ts @@ -22,7 +22,8 @@ describe('A WebSocketSubscription2021', (): void => { type: 'WebSocketSubscription2021', topic: 'https://storage.example/resource', state: undefined, - expiration: undefined, + startAt: undefined, + endAt: undefined, accept: undefined, rate: undefined, };