Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ await this.transactionalEventEmitter.emitAsync(

> **Note:** Use `emitAsync` if you need to wait for listeners to execute and complete before moving on. Use `emit` if you want to fire-and-forget the event delivery.

### Immediate vs Deferred Processing

By default, events are immediately processed after being saved to the database. You can disable this behavior per-event using the `immediateProcessing` option:

```typescript
{
name: UserApplicationAssignedEvent.name,
listeners: {
expiresAtTTL: 1000 * 60 * 60 * 24,
maxExecutionTimeTTL: 1000 * 15,
readyToRetryAfterTTL: 10000,
},
immediateProcessing: false, // Only save to DB, process later via poller
}
```

**Trade-offs:**

| Immediate Processing (`true`, default) | Deferred Processing (`false`) |
|----------------------------------------|-------------------------------|
| Lower latency for listeners | Higher latency (waits for poller) |
| Best effort delivery on first attempt | All delivery via poller |
| If app crashes during processing, event is still in DB for retry | Safer for crash recovery - no in-flight processing |
| Suitable for most use cases | Suitable for "fire and forget" pattern |

### Event Contract:
Ensure that your event classes implement the `InboxOutboxEvent` interface for consistency and clarity.

Expand All @@ -165,6 +190,7 @@ Ensure that your event classes implement the `InboxOutboxEvent` interface for co
- **readyToRetryAfterTTL**: This is how long it will wait before retrying the event listeners
- **retryEveryMilliseconds**: This is how often it will check for events that need to be retried
- **maxInboxOutboxTransportEventPerRetry**: This is how many events it will retry at a time
- **immediateProcessing** (optional, default: `true`): Whether to immediately process the event after saving to DB. When `true`, events are saved and immediately delivered to listeners. When `false`, events are only saved to DB and processed later by the poller (fire-and-forget pattern)

#### Registration
- Register the `InboxOutboxModule` within your application's bootstrap process, specifying global accessibility and event configurations.
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/emitter/transactional-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ export class TransactionalEventEmitter {
persister.persist(inboxOutboxTransportEvent);
await persister.flush();

if (eventOptions.immediateProcessing === false) {
return;
}

if (awaitProcessor) {
await this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name));
return;
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/inbox-outbox.module-definition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ export interface InboxOutboxModuleEventOptions {
readyToRetryAfterTTL: number;
maxExecutionTimeTTL: number;
};
/**
* Whether to immediately process the event after saving to DB.
* When true (default), events are saved and immediately delivered to listeners.
* When false, events are only saved to DB and processed later by the poller.
* Use false for "fire and forget" pattern that's safer for crash recovery.
* @default true
*/
immediateProcessing?: boolean;
}

export interface InboxOutboxModuleOptions {
Expand Down
99 changes: 99 additions & 0 deletions packages/core/src/test/unit/transaction-event-emitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,103 @@ describe('TransacationalEventEmitter', () => {

expect(transactionalEventEmitter.getEventNames()).toContain('eventName');
})

it('Should not call process when immediateProcessing is false', async () => {
inboxOutboxOptions.events = [
{
name: 'newEvent',
listeners: {
expiresAtTTL: 1000,
readyToRetryAfterTTL: 1000,
maxExecutionTimeTTL: 1000,
},
immediateProcessing: false,
},
];

const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);

const newEvent = {
name: 'newEvent',
};

await transactionalEventEmitter.emit(newEvent, []);

expect(mockedDriver.persist).toHaveBeenCalledTimes(1);
expect(mockedDriver.flush).toHaveBeenCalled();
expect(mockedInboxOutboxEventProcessor.process).not.toHaveBeenCalled();
});

it('Should not call process when immediateProcessing is false with emitAsync', async () => {
inboxOutboxOptions.events = [
{
name: 'newEvent',
listeners: {
expiresAtTTL: 1000,
readyToRetryAfterTTL: 1000,
maxExecutionTimeTTL: 1000,
},
immediateProcessing: false,
},
];

const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);

const newEvent = {
name: 'newEvent',
};

await transactionalEventEmitter.emitAsync(newEvent, []);

expect(mockedDriver.persist).toHaveBeenCalledTimes(1);
expect(mockedDriver.flush).toHaveBeenCalled();
expect(mockedInboxOutboxEventProcessor.process).not.toHaveBeenCalled();
});

