Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export interface InboxOutboxTransportEvent {
id: number;
eventName: string;
eventPayload: any;
delivedToListeners: string[];
deliveredToListeners: string[];
readyToRetryAfter: number | null;
expireAt: number;
insertedAt: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
return Promise.allSettled(
inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => {
const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => {
return !inboxOutboxTransportEvent.delivedToListeners.includes(listener.getName());
return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName());
});

return this.inboxOutboxEventProcessor.process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class InboxOutboxEventProcessor implements InboxOutboxEventProcessorContr
}

if (deliveredToListeners.length > 0) {
inboxOutboxTransportEvent.delivedToListeners.push(...deliveredToListeners);
inboxOutboxTransportEvent.deliveredToListeners.push(...deliveredToListeners);
await databaseDriver.persist(inboxOutboxTransportEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ describe('InboxOutboxEventProcessor', () => {

const inboxOutboxTransportEvent : InboxOutboxTransportEvent = {
readyToRetryAfter: new Date().getTime(),
delivedToListeners: [],
deliveredToListeners: [],
eventName: 'newEvent',
eventPayload: {},
expireAt: new Date().getTime() + 1000,
Expand Down Expand Up @@ -116,7 +116,7 @@ describe('InboxOutboxEventProcessor', () => {

const inboxOutboxTransportEvent : InboxOutboxTransportEvent = {
readyToRetryAfter: new Date().getTime(),
delivedToListeners: [],
deliveredToListeners: [],
eventName: 'newEvent',
eventPayload: {},
expireAt: new Date().getTime() + 1000,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Migration } from '@mikro-orm/migrations';

export class MigrationInboxOutbox1733250000 extends Migration {
async up(): Promise<void> {
this.addSql(
this.getKnex()
.schema.alterTable('inbox_outbox_transport_event', (table) => {
table.renameColumn('delived_to_listeners', 'delivered_to_listeners');
})
.toQuery(),
);
}

async down(): Promise<void> {
this.addSql(
this.getKnex()
.schema.alterTable('inbox_outbox_transport_event', (table) => {
table.renameColumn('delivered_to_listeners', 'delived_to_listeners');
})
.toQuery(),
);
}
}
5 changes: 3 additions & 2 deletions packages/mikroorm-driver/src/migrations/migrations.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { MigrationInboxOutbox1708527074 } from "./MigrationInboxOutbox1708527074";

export const InboxOutboxMigrations = [MigrationInboxOutbox1708527074];
import { MigrationInboxOutbox1733250000 } from "./MigrationInboxOutbox1733250000";

export const InboxOutboxMigrations = [MigrationInboxOutbox1708527074, MigrationInboxOutbox1733250000];
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ export class MikroOrmInboxOutboxTransportEvent implements InboxOutboxTransportEv

@Property({
type: 'json',
fieldName: 'delivered_to_listeners',
})
delivedToListeners: string[];
deliveredToListeners: string[];

@Property({ type: 'bigint' })
readyToRetryAfter: number;
Expand All @@ -37,7 +38,7 @@ export class MikroOrmInboxOutboxTransportEvent implements InboxOutboxTransportEv
event.expireAt = expireAt;
event.readyToRetryAfter = readyToRetryAfter;
event.insertedAt = Date.now();
event.delivedToListeners = [];
event.deliveredToListeners = [];
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('MikroOrmInboxOutboxTransportEvent', () => {
expect(event.eventPayload).toEqual(eventPayload);
expect(event.expireAt).toBe(expireAt);
expect(event.readyToRetryAfter).toBe(readyToRetryAfter);
expect(event.delivedToListeners).toEqual([]);
expect(event.deliveredToListeners).toEqual([]);
expect(event.insertedAt).toBeDefined();
expect(event.insertedAt).toBeLessThanOrEqual(Date.now());
});
Expand Down Expand Up @@ -88,7 +88,7 @@ describe('MikroOrmInboxOutboxTransportEvent', () => {
expect(retrieved).toBeDefined();
expect(retrieved!.eventName).toBe('PersistenceTest');
expect(retrieved!.eventPayload).toEqual(eventPayload);
expect(retrieved!.delivedToListeners).toEqual([]);
expect(retrieved!.deliveredToListeners).toEqual([]);
});

it('should persist JSON payload correctly', async () => {
Expand Down Expand Up @@ -116,22 +116,22 @@ describe('MikroOrmInboxOutboxTransportEvent', () => {
expect(retrieved!.eventPayload).toEqual(complexPayload);
});

it('should persist delivedToListeners as JSON array', async () => {
it('should persist deliveredToListeners as JSON array', async () => {
const event = new MikroOrmInboxOutboxTransportEvent().create(
'ListenersTest',
{},
Date.now() + 60000,
Date.now() + 5000,
);
event.delivedToListeners = ['listener1', 'listener2'];
event.deliveredToListeners = ['listener1', 'listener2'];

orm.em.persist(event);
await orm.em.flush();
orm.em.clear();

const retrieved = await orm.em.findOne(MikroOrmInboxOutboxTransportEvent, { eventName: 'ListenersTest' });

expect(retrieved!.delivedToListeners).toEqual(['listener1', 'listener2']);
expect(retrieved!.deliveredToListeners).toEqual(['listener1', 'listener2']);
});

it('should generate auto-increment id on persist', async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class MigrationInboxOutbox1733250000 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn(
'inbox_outbox_transport_event',
'delived_to_listeners',
'delivered_to_listeners'
);
}

async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn(
'inbox_outbox_transport_event',
'delivered_to_listeners',
'delived_to_listeners'
);
}
}
3 changes: 2 additions & 1 deletion packages/typeorm-driver/src/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { MigrationInboxOutbox1726154235704 } from "./MigrationInboxOutbox1726154235704";
import { MigrationInboxOutbox1733250000 } from "./MigrationInboxOutbox1733250000";

export const InboxOutboxTransportEventMigrations = [MigrationInboxOutbox1726154235704]
export const InboxOutboxTransportEventMigrations = [MigrationInboxOutbox1726154235704, MigrationInboxOutbox1733250000]
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ export class TypeOrmInboxOutboxTransportEvent implements InboxOutboxTransportEve

@Column({
type: 'json',
name: 'delived_to_listeners',
name: 'delivered_to_listeners',
})
delivedToListeners: string[];
deliveredToListeners: string[];

@Column({
name: 'ready_to_retry_after',
Expand All @@ -48,7 +48,7 @@ export class TypeOrmInboxOutboxTransportEvent implements InboxOutboxTransportEve
event.expireAt = expireAt;
event.readyToRetryAfter = readyToRetryAfter;
event.insertedAt = Date.now();
event.delivedToListeners = [];
event.deliveredToListeners = [];
return event;
}
}