Skip to content

Commit

Permalink
feat: Replace expiration feature with startAt and endAt
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimvh committed Apr 20, 2023
1 parent 10980e9 commit caee563
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/server/notifications/KeyValueSubscriptionStorage.ts
Expand Up @@ -31,7 +31,8 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
topic: subscription.topic,
type: subscription.type,
lastEmit: 0,
expiration: subscription.expiration,
startAt: subscription.startAt,
endAt: subscription.endAt,
accept: subscription.accept,
rate: subscription.rate,
state: subscription.state,
Expand All @@ -42,7 +43,7 @@ export class KeyValueSubscriptionStorage<T extends Record<string, unknown>> impl
public async get(id: string): Promise<SubscriptionInfo<T> | undefined> {
const info = await this.storage.get(id);
if (info && this.isSubscriptionInfo(info)) {
if (typeof info.expiration === 'number' && info.expiration < Date.now()) {
if (typeof info.endAt === 'number' && info.endAt < Date.now()) {
this.logger.info(`Subscription ${id} has expired.`);
await this.locker.withWriteLock(this.getLockKey(id), async(): Promise<void> => {
await this.deleteInfo(info);
Expand Down
6 changes: 6 additions & 0 deletions src/server/notifications/ListeningActivityHandler.ts
Expand Up @@ -45,10 +45,16 @@ export class ListeningActivityHandler extends StaticHandler {
continue;
}

// Don't emit if the previous notification was too recent according to the requested rate
if (info.rate && info.rate > Date.now() - info.lastEmit) {
continue;
}

// Don't emit if we have not yet reached the requested starting time
if (info.startAt && info.startAt > Date.now()) {
continue;
}

// No need to wait on this to resolve before going to the next subscription.
// Prevent failed notification from blocking other notifications.
this.handler.handleSafe({ info, activity, topic }).catch((error): void => {
Expand Down
4 changes: 2 additions & 2 deletions src/server/notifications/NotificationSubscriber.ts
Expand Up @@ -79,9 +79,9 @@ export class NotificationSubscriber extends OperationHttpHandler {
}

if (this.maxDuration) {
const duration = (subscription.expiration ?? Number.POSITIVE_INFINITY) - Date.now();
const duration = (subscription.endAt ?? Number.POSITIVE_INFINITY) - Date.now();
if (duration > this.maxDuration) {
subscription.expiration = Date.now() + this.maxDuration;
subscription.endAt = Date.now() + this.maxDuration;
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/server/notifications/Subscription.ts
Expand Up @@ -16,7 +16,10 @@ export const SUBSCRIBE_SCHEMA = object({
type: string().required(),
topic: string().required(),
state: string().optional(),
expiration: number().transform((value, original): number | undefined =>
startAt: number().transform((value, original): number | undefined =>
// Convert the date string to milliseconds
Date.parse(original)).optional(),
endAt: number().transform((value, original): number | undefined =>
// Convert the date string to milliseconds
Date.parse(original)).optional(),
rate: number().transform((value, original): number | undefined =>
Expand Down
3 changes: 2 additions & 1 deletion src/server/notifications/SubscriptionStorage.ts
Expand Up @@ -9,7 +9,8 @@ export type SubscriptionInfo<T = Record<string, unknown>> = {
id: string;
topic: string;
type: string;
expiration?: number;
startAt?: number;
endAt?: number;
accept?: string;
rate?: number;
state?: string;
Expand Down
Expand Up @@ -70,7 +70,7 @@ describe('A KeyValueSubscriptionStorage', (): void => {
});

it('deletes expired info.', async(): Promise<void> => {
info.expiration = 0;
info.endAt = 0;
await storage.add(info);
await expect(storage.get(info.id)).resolves.toBeUndefined();
expect(internalMap.size).toBe(0);
Expand Down
11 changes: 11 additions & 0 deletions test/unit/server/notifications/ListeningActivityHandler.test.ts
Expand Up @@ -71,6 +71,17 @@ describe('A ListeningActivityHandler', (): void => {
expect(logger.error).toHaveBeenCalledTimes(0);
});

it('does not emit an event on subscriptions if their start time has not been reached.', async(): Promise<void> => {
info.startAt = Date.now() + 100000;

emitter.emit('changed', topic, activity);

await flushPromises();

expect(notificationHandler.handleSafe).toHaveBeenCalledTimes(0);
expect(logger.error).toHaveBeenCalledTimes(0);
});

it('does not stop if one subscription causes an error.', async(): Promise<void> => {
storage.getAll.mockResolvedValue([ info.id, info.id ]);
notificationHandler.handleSafe.mockRejectedValueOnce(new Error('bad input'));
Expand Down
10 changes: 5 additions & 5 deletions test/unit/server/notifications/NotificationSubscriber.test.ts
Expand Up @@ -110,25 +110,25 @@ describe('A NotificationSubscriber', (): void => {

await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + (60 * 60 * 1000),
endAt: Date.now() + (60 * 60 * 1000),
}), { public: {}});

operation.body.data = guardedStreamFrom(JSON.stringify({
...subscriptionBody,
expiration: new Date(Date.now() + 99999999999999).toISOString(),
endAt: new Date(Date.now() + 99999999999999).toISOString(),
}));
await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + (60 * 60 * 1000),
endAt: Date.now() + (60 * 60 * 1000),
}), { public: {}});

operation.body.data = guardedStreamFrom(JSON.stringify({
...subscriptionBody,
expiration: new Date(Date.now() + 5).toISOString(),
endAt: new Date(Date.now() + 5).toISOString(),
}));
await subscriber.handle({ operation, request, response });
expect(subscriptionType.subscribe).toHaveBeenLastCalledWith(expect.objectContaining({
expiration: Date.now() + 5,
endAt: Date.now() + 5,
}), { public: {}});

jest.useRealTimers();
Expand Down
19 changes: 16 additions & 3 deletions test/unit/server/notifications/Subscription.test.ts
Expand Up @@ -40,16 +40,29 @@ describe('A Subscription', (): void => {
await expect(SUBSCRIBE_SCHEMA.isValid(subscription)).resolves.toBe(true);
});

it('converts the expiration date to a number.', async(): Promise<void> => {
it('converts the start date to a number.', async(): Promise<void> => {
const date = '1988-03-09T14:48:00.000Z';
const ms = Date.parse(date);

const subscription: unknown = {
...validSubscription,
expiration: date,
startAt: date,
};
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
expiration: ms,
startAt: ms,
}));
});

it('converts the end date to a number.', async(): Promise<void> => {
const date = '1988-03-09T14:48:00.000Z';
const ms = Date.parse(date);

const subscription: unknown = {
...validSubscription,
endAt: date,
};
await expect(SUBSCRIBE_SCHEMA.validate(subscription)).resolves.toEqual(expect.objectContaining({
endAt: ms,
}));
});

Expand Down
Expand Up @@ -44,7 +44,8 @@ describe('A WebHookSubscription2021', (): void => {
topic: 'https://storage.example/resource',
target,
state: undefined,
expiration: undefined,
startAt: undefined,
endAt: undefined,
accept: undefined,
rate: undefined,
};
Expand Down
Expand Up @@ -22,7 +22,8 @@ describe('A WebSocketSubscription2021', (): void => {
type: 'WebSocketSubscription2021',
topic: 'https://storage.example/resource',
state: undefined,
expiration: undefined,
startAt: undefined,
endAt: undefined,
accept: undefined,
rate: undefined,
};
Expand Down

0 comments on commit caee563

Please sign in to comment.