it('Should call process when immediateProcessing is true', async () => {
inboxOutboxOptions.events = [
{
name: 'newEvent',
listeners: {
expiresAtTTL: 1000,
readyToRetryAfterTTL: 1000,
maxExecutionTimeTTL: 1000,
},
immediateProcessing: true,
},
];

const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);

const newEvent = {
name: 'newEvent',
};

await transactionalEventEmitter.emit(newEvent, []);

expect(mockedInboxOutboxEventProcessor.process).toHaveBeenCalledTimes(1);
});

it('Should call process when immediateProcessing is undefined (default behavior)', async () => {
inboxOutboxOptions.events = [
{
name: 'newEvent',
listeners: {
expiresAtTTL: 1000,
readyToRetryAfterTTL: 1000,
maxExecutionTimeTTL: 1000,
},
},
];

const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);

const newEvent = {
name: 'newEvent',
};

await transactionalEventEmitter.emit(newEvent, []);

expect(mockedInboxOutboxEventProcessor.process).toHaveBeenCalledTimes(1);
});
});
101 changes: 101 additions & 0 deletions packages/mikroorm-driver/src/test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,105 @@ describe('Integration Tests', () => {
expect(transportEvents[0].expireAt).toBeGreaterThanOrEqual(beforeEmit + 60000);
});
});

describe('immediateProcessing configuration', () => {
it('should not process event immediately when immediateProcessing is false, but process via poller', async () => {
context = await createTestApp({
events: [
{
name: 'UserCreated',
listeners: {
expiresAtTTL: 60000,
readyToRetryAfterTTL: 50,
maxExecutionTimeTTL: 30000,
},
immediateProcessing: false,
},
],
additionalEntities: [User],
retryEveryMilliseconds: 100,
maxInboxOutboxTransportEventPerRetry: 10,
});

const emitter = context.module.get(TransactionalEventEmitter);
const orm = context.orm;

const handledEvents: UserCreatedEvent[] = [];
const listener: IListener<UserCreatedEvent> = {
getName: () => 'ImmediateProcessingListener',
handle: async (event: UserCreatedEvent) => {
handledEvents.push(event);
},
};
emitter.addListener('UserCreated', listener);

const user = new User();
user.email = 'deferred@example.com';
user.name = 'Deferred User';

const event = new UserCreatedEvent(1, 'deferred@example.com');

await emitter.emitAsync(event, [{ operation: TransactionalEventEmitterOperations.persist, entity: user }]);

expect(handledEvents).toHaveLength(0);

const em = orm.em.fork();
const transportEvents = await em.find(MikroOrmInboxOutboxTransportEvent, { eventName: 'UserCreated' });
expect(transportEvents).toHaveLength(1);

await new Promise(resolve => setTimeout(resolve, 300));

expect(handledEvents).toHaveLength(1);
expect(handledEvents[0]).toMatchObject({
name: 'UserCreated',
userId: 1,
email: 'deferred@example.com',
});
});

it('should process event immediately when immediateProcessing is true (default)', async () => {
context = await createTestApp({
events: [
{
name: 'UserCreated',
listeners: {
expiresAtTTL: 60000,
readyToRetryAfterTTL: 5000,
maxExecutionTimeTTL: 30000,
},
immediateProcessing: true,
},
],
additionalEntities: [User],
retryEveryMilliseconds: 10000,
maxInboxOutboxTransportEventPerRetry: 10,
});

const emitter = context.module.get(TransactionalEventEmitter);

const handledEvents: UserCreatedEvent[] = [];
const listener: IListener<UserCreatedEvent> = {
getName: () => 'ImmediateListener',
handle: async (event: UserCreatedEvent) => {
handledEvents.push(event);
},
};
emitter.addListener('UserCreated', listener);

const user = new User();
user.email = 'immediate@example.com';
user.name = 'Immediate User';

const event = new UserCreatedEvent(1, 'immediate@example.com');

await emitter.emitAsync(event, [{ operation: TransactionalEventEmitterOperations.persist, entity: user }]);

expect(handledEvents).toHaveLength(1);
expect(handledEvents[0]).toMatchObject({
name: 'UserCreated',
userId: 1,
email: 'immediate@example.com',
});
});
});
});
Loading