diff --git a/.github/workflows/test-core.yml b/.github/workflows/test-core.yml index 54aee5c..6122181 100644 --- a/.github/workflows/test-core.yml +++ b/.github/workflows/test-core.yml @@ -2,6 +2,8 @@ name: Test Core Package on: push: + branches: + - main paths: - 'packages/core/**' - 'package.json' diff --git a/.github/workflows/test-mikroorm-driver.yml b/.github/workflows/test-mikroorm-driver.yml index 49a03b5..e2c5a4b 100644 --- a/.github/workflows/test-mikroorm-driver.yml +++ b/.github/workflows/test-mikroorm-driver.yml @@ -2,6 +2,8 @@ name: Test MikroORM Driver on: push: + branches: + - main paths: - 'packages/mikroorm-driver/**' - 'packages/core/**' diff --git a/packages/core/src/poller/retryable-inbox-outbox-event.poller.ts b/packages/core/src/poller/retryable-inbox-outbox-event.poller.ts index 699baed..f741336 100644 --- a/packages/core/src/poller/retryable-inbox-outbox-event.poller.ts +++ b/packages/core/src/poller/retryable-inbox-outbox-event.poller.ts @@ -1,5 +1,5 @@ -import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { EMPTY, catchError, concatMap, from, interval, repeat } from 'rxjs'; +import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; +import { EMPTY, Subscription, catchError, concatMap, from, interval, repeat } from 'rxjs'; import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory'; import { TransactionalEventEmitter } from '../emitter/transactional-event-emitter'; import { InboxOutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition'; @@ -8,7 +8,11 @@ import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN, InboxOutboxEventProcessorContract } import { EventConfigurationResolver } from '../resolver/event-configuration.resolver'; @Injectable() -export class RetryableInboxOutboxEventPoller implements OnModuleInit { +export class RetryableInboxOutboxEventPoller implements OnModuleInit, OnModuleDestroy { + private subscription: Subscription | null = null; + private inFlightProcessing: Set> = new Set(); + private isShuttingDown = false; + constructor( @Inject(MODULE_OPTIONS_TOKEN) private options: InboxOutboxModuleOptions, @Inject(DATABASE_DRIVER_FACTORY_TOKEN) private databaseDriverFactory: DatabaseDriverFactory, @@ -19,9 +23,12 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit { ) {} async onModuleInit() { this.logger.log(`Inbox options: retryEveryMilliseconds: ${this.options.retryEveryMilliseconds}, maxInboxOutboxTransportEventPerRetry: ${this.options.maxInboxOutboxTransportEventPerRetry}, events: ${JSON.stringify(this.options.events)}, driver: ${this.options.driverFactory.constructor.name}`); - interval(this.options.retryEveryMilliseconds) + this.subscription = interval(this.options.retryEveryMilliseconds) .pipe( concatMap(() => { + if (this.isShuttingDown) { + return EMPTY; + } return from(this.poolRetryableEvents()); }), catchError((exception) => { @@ -34,6 +41,24 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit { .subscribe(); } + async onModuleDestroy() { + this.isShuttingDown = true; + this.logger.log('Shutting down RetryableInboxOutboxEventPoller...'); + + if (this.subscription) { + this.subscription.unsubscribe(); + this.subscription = null; + } + + if (this.inFlightProcessing.size > 0) { + this.logger.log(`Waiting for ${this.inFlightProcessing.size} in-flight event(s) to complete...`); + await Promise.allSettled([...this.inFlightProcessing]); + this.logger.log('All in-flight events completed.'); + } + + this.logger.log('RetryableInboxOutboxEventPoller shutdown complete.'); + } + async poolRetryableEvents() { try { const maxInboxOutboxTransportEventPerRetry = this.options.maxInboxOutboxTransportEventPerRetry; @@ -55,18 +80,25 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit { } private async processAsynchronousRetryableEvents(inboxOutboxTransportEvents: InboxOutboxTransportEvent[]) { - return Promise.allSettled( - inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => { - const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => { - return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName()); - }); + const processingPromises = inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => { + const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => { + return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName()); + }); + + const processingPromise = this.inboxOutboxEventProcessor.process( + this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName), + inboxOutboxTransportEvent, + notDeliveredToListeners, + ); + + this.inFlightProcessing.add(processingPromise); + processingPromise.finally(() => { + this.inFlightProcessing.delete(processingPromise); + }); + + return processingPromise; + }); - return this.inboxOutboxEventProcessor.process( - this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName), - inboxOutboxTransportEvent, - notDeliveredToListeners, - ); - }), - ); + return Promise.allSettled(processingPromises); } } diff --git a/packages/core/src/test/unit/retryable-inbox-outbox-event-poller.spec.ts b/packages/core/src/test/unit/retryable-inbox-outbox-event-poller.spec.ts new file mode 100644 index 0000000..9efb3ee --- /dev/null +++ b/packages/core/src/test/unit/retryable-inbox-outbox-event-poller.spec.ts @@ -0,0 +1,166 @@ +import { Logger } from '@nestjs/common'; +import { DatabaseDriverFactory } from '../../driver/database-driver.factory'; +import { DatabaseDriver } from '../../driver/database.driver'; +import { TransactionalEventEmitter } from '../../emitter/transactional-event-emitter'; +import { InboxOutboxModuleOptions } from '../../inbox-outbox.module-definition'; +import { RetryableInboxOutboxEventPoller } from '../../poller/retryable-inbox-outbox-event.poller'; +import { InboxOutboxEventProcessorContract } from '../../processor/inbox-outbox-event-processor.contract'; +import { EventConfigurationResolver } from '../../resolver/event-configuration.resolver'; +import { createMockedDriverFactory } from './mock/driver-factory.mock'; +import { createMockedDriver } from './mock/driver.mock'; +import { createMockedInboxOutboxOptionsFactory } from './mock/inbox-outbox-options.mock'; + +describe('RetryableInboxOutboxEventPoller', () => { + let mockedDriver: DatabaseDriver; + let mockedDriverFactory: DatabaseDriverFactory; + let inboxOutboxOptions: InboxOutboxModuleOptions; + let mockLogger: Logger; + let mockTransactionalEventEmitter: TransactionalEventEmitter; + let mockEventConfigurationResolver: EventConfigurationResolver; + let mockInboxOutboxEventProcessor: InboxOutboxEventProcessorContract; + + beforeEach(() => { + jest.useFakeTimers(); + mockedDriver = createMockedDriver(); + mockedDriverFactory = createMockedDriverFactory(mockedDriver); + inboxOutboxOptions = createMockedInboxOutboxOptionsFactory(mockedDriverFactory, [ + { + name: 'testEvent', + listeners: { + expiresAtTTL: 1000, + readyToRetryAfterTTL: 1000, + maxExecutionTimeTTL: 1000, + }, + }, + ]); + mockLogger = { + log: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + } as unknown as Logger; + + mockTransactionalEventEmitter = { + getListeners: jest.fn().mockReturnValue([]), + } as unknown as TransactionalEventEmitter; + + mockEventConfigurationResolver = {} as EventConfigurationResolver; + + mockInboxOutboxEventProcessor = { + process: jest.fn().mockResolvedValue(undefined), + }; + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + function createPoller() { + return new RetryableInboxOutboxEventPoller( + inboxOutboxOptions, + mockedDriverFactory, + mockInboxOutboxEventProcessor, + mockTransactionalEventEmitter, + mockEventConfigurationResolver, + mockLogger, + ); + } + + describe('onModuleDestroy', () => { + it('should unsubscribe from interval on shutdown', async () => { + const poller = createPoller(); + await poller.onModuleInit(); + + await poller.onModuleDestroy(); + + expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...'); + expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.'); + }); + + it('should stop polling after shutdown is initiated', async () => { + (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]); + const poller = createPoller(); + await poller.onModuleInit(); + + jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds); + await Promise.resolve(); + + const callCountBeforeShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length; + + await poller.onModuleDestroy(); + + jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds * 5); + await Promise.resolve(); + + const callCountAfterShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length; + expect(callCountAfterShutdown).toBe(callCountBeforeShutdown); + }); + + it('should wait for in-flight processing to complete before shutdown', async () => { + let resolveProcessing: () => void; + const processingPromise = new Promise((resolve) => { + resolveProcessing = resolve; + }); + + (mockInboxOutboxEventProcessor.process as jest.Mock).mockReturnValue(processingPromise); + + const mockEvent = { + id: 1, + eventName: 'testEvent', + eventPayload: {}, + deliveredToListeners: [], + readyToRetryAfter: Date.now(), + expireAt: Date.now() + 1000, + insertedAt: Date.now(), + }; + (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([mockEvent]); + + const poller = createPoller(); + await poller.onModuleInit(); + + jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds); + await Promise.resolve(); + await Promise.resolve(); + + const shutdownPromise = poller.onModuleDestroy(); + + expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...'); + expect(mockLogger.log).toHaveBeenCalledWith(expect.stringContaining('Waiting for')); + + let shutdownCompleted = false; + shutdownPromise.then(() => { + shutdownCompleted = true; + }); + + await Promise.resolve(); + expect(shutdownCompleted).toBe(false); + + resolveProcessing!(); + await shutdownPromise; + + expect(mockLogger.log).toHaveBeenCalledWith('All in-flight events completed.'); + expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.'); + }); + + it('should handle shutdown when no in-flight processing exists', async () => { + (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]); + const poller = createPoller(); + await poller.onModuleInit(); + + await poller.onModuleDestroy(); + + expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...'); + expect(mockLogger.log).not.toHaveBeenCalledWith(expect.stringContaining('Waiting for')); + expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.'); + }); + + it('should handle shutdown gracefully even if called before onModuleInit', async () => { + const poller = createPoller(); + + await poller.onModuleDestroy(); + + expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...'); + expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.'); + }); + }); +});