Skip to content

Commit 21dd1c8

Browse files
committed
make it work with mikro-orm .transactional usage? (vibe-kanban 3c966397)
which relies on async local storage see https://mikro-orm.io/docs/transactions and `create-tour-participant.ts` in `~/src/tournee/be` app code would look like this \\`\`\` @transactional() private async createParticipant( auth: AuthContext, division: TourDivisionEntity, input: CreateTourParticipantInput, ) { const nextNumber = await this.getNextParticipantNumber(division); const participant = await this.createSingleParticipant( auth, division, input, nextNumber, ); await this.emitter.emitAsync( TourParticipantsCreated.name, new TourParticipantsCreated([participant]), ); return participant; } \\`\`\` can this lib work with a code such as the one above ? (p.s. how event object, name nad payload is created, that's a detail - dont focus on this now)
1 parent e909d43 commit 21dd1c8

File tree

7 files changed

+399
-8
lines changed

7 files changed

+399
-8
lines changed

packages/core/src/emitter/transactional-event-emitter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class TransactionalEventEmitter {
7878
entities: {
7979
operation: TransactionalEventEmitterOperations;
8080
entity: object;
81-
}[],
81+
}[] = [],
8282
customDatabaseDriverPersister?: DatabaseDriverPersister,
8383
): Promise<void> {
8484
return this.emitInternal(event, entities, customDatabaseDriverPersister, false);
@@ -89,7 +89,7 @@ export class TransactionalEventEmitter {
8989
entities: {
9090
operation: TransactionalEventEmitterOperations;
9191
entity: object;
92-
}[],
92+
}[] = [],
9393
customDatabaseDriverPersister?: DatabaseDriverPersister,
9494
): Promise<void> {
9595
return this.emitInternal(event, entities, customDatabaseDriverPersister, true);

packages/mikroorm-driver/README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ new MikroORMDatabaseDriverFactory(orm, {
109109
new MikroORMDatabaseDriverFactory(orm, {
110110
listenNotify: { enabled: false },
111111
})
112+
113+
// Enable AsyncLocalStorage context (for @Transactional() decorator)
114+
new MikroORMDatabaseDriverFactory(orm, {
115+
useContext: true,
116+
})
112117
```
113118

114119
### How It Works
@@ -119,6 +124,55 @@ The `PostgreSQLEventListener`:
119124
- Works alongside polling as a fallback mechanism
120125
- Requires the LISTEN/NOTIFY migration from `InboxOutboxMigrations`
121126

127+
## @Transactional() Decorator Support
128+
129+
The driver supports MikroORM's `@Transactional()` decorator via AsyncLocalStorage context propagation. Enable `useContext: true` to participate in the same transaction:
130+
131+
```typescript
132+
// Module setup
133+
InboxOutboxModule.registerAsync({
134+
imports: [MikroOrmModule],
135+
useFactory: (orm: MikroORM) => ({
136+
driverFactory: new MikroORMDatabaseDriverFactory(orm, {
137+
useContext: true, // Enable context propagation
138+
}),
139+
events: [...],
140+
retryEveryMilliseconds: 30_000,
141+
maxInboxOutboxTransportEventPerRetry: 10,
142+
}),
143+
inject: [MikroORM],
144+
}),
145+
```
146+
147+
```typescript
148+
// Service using @Transactional()
149+
import { Transactional } from '@mikro-orm/core';
150+
151+
@Injectable()
152+
export class OrderService {
153+
constructor(
154+
private em: EntityManager,
155+
private emitter: TransactionalEventEmitter,
156+
) {}
157+
158+
@Transactional()
159+
async createOrder(data: CreateOrderInput) {
160+
const order = this.em.create(Order, data);
161+
this.em.persist(order);
162+
163+
// Event is persisted in the same transaction
164+
await this.emitter.emitAsync(new OrderCreatedEvent(order.id));
165+
166+
return order;
167+
}
168+
}
169+
```
170+
171+
With `useContext: true`:
172+
- Event persistence participates in the `@Transactional()` transaction
173+
- Rollbacks affect both business data and events atomically
174+
- No `em.clear()` after flush (preserves shared identity map)
175+
122176
## Supported Databases
123177

124178
| Database | Real-time Support |

packages/mikroorm-driver/src/driver/mikroorm-database-driver.factory.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,18 @@ import { MikroORMDatabaseDriver } from './mikroorm.database-driver';
55

66
export interface MikroORMDatabaseDriverFactoryOptions {
77
listenNotify?: PostgreSQLEventListenerOptions & { enabled?: boolean };
8+
/**
9+
* When true, uses the current transactional context (via AsyncLocalStorage)
10+
* instead of forking a new EntityManager. This enables compatibility with
11+
* MikroORM's @Transactional() decorator.
12+
* @default false
13+
*/
14+
useContext?: boolean;
815
}
916

1017
export class MikroORMDatabaseDriverFactory {
1118
private eventListener: PostgreSQLEventListener | null = null;
19+
private useContext: boolean;
1220

1321
constructor(
1422
private readonly orm: MikroORM,
@@ -17,11 +25,14 @@ export class MikroORMDatabaseDriverFactory {
1725
if (options?.listenNotify?.enabled !== false) {
1826
this.eventListener = new PostgreSQLEventListener(orm, options?.listenNotify);
1927
}
28+
this.useContext = options?.useContext ?? false;
2029
}
2130

2231
create(eventConfigurationResolver: EventConfigurationResolverContract): DatabaseDriver {
23-
const forkedEm = this.orm.em.fork();
24-
return new MikroORMDatabaseDriver(forkedEm, eventConfigurationResolver);
32+
const em = this.useContext ? this.orm.em.getContext() : this.orm.em.fork();
33+
return new MikroORMDatabaseDriver(em, eventConfigurationResolver, {
34+
clearAfterFlush: !this.useContext,
35+
});
2536
}
2637

2738
getEventListener(): EventListener | null {

packages/mikroorm-driver/src/driver/mikroorm.database-driver.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
import { EntityManager, LockMode } from '@mikro-orm/core';
22
import { DatabaseDriver, EventConfigurationResolverContract, InboxOutboxTransportEvent } from '@nestixis/nestjs-inbox-outbox';
33
import { MikroOrmInboxOutboxTransportEvent } from '../model/mikroorm-inbox-outbox-transport-event.model';
4-
4+
5+
export interface MikroORMDatabaseDriverOptions {
6+
clearAfterFlush?: boolean;
7+
}
8+
59
export class MikroORMDatabaseDriver implements DatabaseDriver {
10+
private readonly clearAfterFlush: boolean;
11+
612
constructor(
7-
private readonly em: EntityManager, private readonly eventConfigurationResolver: EventConfigurationResolverContract) {
13+
private readonly em: EntityManager,
14+
private readonly eventConfigurationResolver: EventConfigurationResolverContract,
15+
options?: MikroORMDatabaseDriverOptions,
16+
) {
17+
this.clearAfterFlush = options?.clearAfterFlush ?? true;
818
}
919

1020
async findAndExtendReadyToRetryEvents(limit: number): Promise<InboxOutboxTransportEvent[]> {
@@ -40,7 +50,9 @@ export class MikroORMDatabaseDriver implements DatabaseDriver {
4050

4151
async flush(): Promise<void> {
4252
await this.em.flush();
43-
this.em.clear();
53+
if (this.clearAfterFlush) {
54+
this.em.clear();
55+
}
4456
}
4557

4658
createInboxOutboxTransportEvent(eventName: string, eventPayload: any, expireAt: number, readyToRetryAfter: number | null): InboxOutboxTransportEvent {

packages/mikroorm-driver/src/test/mikroorm-database-driver-factory.spec.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,40 @@ describe('MikroORMDatabaseDriverFactory', () => {
118118
});
119119
});
120120

121+
describe('useContext option', () => {
122+
it('should use forked EntityManager when useContext is false (default)', () => {
123+
const factory = new MikroORMDatabaseDriverFactory(orm);
124+
const driver = factory.create(createEventConfigResolver());
125+
126+
expect(driver).toBeInstanceOf(MikroORMDatabaseDriver);
127+
});
128+
129+
it('should use context EntityManager when useContext is true', () => {
130+
const factory = new MikroORMDatabaseDriverFactory(orm, {
131+
useContext: true,
132+
});
133+
const driver = factory.create(createEventConfigResolver());
134+
135+
expect(driver).toBeInstanceOf(MikroORMDatabaseDriver);
136+
});
137+
138+
it('should support combining useContext with listenNotify options', () => {
139+
const factory = new MikroORMDatabaseDriverFactory(orm, {
140+
useContext: true,
141+
listenNotify: {
142+
channelName: 'custom_channel',
143+
},
144+
});
145+
146+
const driver = factory.create(createEventConfigResolver());
147+
const listener = factory.getEventListener() as PostgreSQLEventListener;
148+
149+
expect(driver).toBeInstanceOf(MikroORMDatabaseDriver);
150+
expect(listener).toBeInstanceOf(PostgreSQLEventListener);
151+
expect(listener.channelName).toBe('custom_channel');
152+
});
153+
});
154+
121155
describe('getEventListener', () => {
122156
it('should return PostgreSQLEventListener by default', () => {
123157
const factory = new MikroORMDatabaseDriverFactory(orm);

packages/mikroorm-driver/src/test/test-utils.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export interface TestAppConfig {
2020
retryEveryMilliseconds?: number;
2121
maxInboxOutboxTransportEventPerRetry?: number;
2222
databaseType?: DatabaseType;
23+
useContext?: boolean;
2324
}
2425

2526
export interface TestContext {
@@ -112,7 +113,9 @@ export async function createTestApp(config: TestAppConfig): Promise<TestContext>
112113
const inboxOutboxModule = InboxOutboxModule.registerAsync({
113114
imports: [MikroOrmModule],
114115
useFactory: (orm: MikroORM) => {
115-
const driverFactory = new MikroORMDatabaseDriverFactory(orm);
116+
const driverFactory = new MikroORMDatabaseDriverFactory(orm, {
117+
useContext: config.useContext,
118+
});
116119
return {
117120
driverFactory,
118121
events: config.events,

0 commit comments

Comments
 (0)