Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: Test Core Package

on:
push:
branches:
- main
paths:
- 'packages/core/**'
- 'package.json'
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-mikroorm-driver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: Test MikroORM Driver

on:
push:
branches:
- main
paths:
- 'packages/mikroorm-driver/**'
- 'packages/core/**'
Expand Down
64 changes: 48 additions & 16 deletions packages/core/src/poller/retryable-inbox-outbox-event.poller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { EMPTY, catchError, concatMap, from, interval, repeat } from 'rxjs';
import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { EMPTY, Subscription, catchError, concatMap, from, interval, repeat } from 'rxjs';
import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory';
import { TransactionalEventEmitter } from '../emitter/transactional-event-emitter';
import { InboxOutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition';
Expand All @@ -8,7 +8,11 @@ import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN, InboxOutboxEventProcessorContract }
import { EventConfigurationResolver } from '../resolver/event-configuration.resolver';

@Injectable()
export class RetryableInboxOutboxEventPoller implements OnModuleInit {
export class RetryableInboxOutboxEventPoller implements OnModuleInit, OnModuleDestroy {
private subscription: Subscription | null = null;
private inFlightProcessing: Set<Promise<unknown>> = new Set();
private isShuttingDown = false;

constructor(
@Inject(MODULE_OPTIONS_TOKEN) private options: InboxOutboxModuleOptions,
@Inject(DATABASE_DRIVER_FACTORY_TOKEN) private databaseDriverFactory: DatabaseDriverFactory,
Expand All @@ -19,9 +23,12 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
) {}
async onModuleInit() {
this.logger.log(`Inbox options: retryEveryMilliseconds: ${this.options.retryEveryMilliseconds}, maxInboxOutboxTransportEventPerRetry: ${this.options.maxInboxOutboxTransportEventPerRetry}, events: ${JSON.stringify(this.options.events)}, driver: ${this.options.driverFactory.constructor.name}`);
interval(this.options.retryEveryMilliseconds)
this.subscription = interval(this.options.retryEveryMilliseconds)
.pipe(
concatMap(() => {
if (this.isShuttingDown) {
return EMPTY;
}
return from(this.poolRetryableEvents());
}),
catchError((exception) => {
Expand All @@ -34,6 +41,24 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
.subscribe();
}

async onModuleDestroy() {
this.isShuttingDown = true;
this.logger.log('Shutting down RetryableInboxOutboxEventPoller...');

if (this.subscription) {
this.subscription.unsubscribe();
this.subscription = null;
}

if (this.inFlightProcessing.size > 0) {
this.logger.log(`Waiting for ${this.inFlightProcessing.size} in-flight event(s) to complete...`);
await Promise.allSettled([...this.inFlightProcessing]);
this.logger.log('All in-flight events completed.');
}

this.logger.log('RetryableInboxOutboxEventPoller shutdown complete.');
}

async poolRetryableEvents() {
try {
const maxInboxOutboxTransportEventPerRetry = this.options.maxInboxOutboxTransportEventPerRetry;
Expand All @@ -55,18 +80,25 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
}

private async processAsynchronousRetryableEvents(inboxOutboxTransportEvents: InboxOutboxTransportEvent[]) {
return Promise.allSettled(
inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => {
const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => {
return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName());
});
const processingPromises = inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => {
const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => {
return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName());
});

const processingPromise = this.inboxOutboxEventProcessor.process(
this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName),
inboxOutboxTransportEvent,
notDeliveredToListeners,
);

this.inFlightProcessing.add(processingPromise);
processingPromise.finally(() => {
this.inFlightProcessing.delete(processingPromise);
});

return processingPromise;
});

return this.inboxOutboxEventProcessor.process(
this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName),
inboxOutboxTransportEvent,
notDeliveredToListeners,
);
}),
);
return Promise.allSettled(processingPromises);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { Logger } from '@nestjs/common';
import { DatabaseDriverFactory } from '../../driver/database-driver.factory';
import { DatabaseDriver } from '../../driver/database.driver';
import { TransactionalEventEmitter } from '../../emitter/transactional-event-emitter';
import { InboxOutboxModuleOptions } from '../../inbox-outbox.module-definition';
import { RetryableInboxOutboxEventPoller } from '../../poller/retryable-inbox-outbox-event.poller';
import { InboxOutboxEventProcessorContract } from '../../processor/inbox-outbox-event-processor.contract';
import { EventConfigurationResolver } from '../../resolver/event-configuration.resolver';
import { createMockedDriverFactory } from './mock/driver-factory.mock';
import { createMockedDriver } from './mock/driver.mock';
import { createMockedInboxOutboxOptionsFactory } from './mock/inbox-outbox-options.mock';

describe('RetryableInboxOutboxEventPoller', () => {
let mockedDriver: DatabaseDriver;
let mockedDriverFactory: DatabaseDriverFactory;
let inboxOutboxOptions: InboxOutboxModuleOptions;
let mockLogger: Logger;
let mockTransactionalEventEmitter: TransactionalEventEmitter;
let mockEventConfigurationResolver: EventConfigurationResolver;
let mockInboxOutboxEventProcessor: InboxOutboxEventProcessorContract;

beforeEach(() => {
jest.useFakeTimers();
mockedDriver = createMockedDriver();
mockedDriverFactory = createMockedDriverFactory(mockedDriver);
inboxOutboxOptions = createMockedInboxOutboxOptionsFactory(mockedDriverFactory, [
{
name: 'testEvent',
listeners: {
expiresAtTTL: 1000,
readyToRetryAfterTTL: 1000,
maxExecutionTimeTTL: 1000,
},
},
]);
mockLogger = {
log: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
debug: jest.fn(),
} as unknown as Logger;

mockTransactionalEventEmitter = {
getListeners: jest.fn().mockReturnValue([]),
} as unknown as TransactionalEventEmitter;

mockEventConfigurationResolver = {} as EventConfigurationResolver;

mockInboxOutboxEventProcessor = {
process: jest.fn().mockResolvedValue(undefined),
};
});

afterEach(() => {
jest.useRealTimers();
});

function createPoller() {
return new RetryableInboxOutboxEventPoller(
inboxOutboxOptions,
mockedDriverFactory,
mockInboxOutboxEventProcessor,
mockTransactionalEventEmitter,
mockEventConfigurationResolver,
mockLogger,
);
}

describe('onModuleDestroy', () => {
it('should unsubscribe from interval on shutdown', async () => {
const poller = createPoller();
await poller.onModuleInit();

await poller.onModuleDestroy();

expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
});

it('should stop polling after shutdown is initiated', async () => {
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]);
const poller = createPoller();
await poller.onModuleInit();

jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds);
await Promise.resolve();

const callCountBeforeShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length;

await poller.onModuleDestroy();

jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds * 5);
await Promise.resolve();

const callCountAfterShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length;
expect(callCountAfterShutdown).toBe(callCountBeforeShutdown);
});

it('should wait for in-flight processing to complete before shutdown', async () => {
let resolveProcessing: () => void;
const processingPromise = new Promise<void>((resolve) => {
resolveProcessing = resolve;
});

(mockInboxOutboxEventProcessor.process as jest.Mock).mockReturnValue(processingPromise);

const mockEvent = {
id: 1,
eventName: 'testEvent',
eventPayload: {},
deliveredToListeners: [],
readyToRetryAfter: Date.now(),
expireAt: Date.now() + 1000,
insertedAt: Date.now(),
};
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([mockEvent]);

const poller = createPoller();
await poller.onModuleInit();

jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds);
await Promise.resolve();
await Promise.resolve();

const shutdownPromise = poller.onModuleDestroy();

expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
expect(mockLogger.log).toHaveBeenCalledWith(expect.stringContaining('Waiting for'));

let shutdownCompleted = false;
shutdownPromise.then(() => {
shutdownCompleted = true;
});

await Promise.resolve();
expect(shutdownCompleted).toBe(false);

resolveProcessing!();
await shutdownPromise;

expect(mockLogger.log).toHaveBeenCalledWith('All in-flight events completed.');
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
});

it('should handle shutdown when no in-flight processing exists', async () => {
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]);
const poller = createPoller();
await poller.onModuleInit();

await poller.onModuleDestroy();

expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
expect(mockLogger.log).not.toHaveBeenCalledWith(expect.stringContaining('Waiting for'));
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
});

it('should handle shutdown gracefully even if called before onModuleInit', async () => {
const poller = createPoller();

await poller.onModuleDestroy();

expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
});
});
});
Loading