diff --git a/packages/api/.env.example b/packages/api/.env.example index 9ba4626b..42d208a0 100644 --- a/packages/api/.env.example +++ b/packages/api/.env.example @@ -12,5 +12,5 @@ COOKIE_SECRET="89YHGASP9DYFHWEOIH098yasd8rtwepuddrw902" PROCESS_ST_CHECKLIST_URL="https://app.process.st/workflows/CodersCamp-Test-Checklist-kTrxJoZgP-9IabhRbohIrw/run-link" EVENT_REPOSITORY="prisma" -SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10; -SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100; +SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10 +SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100 \ No newline at end of file diff --git a/packages/api/.env.test b/packages/api/.env.test index 9c4ef933..7b947e05 100644 --- a/packages/api/.env.test +++ b/packages/api/.env.test @@ -12,3 +12,5 @@ COOKIE_SECRET="786Tgiu3564Rti8oh89YG6re6" PROCESS_ST_CHECKLIST_URL="https://app.process.st/workflows/CodersCamp-Test-Checklist-kTrxJoZgP-9IabhRbohIrw/run-link" EVENT_REPOSITORY="prisma" +SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10 +SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100 \ No newline at end of file diff --git a/packages/api/src/module/automation/send-email-when-learning-materials-url-was-generated/learning-materials-url-was-generated-event-handler.service.ts b/packages/api/src/module/automation/send-email-when-learning-materials-url-was-generated/learning-materials-url-was-generated-event-handler.service.ts index 9c13d18a..21f74671 100644 --- a/packages/api/src/module/automation/send-email-when-learning-materials-url-was-generated/learning-materials-url-was-generated-event-handler.service.ts +++ b/packages/api/src/module/automation/send-email-when-learning-materials-url-was-generated/learning-materials-url-was-generated-event-handler.service.ts @@ -4,6 +4,7 @@ import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-ur import { ApplicationEvent } from '@/module/application-command-events'; import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription'; import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry'; +import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager'; // fixme: example for automation, implement later @Injectable() @@ -17,7 +18,7 @@ export class LearningMaterialsUrlWasGeneratedEventHandler implements OnApplicati .subscription('SendEmailWhenLearningMaterialsUrlWasGenerated_Automation_v1') .onEvent( 'LearningMaterialsUrlWasGenerated', - this.onLearningMaterialsUrlWasGenerated, + LearningMaterialsUrlWasGeneratedEventHandler.onLearningMaterialsUrlWasGenerated, ) .build(); await this.eventsSubscription.start(); @@ -27,5 +28,8 @@ export class LearningMaterialsUrlWasGeneratedEventHandler implements OnApplicati await this.eventsSubscription.stop(); } - onLearningMaterialsUrlWasGenerated(_event: ApplicationEvent) {} + private static onLearningMaterialsUrlWasGenerated( + _context: PrismaTransactionContext, + _event: ApplicationEvent, + ) {} } diff --git a/packages/api/src/module/automation/when-user-registration-was-started-then-request-email-confirmation/user-registration-was-started.event-handler.ts b/packages/api/src/module/automation/when-user-registration-was-started-then-request-email-confirmation/user-registration-was-started.event-handler.ts index 3fdf57be..56d7c1b6 100644 --- a/packages/api/src/module/automation/when-user-registration-was-started-then-request-email-confirmation/user-registration-was-started.event-handler.ts +++ b/packages/api/src/module/automation/when-user-registration-was-started-then-request-email-confirmation/user-registration-was-started.event-handler.ts @@ -1,5 +1,4 @@ -import {Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; -import { CommandBus } from '@nestjs/cqrs'; +import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; import { ApplicationEvent } from '@/module/application-command-events'; import { @@ -10,13 +9,13 @@ import { UserRegistrationWasStarted } from '@/module/events/user-registration-wa import { ApplicationCommandFactory } from '@/write/shared/application/application-command.factory'; import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription'; import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry'; +import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager'; @Injectable() export class UserRegistrationWasStartedEventHandler implements OnApplicationBootstrap, OnModuleDestroy { private eventsSubscription: EventsSubscription; constructor( - private readonly commandBus: CommandBus, private readonly commandFactory: ApplicationCommandFactory, private readonly eventsSubscriptionsFactory: EventsSubscriptionsRegistry, ) {} @@ -24,9 +23,7 @@ export class UserRegistrationWasStartedEventHandler implements OnApplicationBoot async onApplicationBootstrap() { this.eventsSubscription = this.eventsSubscriptionsFactory .subscription('WhenUserRegistrationWasStartedThenRequestEmailConfirmation_Automation_v1') - .onEvent('UserRegistrationWasStarted', (event) => - this.onUserRegistrationWasStarted(event), - ) + .onEvent('UserRegistrationWasStarted', this.onUserRegistrationWasStarted.bind(this)) .build(); await this.eventsSubscription.start(); } @@ -35,7 +32,10 @@ export class UserRegistrationWasStartedEventHandler implements OnApplicationBoot await this.eventsSubscription.stop(); } - async onUserRegistrationWasStarted(event: ApplicationEvent) { + async onUserRegistrationWasStarted( + context: PrismaTransactionContext, + event: ApplicationEvent, + ) { const command = this.commandFactory.applicationCommand((idGenerator) => ({ class: RequestEmailConfirmationApplicationCommand, ...requestEmailConfirmationCommand({ @@ -46,6 +46,6 @@ export class UserRegistrationWasStartedEventHandler implements OnApplicationBoot metadata: { correlationId: event.metadata.correlationId, causationId: event.id }, })); - await this.commandBus.execute(command); + await context.executeCommand(command); } } diff --git a/packages/api/src/module/read/course-progress/course-progress.read-module.ts b/packages/api/src/module/read/course-progress/course-progress.read-module.ts index e08d400c..7b881cfb 100644 --- a/packages/api/src/module/read/course-progress/course-progress.read-module.ts +++ b/packages/api/src/module/read/course-progress/course-progress.read-module.ts @@ -1,14 +1,12 @@ -import {Module, OnApplicationBootstrap, OnModuleDestroy} from '@nestjs/common'; +import { Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-url-was-generated.domain-event'; import { ApplicationEvent } from '@/module/application-command-events'; import { TaskWasCompleted } from '@/module/events/task-was-completed.domain-event'; import { TaskWasUncompleted } from '@/module/events/task-was-uncompleted-event.domain-event'; -import { - EventsSubscription, - PrismaTransactionManager, -} from '@/write/shared/application/events-subscription/events-subscription'; +import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription'; import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry'; +import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager'; import { SharedModule } from '@/write/shared/shared.module'; import { CourseProgressRestController } from './course-progress.rest-controller'; @@ -40,46 +38,46 @@ export class CourseProgressReadModule implements OnApplicationBootstrap, OnModul await this.eventsSubscription.stop(); } - private static async onInitialPosition(_position: number, context: { transaction: PrismaTransactionManager }) { - await context.transaction.courseProgress.deleteMany({}); + private static onInitialPosition(context: PrismaTransactionContext, _position: number) { + return context.executeWithTransaction((prisma) => prisma.courseProgress.deleteMany({})); } - private static async onLearningMaterialsUrlWasGenerated( + private static onLearningMaterialsUrlWasGenerated( + context: PrismaTransactionContext, event: ApplicationEvent, - context: { transaction: PrismaTransactionManager }, ) { - await context.transaction.courseProgress.upsert({ - where: { learningMaterialsId: event.data.learningMaterialsId }, - create: { - learningMaterialsId: event.data.learningMaterialsId, - learningMaterialsCompletedTasks: 0, - courseUserId: event.data.courseUserId, - }, - update: { courseUserId: event.data.courseUserId }, - }); + return context.executeWithTransaction((prisma) => + prisma.courseProgress.upsert({ + where: { learningMaterialsId: event.data.learningMaterialsId }, + create: { + learningMaterialsId: event.data.learningMaterialsId, + learningMaterialsCompletedTasks: 0, + courseUserId: event.data.courseUserId, + }, + update: { courseUserId: event.data.courseUserId }, + }), + ); } - private static async onTaskWasCompleted( - event: ApplicationEvent, - context: { transaction: PrismaTransactionManager }, - ) { - await context.transaction.courseProgress.upsert( - CourseProgressReadModule.courseProgressStateUpdate({ - learningMaterialsId: event.data.learningMaterialsId, - completedTasks: 'increment', - }), + private static onTaskWasCompleted(context: PrismaTransactionContext, event: ApplicationEvent) { + return context.executeWithTransaction((prisma) => + prisma.courseProgress.upsert( + CourseProgressReadModule.courseProgressStateUpdate({ + learningMaterialsId: event.data.learningMaterialsId, + completedTasks: 'increment', + }), + ), ); } - private static async onTaskWasUncompleted( - event: ApplicationEvent, - context: { transaction: PrismaTransactionManager }, - ) { - await context.transaction.courseProgress.upsert( - CourseProgressReadModule.courseProgressStateUpdate({ - learningMaterialsId: event.data.learningMaterialsId, - completedTasks: 'decrement', - }), + private static onTaskWasUncompleted(context: PrismaTransactionContext, event: ApplicationEvent) { + return context.executeWithTransaction((prisma) => + prisma.courseProgress.upsert( + CourseProgressReadModule.courseProgressStateUpdate({ + learningMaterialsId: event.data.learningMaterialsId, + completedTasks: 'decrement', + }), + ), ); } diff --git a/packages/api/src/module/read/learning-materials/learning-materials.read-module.ts b/packages/api/src/module/read/learning-materials/learning-materials.read-module.ts index e096dea9..c79facd5 100644 --- a/packages/api/src/module/read/learning-materials/learning-materials.read-module.ts +++ b/packages/api/src/module/read/learning-materials/learning-materials.read-module.ts @@ -1,12 +1,10 @@ -import {Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; +import { Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-url-was-generated.domain-event'; import { ApplicationEvent } from '@/module/application-command-events'; -import { - EventsSubscription, - PrismaTransactionManager, -} from '@/write/shared/application/events-subscription/events-subscription'; +import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription'; import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry'; +import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager'; import { SharedModule } from '@/write/shared/shared.module'; import { LearningMaterialsRestController } from './learning-materials.rest-controller'; @@ -23,10 +21,10 @@ export class LearningMaterialsReadModule implements OnApplicationBootstrap, OnMo async onApplicationBootstrap() { this.eventsSubscription = this.eventsSubscriptionsFactory .subscription('LearningMaterials_ReadModel_v1') - .onInitialPosition(this.onInitialPosition) + .onInitialPosition(LearningMaterialsReadModule.onInitialPosition) .onEvent( 'LearningMaterialsUrlWasGenerated', - this.onLearningMaterialsUrlWasGenerated, + LearningMaterialsReadModule.onLearningMaterialsUrlWasGenerated, ) .build(); await this.eventsSubscription.start(); @@ -36,20 +34,22 @@ export class LearningMaterialsReadModule implements OnApplicationBootstrap, OnMo await this.eventsSubscription.stop(); } - async onInitialPosition(_position: number, context: { transaction: PrismaTransactionManager }) { - await context.transaction.learningMaterials.deleteMany({}); + private static onInitialPosition(context: PrismaTransactionContext, _position: number) { + return context.executeWithTransaction((prisma) => prisma.learningMaterials.deleteMany({})); } - async onLearningMaterialsUrlWasGenerated( + private static onLearningMaterialsUrlWasGenerated( + context: PrismaTransactionContext, event: ApplicationEvent, - context: { transaction: PrismaTransactionManager }, ) { - await context.transaction.learningMaterials.create({ - data: { - id: event.data.learningMaterialsId, - courseUserId: event.data.courseUserId, - url: event.data.materialsUrl, - }, - }); + return context.executeWithTransaction((prisma) => + prisma.learningMaterials.create({ + data: { + id: event.data.learningMaterialsId, + courseUserId: event.data.courseUserId, + url: event.data.materialsUrl, + }, + }), + ); } } diff --git a/packages/api/src/module/write/email-confirmation/application/approve-email-confirmation-application-command-handler.ts b/packages/api/src/module/write/email-confirmation/application/approve-email-confirmation-application-command-handler.ts index e8139450..a41cfc2a 100644 --- a/packages/api/src/module/write/email-confirmation/application/approve-email-confirmation-application-command-handler.ts +++ b/packages/api/src/module/write/email-confirmation/application/approve-email-confirmation-application-command-handler.ts @@ -23,7 +23,7 @@ export class ApproveEmailConfirmationApplicationCommandHandler await this.applicationService.execute( eventStream, - { causationId: command.id, correlationId: command.metadata.correlationId }, + command, approveEmailConfirmation(command), ); } diff --git a/packages/api/src/module/write/email-confirmation/application/request-email-confirmation-application-command-handler.ts b/packages/api/src/module/write/email-confirmation/application/request-email-confirmation-application-command-handler.ts index be58fd71..92f14715 100644 --- a/packages/api/src/module/write/email-confirmation/application/request-email-confirmation-application-command-handler.ts +++ b/packages/api/src/module/write/email-confirmation/application/request-email-confirmation-application-command-handler.ts @@ -23,7 +23,7 @@ export class RequestEmailConfirmationApplicationCommandHandler await this.applicationService.execute( eventStream, - { causationId: command.id, correlationId: command.metadata.correlationId }, + command, requestEmailConfirmation(command), ); } diff --git a/packages/api/src/module/write/learning-materials-tasks/application/complete-task.command-handler.ts b/packages/api/src/module/write/learning-materials-tasks/application/complete-task.command-handler.ts index 3bf718c2..5be5ec15 100644 --- a/packages/api/src/module/write/learning-materials-tasks/application/complete-task.command-handler.ts +++ b/packages/api/src/module/write/learning-materials-tasks/application/complete-task.command-handler.ts @@ -18,13 +18,8 @@ export class CompleteTaskCommandHandler implements ICommandHandler { const eventStream = EventStreamName.from('LearningMaterialsTasks', command.data.learningMaterialsId); - await this.applicationService.execute( - eventStream, - { - causationId: command.id, - correlationId: command.metadata.correlationId, - }, - (pastEvents) => completeTask(pastEvents, command), + await this.applicationService.execute(eventStream, command, (pastEvents) => + completeTask(pastEvents, command), ); } } diff --git a/packages/api/src/module/write/learning-materials-url/application/generate-learning-materials-url.command-handler.ts b/packages/api/src/module/write/learning-materials-url/application/generate-learning-materials-url.command-handler.ts index b31f23a8..6c59d12b 100644 --- a/packages/api/src/module/write/learning-materials-url/application/generate-learning-materials-url.command-handler.ts +++ b/packages/api/src/module/write/learning-materials-url/application/generate-learning-materials-url.command-handler.ts @@ -28,10 +28,8 @@ export class GenerateLearningMaterialsUrlCommandHandler const eventStream = EventStreamName.from('LearningMaterialsUrl', command.data.courseUserId); - await this.applicationService.execute( - eventStream, - { causationId: command.id, correlationId: command.metadata.correlationId }, - (pastEvents) => generateLearningMaterialsUrl(pastEvents, command, learningMaterials.url, learningMaterials.id), + await this.applicationService.execute(eventStream, command, (pastEvents) => + generateLearningMaterialsUrl(pastEvents, command, learningMaterials.url, learningMaterials.id), ); } } diff --git a/packages/api/src/module/write/shared/application/application-service.ts b/packages/api/src/module/write/shared/application/application-service.ts index 92a500e0..827cd7d6 100644 --- a/packages/api/src/module/write/shared/application/application-service.ts +++ b/packages/api/src/module/write/shared/application/application-service.ts @@ -1,12 +1,10 @@ -import { ApplicationEvent } from '@/module/application-command-events'; +import { ApplicationCommand, ApplicationEvent } from '@/module/application-command-events'; import { DomainEvent } from '@/module/domain.event'; import { EventStreamName } from './event-stream-name.value-object'; export const APPLICATION_SERVICE = Symbol('APPLICATION_SERVICE'); -export type ApplicationExecutionContext = { correlationId: string; causationId?: string }; - export type DomainLogic = ( pastEvents: EventType[], ) => EventType[] | Promise; @@ -15,7 +13,7 @@ export type EventStream = export interface ApplicationService { execute( streamName: EventStreamName, - context: ApplicationExecutionContext, + command: ApplicationCommand, domainLogic: DomainLogic, ): Promise; } diff --git a/packages/api/src/module/write/shared/application/event-repository.ts b/packages/api/src/module/write/shared/application/event-repository.ts index 1876afb6..125c9e88 100644 --- a/packages/api/src/module/write/shared/application/event-repository.ts +++ b/packages/api/src/module/write/shared/application/event-repository.ts @@ -13,7 +13,6 @@ export type StorableEvent< > = Omit, 'globalOrder' | 'streamVersion' | 'streamName'>; export type ReadAllFilter = { streamCategory?: string; eventTypes?: string[]; fromGlobalPosition?: number }; - export interface EventRepository { read(streamName: EventStreamName): Promise; diff --git a/packages/api/src/module/write/shared/application/events-subscription/events-subscription-builder.ts b/packages/api/src/module/write/shared/application/events-subscription/events-subscription-builder.ts index ca3d1efc..75b12ce1 100644 --- a/packages/api/src/module/write/shared/application/events-subscription/events-subscription-builder.ts +++ b/packages/api/src/module/write/shared/application/events-subscription/events-subscription-builder.ts @@ -12,6 +12,7 @@ import { SubscriptionId, SubscriptionOptions, } from '@/write/shared/application/events-subscription/events-subscription'; +import { PrismaTransactionManagerFactory } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory'; export interface NeedsEventOrPositionHandlers { onInitialPosition(handle: OnPositionFn): MoreEventHandlersOrBuild; @@ -35,6 +36,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv private readonly options: SubscriptionOptions, private readonly positionHandlers: PositionHandler[] = [], private readonly eventHandlers: ApplicationEventHandler[] = [], + private readonly transactionManagerFactory: PrismaTransactionManagerFactory, ) {} onInitialPosition(handle: OnPositionFn): MoreEventHandlersOrBuild { @@ -51,6 +53,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv this.options, [...this.positionHandlers, handlerToRegister], this.eventHandlers, + this.transactionManagerFactory, ); } @@ -71,6 +74,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv this.options, this.positionHandlers, [...this.eventHandlers, handlerToRegister], + this.transactionManagerFactory, ); } @@ -85,6 +89,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv this.prismaService, this.eventRepository, this.eventEmitter, + this.transactionManagerFactory, ); } } diff --git a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.fixture.spec.ts b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.fixture.spec.ts index 196326fa..c32d6c42 100644 --- a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.fixture.spec.ts +++ b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.fixture.spec.ts @@ -11,7 +11,7 @@ import { SharedModule } from '@/write/shared/shared.module'; import { eventEmitterRootModule } from '../../../../../event-emitter.root-module'; import { ApplicationEventBus } from '../application.event-bus'; -import { SubscriptionOptions } from './events-subscription'; +import { OnEventFn, OnPositionFn, SubscriptionOptions } from './events-subscription'; export async function initTestEventsSubscription() { const app = await initWriteTestModule({ @@ -101,9 +101,9 @@ class EventsSubscriptionConcurrencyTestFixtureImpl implements EventsSubscription export async function initEventsSubscriptionConcurrencyTestFixture(options: SubscriptionOptions['queue']) { const fixtureBase = await initTestEventsSubscription(); - const onInitialPosition = jest.fn(); - const onSampleDomainEvent = jest.fn(); - const onAnotherSampleDomainEvent = jest.fn(); + const onInitialPosition: jest.MockedFunction = jest.fn(); + const onSampleDomainEvent: jest.MockedFunction> = jest.fn(); + const onAnotherSampleDomainEvent: jest.MockedFunction> = jest.fn(); const sut = fixtureBase.eventsSubscriptions .subscription(fixtureBase.randomUuid(), { queue: options }) diff --git a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.spec.ts b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.spec.ts index 649c5ee7..b9b64d6f 100644 --- a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.spec.ts +++ b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.spec.ts @@ -13,7 +13,11 @@ import { } from '@/shared/test-utils'; import { using } from '@/shared/using'; import { EventStreamName } from '@/write/shared/application/event-stream-name.value-object'; -import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription'; +import { + EventsSubscription, + OnEventFn, + OnPositionFn, +} from '@/write/shared/application/events-subscription/events-subscription'; import { initEventsSubscriptionConcurrencyTestFixture, @@ -24,9 +28,9 @@ describe('Events subscription', () => { let sut: AsyncReturnType; let subscription: EventsSubscription; - const onInitialPosition = jest.fn(); - const onSampleDomainEvent = jest.fn(); - const onAnotherSampleDomainEvent = jest.fn(); + const onInitialPosition: jest.MockedFunction = jest.fn(); + const onSampleDomainEvent: jest.MockedFunction> = jest.fn(); + const onAnotherSampleDomainEvent: jest.MockedFunction> = jest.fn(); beforeEach(async () => { onInitialPosition.mockReset(); @@ -170,7 +174,7 @@ describe('Events subscription', () => { let lastEventValue: string | undefined; - onAnotherSampleDomainEvent.mockImplementation((e) => { + onAnotherSampleDomainEvent.mockImplementation((_context, e) => { lastEventValue = e.data.value1; }); @@ -284,10 +288,10 @@ describe('Events subscription concurrency tests', () => { const processedEvents: ApplicationEvent[] = []; - onSampleDomainEvent.mockImplementation((event) => { + onSampleDomainEvent.mockImplementation((_context, event) => { processedEvents.push(event); }); - onAnotherSampleDomainEvent.mockImplementation((event) => { + onAnotherSampleDomainEvent.mockImplementation((_context, event) => { processedEvents.push(event); }); diff --git a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.ts b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.ts index ee0f7286..aa205cef 100644 --- a/packages/api/src/module/write/shared/application/events-subscription/events-subscription.ts +++ b/packages/api/src/module/write/shared/application/events-subscription/events-subscription.ts @@ -1,13 +1,17 @@ /* eslint-disable no-await-in-loop */ import { Logger } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; -import { PrismaClient } from '@prisma/client'; import { retry, RetryConfig, wait } from 'ts-retry-promise'; import { ApplicationEvent } from '@/module/application-command-events'; import { DomainEvent } from '@/module/domain.event'; import { PrismaService } from '@/prisma/prisma.service'; import { EventRepository } from '@/write/shared/application/event-repository'; +import { + PrismaTransactionClient, + PrismaTransactionContext, +} from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager'; +import { PrismaTransactionManagerFactory } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory'; import { OrderedEventQueue } from './ordered-event-queue'; @@ -27,11 +31,9 @@ export type SubscriptionOptions = { retry?: SubscriptionRetriesConfig; }; -export type PrismaTransactionManager = Omit; - export type OnEventFn = ( + context: PrismaTransactionContext, event: ApplicationEvent, - context: { transaction: PrismaTransactionManager }, ) => Promise | void; export type ApplicationEventHandler = { @@ -39,10 +41,7 @@ export type ApplicationEventHandler = { readonly onEvent: OnEventFn; }; -export type OnPositionFn = ( - position: number, - context: { transaction: PrismaTransactionManager }, -) => Promise | void; +export type OnPositionFn = (context: PrismaTransactionContext, position: number) => Promise | void; export type PositionHandler = { readonly position: number; @@ -78,6 +77,7 @@ export class EventsSubscription { private readonly prismaService: PrismaService, private readonly eventRepository: EventRepository, private readonly eventEmitter: EventEmitter2, + private readonly transactionManagerFactory: PrismaTransactionManagerFactory, ) {} /** @@ -193,10 +193,14 @@ export class EventsSubscription { this.eventsRetryCount.delete(event.id); try { - // TODO add transaction - await this.processSubscriptionPositionChange(event, this.prismaService); - await this.processEvent(event, this.prismaService); - await this.moveCurrentPosition(event.globalOrder, this.prismaService); + const transactionManager = this.transactionManagerFactory.create(); + + await this.processSubscriptionPositionChange(event, transactionManager); + await this.processEvent(event, transactionManager); + transactionManager.executeWithTransaction((prismaClient) => + this.moveCurrentPosition(event.globalOrder, prismaClient), + ); + await transactionManager.executeTransaction(); } catch (e) { await this.stop(); this.logger.warn( @@ -222,24 +226,24 @@ export class EventsSubscription { await wait(this.configuration.options.queue.waitingTimeOnRetry); } - private async processEvent(event: ApplicationEvent, transaction: PrismaTransactionManager) { + private async processEvent(event: ApplicationEvent, context: PrismaTransactionContext) { await Promise.all( this.configuration.eventHandlers .filter((handler) => handler.eventType === event.type) - .map((handler) => handler.onEvent(event, { transaction })), + .map((handler) => handler.onEvent(context, event)), ); } - private async processSubscriptionPositionChange(event: ApplicationEvent, transaction: PrismaTransactionManager) { + private async processSubscriptionPositionChange(event: ApplicationEvent, context: PrismaTransactionContext) { await Promise.all( this.configuration.positionHandlers .filter((handler) => handler.position === event.globalOrder) - .map((handler) => handler.onPosition(event.globalOrder, { transaction })), + .map((handler) => handler.onPosition(context, event.globalOrder)), ); } - private async moveCurrentPosition(position: number, transaction: PrismaTransactionManager) { - await transaction.eventsSubscription.upsert({ + private moveCurrentPosition(position: number, prismaClient: PrismaTransactionClient) { + return prismaClient.eventsSubscription.upsert({ where: { id: this.subscriptionId, }, diff --git a/packages/api/src/module/write/shared/application/events-subscription/events-subscriptions-registry.ts b/packages/api/src/module/write/shared/application/events-subscription/events-subscriptions-registry.ts index 6d37dac7..aa976428 100644 --- a/packages/api/src/module/write/shared/application/events-subscription/events-subscriptions-registry.ts +++ b/packages/api/src/module/write/shared/application/events-subscription/events-subscriptions-registry.ts @@ -12,6 +12,7 @@ import { NeedsEventOrPositionHandlers, SubscriptionBuilder, } from '@/write/shared/application/events-subscription/events-subscription-builder'; +import { PrismaTransactionManagerFactory } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory'; export interface CanCreateSubscription { subscription(id: SubscriptionId, config?: Partial): NeedsEventOrPositionHandlers; @@ -23,6 +24,7 @@ export class EventsSubscriptionsRegistry implements CanCreateSubscription { private readonly prismaService: PrismaService, @Inject(EVENT_REPOSITORY) private readonly eventRepository: EventRepository, private readonly eventEmitter: EventEmitter2, + private readonly transactionManagerFactory: PrismaTransactionManagerFactory, ) {} subscription(id: SubscriptionId, options?: Partial): NeedsEventOrPositionHandlers { @@ -46,6 +48,7 @@ export class EventsSubscriptionsRegistry implements CanCreateSubscription { startConfig, [], [], + this.transactionManagerFactory, ); } } diff --git a/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory.ts b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory.ts new file mode 100644 index 00000000..a7283524 --- /dev/null +++ b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory.ts @@ -0,0 +1,16 @@ +import { Injectable } from '@nestjs/common'; +import { CommandBus } from '@nestjs/cqrs'; +import { v4 as uuid } from 'uuid'; + +import { PrismaService } from '@/shared/prisma/prisma.service'; + +import { PrismaTransactionManager } from './prisma-transaction-manager'; + +@Injectable() +export class PrismaTransactionManagerFactory { + constructor(private readonly commandBus: CommandBus, private readonly prismaService: PrismaService) {} + + create() { + return new PrismaTransactionManager(this.commandBus, this.prismaService, uuid()); + } +} diff --git a/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.fixture.spec.ts b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.fixture.spec.ts new file mode 100644 index 00000000..57ad8dbb --- /dev/null +++ b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.fixture.spec.ts @@ -0,0 +1,39 @@ +import { Test } from '@nestjs/testing'; +import { AsyncReturnType } from 'type-fest'; + +import { PrismaService } from '@/shared/prisma/prisma.service'; +import { cleanupDatabase, getCommandBusSpy } from '@/shared/test-utils'; + +import { eventEmitterRootModule } from '../../../../../event-emitter.root-module'; +import { SharedModule } from '../../shared.module'; +import { PrismaTransactionManagerFactory } from './prisma-transaction-manager-factory'; + +export type PrismaTransactionManagerTestFixture = AsyncReturnType; + +export async function initPrismaTransactionManagerTestModule() { + const app = await Test.createTestingModule({ + imports: [eventEmitterRootModule, SharedModule], + }).compile(); + + await app.init(); + + const prismaService = app.get(PrismaService); + const factory = app.get(PrismaTransactionManagerFactory); + const execute = getCommandBusSpy(app); + const sut = factory.create(); + const sut1 = factory.create(); + + async function close() { + await app.close(); + await cleanupDatabase(prismaService); + await prismaService.$disconnect(); + } + + return { + sut, + sut1, + close, + prismaService, + mocks: { commandBus: { execute } }, + }; +} diff --git a/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.spec.ts b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.spec.ts new file mode 100644 index 00000000..5717dd01 --- /dev/null +++ b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.spec.ts @@ -0,0 +1,114 @@ +import { sampleDatabaseEvent } from '@/shared/test-utils'; + +import { + initPrismaTransactionManagerTestModule, + PrismaTransactionManagerTestFixture, +} from './prisma-transaction-manager.fixture.spec'; + +describe('PrismaTransactionManager', () => { + let fixture: PrismaTransactionManagerTestFixture; + + beforeEach(async () => { + fixture = await initPrismaTransactionManagerTestModule(); + }); + + afterEach(async () => { + await fixture.close(); + }); + + it('Given some registered callbacks when executeWithTransaction then all registered changes should be persisted', async () => { + // Given + const { sut, prismaService } = fixture; + const sampleEvent0 = sampleDatabaseEvent(); + const sampleEvent1 = sampleDatabaseEvent(); + const sampleEvent2 = sampleDatabaseEvent(); + const sampleEvent3 = sampleDatabaseEvent(); + const expected = [sampleEvent0, sampleEvent1, sampleEvent2, sampleEvent3].map((event, i) => ({ + ...event, + globalOrder: i + 1, + })); + + sut.executeWithTransaction((prisma) => { + return prisma.event.createMany({ data: [sampleEvent0, sampleEvent1] }); + }); + + sut.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent2 }); + }); + + sut.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent3 }); + }); + + // When + await sut.executeTransaction(); + + // Then + const events = await prismaService.event.findMany({}); + + expect(events).toStrictEqual(expected); + }); + + it('Given some registered callbacks when executeWithTransaction throws error then none of registered changes should be persisted', async () => { + // Given + const { sut, prismaService } = fixture; + const sampleEvent0 = sampleDatabaseEvent(); + const sampleEvent1 = sampleDatabaseEvent(); + const sampleEvent2 = sampleDatabaseEvent(); + const sampleEvent3 = sampleDatabaseEvent(); + const expected: Event[] = []; + + sut.executeWithTransaction((prisma) => { + return prisma.event.createMany({ data: [sampleEvent0, sampleEvent1] }); + }); + + sut.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent2 }); + }); + + sut.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent3 }); + }); + + sut.executeWithTransaction((prisma) => { + return prisma.$queryRaw`RAISE 'throw inside transaction'`; + }); + + // When-Then + await expect(sut.executeTransaction()).rejects.toThrow(); + + const events = await prismaService.event.findMany({}); + + expect(events).toStrictEqual(expected); + }); + + it(`Given two transactions [sut0, sut1] running concurrently with some registered callbacks + When sut0 throws error and sut1 don't + Then for sut0 none of registered changes should be persisted + Then for sut1 all registered changes should be persisted`, async () => { + // Given + const { sut: sut0, sut1, prismaService } = fixture; + const sampleEvent0 = sampleDatabaseEvent(); + const sampleEvent1 = sampleDatabaseEvent(); + const expected = [expect.objectContaining(sampleEvent1)]; + + sut0.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent0 }); + }); + + sut0.executeWithTransaction((prisma) => { + return prisma.$queryRaw`RAISE 'throw inside transaction'`; + }); + + sut1.executeWithTransaction((prisma) => { + return prisma.event.create({ data: sampleEvent1 }); + }); + + // When-Then + await Promise.all([sut1.executeTransaction(), expect(sut0.executeTransaction()).rejects.toThrow()]); + + const events = await prismaService.event.findMany({}); + + expect(events).toEqual(expected); + }); +}); diff --git a/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.ts b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.ts new file mode 100644 index 00000000..d5c1e386 --- /dev/null +++ b/packages/api/src/module/write/shared/application/prisma-transaction-manager/prisma-transaction-manager.ts @@ -0,0 +1,105 @@ +import { CommandBus } from '@nestjs/cqrs'; +import { PrismaClient, PrismaPromise } from '@prisma/client'; + +import { ApplicationCommand, DefaultCommandMetadata } from '@/module/application-command-events'; +import { PrismaService } from '@/shared/prisma/prisma.service'; + +type PrismaTransactionClient = Omit; + +type WithTransactionCallback = PrismaPromise> = ( + prismaClient: PrismaTransactionClient, +) => TPrismaPromise; +type AfterTransactionCallback = (prismaClient: PrismaTransactionClient) => void | Promise; + +interface PrismaTransactionContext { + executeWithTransaction>( + cb: WithTransactionCallback, + ): void; + + executeCommand(command: TApplicationCommand): Promise; + + executeAfterTransaction(cb: AfterTransactionCallback): void; + + trxId: string; +} + +type PrismaTransactionManagerState = 'created' | 'executing' | 'executed'; + +class PrismaTransactionManager implements PrismaTransactionContext { + private readonly afterTransactionCallbacks: AfterTransactionCallback[] = []; + + private readonly withTransactionCallbacks: WithTransactionCallback[] = []; + + private internalState: PrismaTransactionManagerState = 'created'; + + public get state() { + return this.internalState; + } + + static CONTEXT_TOKEN = Symbol('PRISMA_TRANSACTION_MANAGER_CONTEXT_TOKEN'); + + constructor( + private readonly commandBus: CommandBus, + private readonly prismaService: PrismaService, + public readonly trxId: string, + ) {} + + async executeTransaction(): Promise { + this.validateInternalState(); + this.internalState = 'executing'; + + try { + const prismaPromises = this.withTransactionCallbacks.map((cb) => cb(this.prismaService)); + + await this.prismaService.$transaction(prismaPromises); + await Promise.all(this.afterTransactionCallbacks.map((cb) => cb(this.prismaService))); + } finally { + this.internalState = 'executed'; + this.cleanup(); + } + } + + cleanup() { + this.afterTransactionCallbacks.length = 0; + this.withTransactionCallbacks.length = 0; + } + + executeWithTransaction>( + cb: WithTransactionCallback, + ): void { + this.validateInternalState(); + this.withTransactionCallbacks.push(cb); + } + + executeCommand< + TApplicationCommand extends ApplicationCommand, DefaultCommandMetadata>, + >(command: TApplicationCommand): Promise { + this.validateInternalState(); + this.injectContextIntoCommand(command); + + return this.commandBus.execute(command); + } + + executeAfterTransaction(cb: AfterTransactionCallback): void { + this.validateInternalState(); + this.afterTransactionCallbacks.push(cb); + } + + private validateInternalState() { + if (this.internalState !== 'created') { + throw new Error(`Invalid operation on PrismaTransactionManager(${this.trxId}) state(${this.internalState})`); + } + } + + private injectContextIntoCommand(command: ApplicationCommand) { + // eslint-disable-next-line no-param-reassign + (command as unknown as Record)[PrismaTransactionManager.CONTEXT_TOKEN] = this; + } + + static getContextFromCommand(command: ApplicationCommand): PrismaTransactionContext | undefined { + return (command as unknown as Record)[PrismaTransactionManager.CONTEXT_TOKEN]; + } +} + +export { PrismaTransactionManager }; +export type { PrismaTransactionClient, PrismaTransactionContext }; diff --git a/packages/api/src/module/write/shared/infrastructure/application-service/event-application-service.ts b/packages/api/src/module/write/shared/infrastructure/application-service/event-application-service.ts index 3f7d9250..8a8585c0 100644 --- a/packages/api/src/module/write/shared/infrastructure/application-service/event-application-service.ts +++ b/packages/api/src/module/write/shared/infrastructure/application-service/event-application-service.ts @@ -1,15 +1,22 @@ import { Inject } from '@nestjs/common'; -import { ApplicationEvent } from '@/module/application-command-events'; +import { ApplicationCommand, ApplicationEvent, DefaultCommandMetadata } from '@/module/application-command-events'; import { DomainEvent } from '@/module/domain.event'; import { ApplicationEventBus } from '../../application/application.event-bus'; -import { ApplicationExecutionContext, ApplicationService, DomainLogic } from '../../application/application-service'; +import { ApplicationService, DomainLogic } from '../../application/application-service'; import { EVENT_REPOSITORY, EventRepository, StorableEvent } from '../../application/event-repository'; import { EventStreamName } from '../../application/event-stream-name.value-object'; import { EventStreamVersion } from '../../application/event-stream-version'; import { ID_GENERATOR, IdGenerator } from '../../application/id-generator'; +import { + PrismaTransactionContext, + PrismaTransactionManager, +} from '../../application/prisma-transaction-manager/prisma-transaction-manager'; import { TIME_PROVIDER, TimeProvider } from '../../application/time-provider.port'; +import { PrismaTransactionEventRepository } from '../transaction-event-repository/prisma-transaction-event-repository'; + +type ApplicationMetadataContext = { correlationId: string; causationId?: string }; export class EventApplicationService implements ApplicationService { constructor( @@ -17,13 +24,59 @@ export class EventApplicationService implements ApplicationService { @Inject(TIME_PROVIDER) private readonly timeProvider: TimeProvider, @Inject(ID_GENERATOR) private readonly idGenerator: IdGenerator, private readonly eventBus: ApplicationEventBus, + private readonly transactionEventRepository: PrismaTransactionEventRepository, ) {} - async execute( + execute>>( + streamName: EventStreamName, + command: ApplicationCommand, + domainLogic: DomainLogic, + ): Promise { + const context = PrismaTransactionManager.getContextFromCommand(command); + const metadata: ApplicationMetadataContext = { + correlationId: command.id, + causationId: command.metadata?.causationId, + }; + + return context === undefined + ? this.executeIndependently(streamName, metadata, domainLogic) + : this.executeWithTransaction(streamName, metadata, domainLogic, context); + } + + async executeWithTransaction( + streamName: EventStreamName, + metadata: ApplicationMetadataContext, + domainLogic: DomainLogic, + context: PrismaTransactionContext, + ): Promise { + const { eventsToStore, streamVersion } = await this.executeDomainLogic(streamName, metadata, domainLogic); + + const eventsToPublishIds = this.transactionEventRepository.write(streamName, eventsToStore, streamVersion, context); + + return context.executeAfterTransaction(async (prisma) => { + const eventsToPublish = await this.transactionEventRepository.readAll(eventsToPublishIds, prisma); + + await this.eventBus.publishAll(eventsToPublish); + }); + } + + async executeIndependently( streamName: EventStreamName, - context: ApplicationExecutionContext, + metadata: ApplicationMetadataContext, domainLogic: DomainLogic, ): Promise { + const { eventsToStore, streamVersion } = await this.executeDomainLogic(streamName, metadata, domainLogic); + + const eventsToPublish = await this.eventRepository.write(streamName, eventsToStore, streamVersion); + + await this.eventBus.publishAll(eventsToPublish); + } + + async executeDomainLogic( + streamName: EventStreamName, + metadata: ApplicationMetadataContext, + domainLogic: DomainLogic, + ) { const eventStream = await this.eventRepository.read(streamName); const streamVersion = EventApplicationService.streamVersion(eventStream); @@ -33,19 +86,17 @@ export class EventApplicationService implements ApplicationService { }), ); - const eventsToStore: StorableEvent[] = resultDomainEvents.map((e, index) => ({ + const eventsToStore: StorableEvent[] = resultDomainEvents.map((e, index) => ({ data: e.data, type: e.type, id: this.idGenerator.generate(), occurredAt: this.timeProvider.currentTime(), - metadata: { correlationId: context.correlationId, causationId: context.causationId }, + metadata, streamVersion: streamVersion + 1 + index, streamName, })); - const eventsToPublish = await this.eventRepository.write(streamName, eventsToStore, streamVersion); - - await this.eventBus.publishAll(eventsToPublish); + return { eventsToStore, streamVersion }; } private static streamVersion(eventStream?: ApplicationEvent[]): EventStreamVersion { diff --git a/packages/api/src/module/write/shared/infrastructure/event-repository/prisma-event-repository.service.ts b/packages/api/src/module/write/shared/infrastructure/event-repository/prisma-event-repository.service.ts index 5bb5429e..b40a1e92 100644 --- a/packages/api/src/module/write/shared/infrastructure/event-repository/prisma-event-repository.service.ts +++ b/packages/api/src/module/write/shared/infrastructure/event-repository/prisma-event-repository.service.ts @@ -9,8 +9,9 @@ import { EventStreamName } from '../../application/event-stream-name.value-objec import { EventStreamVersion } from '../../application/event-stream-version'; import { TIME_PROVIDER, TimeProvider } from '../../application/time-provider.port'; -const parseData = (value: unknown): Record => JSON.parse(typeof value === 'string' ? value : '{}'); -const parseMetadata = (value: unknown): DefaultCommandMetadata & Record => { +export const parseStoredEventData = (value: unknown): Record => + JSON.parse(typeof value === 'string' ? value : '{}'); +export const parseStoredEventMetadata = (value: unknown): DefaultCommandMetadata & Record => { const metadata = JSON.parse(typeof value === 'string' ? value : '{}'); const hasCorrectCorrelationId = 'correlationId' in metadata && typeof metadata.correlationId === 'string'; @@ -40,8 +41,8 @@ export class PrismaEventRepository implements EventRepository { type: e.type, id: e.id, occurredAt: e.occurredAt, - data: parseData(e.data), - metadata: parseMetadata(e.metadata), + data: parseStoredEventData(e.data), + metadata: parseStoredEventMetadata(e.metadata), streamVersion: e.streamVersion, streamName: EventStreamName.from(e.streamCategory, e.streamId), globalOrder: e.globalOrder, @@ -88,8 +89,8 @@ export class PrismaEventRepository implements EventRepository { type: e.type, id: e.id, occurredAt: e.occurredAt, - data: parseData(e.data), - metadata: parseMetadata(e.metadata), + data: parseStoredEventData(e.data), + metadata: parseStoredEventMetadata(e.metadata), streamVersion: e.streamVersion, streamName: EventStreamName.from(e.streamCategory, e.streamId), globalOrder: e.globalOrder, @@ -115,8 +116,8 @@ export class PrismaEventRepository implements EventRepository { type: e.type, id: e.id, occurredAt: e.occurredAt, - data: parseData(e.data), - metadata: parseMetadata(e.metadata), + data: parseStoredEventData(e.data), + metadata: parseStoredEventMetadata(e.metadata), streamVersion: e.streamVersion, streamName: EventStreamName.from(e.streamCategory, e.streamId), globalOrder: e.globalOrder, diff --git a/packages/api/src/module/write/shared/infrastructure/transaction-event-repository/prisma-transaction-event-repository.ts b/packages/api/src/module/write/shared/infrastructure/transaction-event-repository/prisma-transaction-event-repository.ts new file mode 100644 index 00000000..e6ddc25b --- /dev/null +++ b/packages/api/src/module/write/shared/infrastructure/transaction-event-repository/prisma-transaction-event-repository.ts @@ -0,0 +1,68 @@ +import { ApplicationEvent } from '@/module/application-command-events'; + +import { StorableEvent } from '../../application/event-repository'; +import { EventStreamName } from '../../application/event-stream-name.value-object'; +import { EventStreamVersion } from '../../application/event-stream-version'; +import { + PrismaTransactionClient, + PrismaTransactionContext, +} from '../../application/prisma-transaction-manager/prisma-transaction-manager'; +import { parseStoredEventData, parseStoredEventMetadata } from '../event-repository/prisma-event-repository.service'; + +type EventId = string; + +export class PrismaTransactionEventRepository { + write( + streamName: EventStreamName, + events: StorableEvent[], + expectedStreamVersion: EventStreamVersion, + context: PrismaTransactionContext, + ): EventId[] { + const databaseEvents = events.map((e, index) => ({ + id: e.id, + type: e.type, + streamId: streamName.streamId, + streamCategory: streamName.streamCategory, + streamVersion: expectedStreamVersion + 1 + index, + occurredAt: e.occurredAt, + data: JSON.stringify(e.data), + metadata: JSON.stringify({ ...e.metadata, trxId: context.trxId }), + })); + + context.executeWithTransaction((prisma) => { + return prisma.eventLock.createMany({ + data: databaseEvents.map((x) => ({ + streamId: x.streamId, + streamVersion: x.streamVersion, + })), + }); + }); + context.executeWithTransaction((prisma) => { + return prisma.event.createMany({ data: databaseEvents }); + }); + + return databaseEvents.map((x) => x.id); + } + + async readAll(eventsToPublishIds: EventId[], prisma: PrismaTransactionClient): Promise { + const storedEvents = await prisma.event.findMany({ + where: { + id: { in: eventsToPublishIds }, + }, + orderBy: { + globalOrder: 'asc', + }, + }); + + return storedEvents.map((e) => ({ + type: e.type, + id: e.id, + occurredAt: e.occurredAt, + data: parseStoredEventData(e.data), + metadata: parseStoredEventMetadata(e.metadata), + streamVersion: e.streamVersion, + streamName: EventStreamName.from(e.streamCategory, e.streamId), + globalOrder: e.globalOrder, + })); + } +} diff --git a/packages/api/src/module/write/shared/shared.module.ts b/packages/api/src/module/write/shared/shared.module.ts index a1b8e0e7..b9c85e6e 100644 --- a/packages/api/src/module/write/shared/shared.module.ts +++ b/packages/api/src/module/write/shared/shared.module.ts @@ -13,12 +13,14 @@ import { ApplicationCommandFactory } from './application/application-command.fac import { APPLICATION_SERVICE } from './application/application-service'; import { EVENT_REPOSITORY } from './application/event-repository'; import { ID_GENERATOR } from './application/id-generator'; +import { PrismaTransactionManagerFactory } from './application/prisma-transaction-manager/prisma-transaction-manager-factory'; import { TIME_PROVIDER } from './application/time-provider.port'; import { EventApplicationService } from './infrastructure/application-service/event-application-service'; import { InMemoryEventRepository } from './infrastructure/event-repository/in-memory-event-repository'; import { PrismaEventRepository } from './infrastructure/event-repository/prisma-event-repository.service'; import { UuidGenerator } from './infrastructure/id-generator/uuid-generator'; import { SystemTimeProvider } from './infrastructure/time-provider/system-time-provider'; +import { PrismaTransactionEventRepository } from './infrastructure/transaction-event-repository/prisma-transaction-event-repository'; const imports: ModuleMetadata['imports'] = [CqrsModule, PrismaModule]; @@ -52,7 +54,9 @@ const imports: ModuleMetadata['imports'] = [CqrsModule, PrismaModule]; useClass: CryptoPasswordEncoder, }, ApplicationCommandFactory, + PrismaTransactionManagerFactory, EventsSubscriptionsRegistry, + PrismaTransactionEventRepository, ], exports: [ CqrsModule, diff --git a/packages/api/src/module/write/user-registration/application/register-user.command-handler.ts b/packages/api/src/module/write/user-registration/application/register-user.command-handler.ts index 9862a5f5..ba5d761c 100644 --- a/packages/api/src/module/write/user-registration/application/register-user.command-handler.ts +++ b/packages/api/src/module/write/user-registration/application/register-user.command-handler.ts @@ -33,7 +33,7 @@ export class RegisterUserCommandHandler implements ICommandHandler( EventStreamName.from('UserRegistration', data.userId), - { correlationId: command.metadata.correlationId, causationId: command.metadata.causationId }, + command, (pastEvents) => registerUser(pastEvents, command, hashedPassword), ); } catch (ex) { diff --git a/packages/api/src/shared/test-utils.ts b/packages/api/src/shared/test-utils.ts index 76d67e6b..3b35004c 100644 --- a/packages/api/src/shared/test-utils.ts +++ b/packages/api/src/shared/test-utils.ts @@ -30,6 +30,20 @@ import { setupMiddlewares } from '../app.middlewares'; import { AppModule } from '../app.module'; import { eventEmitterRootModule } from '../event-emitter.root-module'; +function sampleCommandFactory(command: Partial = {}): ApplicationCommand { + return { + id: uuid(), + data: {}, + type: 'SampleCommand', + issuedAt: new Date(), + metadata: { + correlationId: uuid(), + causationId: uuid(), + }, + ...command, + }; +} + export async function cleanupDatabase(prismaService: PrismaService) { await Promise.all( Object.values(prismaService).map((table) => (table?.deleteMany ? table.deleteMany() : Promise.resolve())), @@ -58,13 +72,9 @@ export async function initReadTestModule(config?: { modules?: ModuleMetadata['im } async function eventsOccurred(eventStreamName: EventStreamName, events: DomainEvent[]) { - const sourceCommandId = uuid(); + const sampleCommand = sampleCommandFactory(); - await applicationService.execute( - eventStreamName, - { correlationId: sourceCommandId, causationId: sourceCommandId }, - () => events, - ); + await applicationService.execute(eventStreamName, sampleCommand, () => events); } async function eventOccurred(eventStreamName: EventStreamName, event: DomainEvent): Promise { @@ -205,23 +215,15 @@ export async function initWriteTestModule(config?: { } async function eventOccurred(eventStreamName: EventStreamName, event: DomainEvent) { - const sourceCommandId = uuid(); + const sampleCommand = sampleCommandFactory(); - await applicationService.execute( - eventStreamName, - { correlationId: sourceCommandId, causationId: sourceCommandId }, - () => [event], - ); + await applicationService.execute(eventStreamName, sampleCommand, () => [event]); } async function eventsOccurred(eventStreamName: EventStreamName, events: DomainEvent[]) { - const sourceCommandId = uuid(); + const sampleCommand = sampleCommandFactory(); - await applicationService.execute( - eventStreamName, - { correlationId: sourceCommandId, causationId: sourceCommandId }, - () => events, - ); + await applicationService.execute(eventStreamName, sampleCommand, () => events); } async function expectSubscriptionPosition(expectation: { subscriptionId: SubscriptionId; position: number }) { @@ -369,6 +371,28 @@ export function sampleApplicationEvent(event: Partial): Applic }; } +type StorableStream = { + streamId: string; + streamCategory: string; + streamVersion: number; +}; + +export function sampleDatabaseEvent( + stream: Partial & { streamVersion: number } = { streamVersion: 0 }, + event: Partial = {}, +) { + return { + id: event?.id ?? uuid(), + type: event?.type ?? uuid(), + streamId: stream?.streamId ?? uuid(), + streamCategory: stream?.streamCategory ?? uuid(), + streamVersion: stream.streamVersion, + occurredAt: event?.occurredAt ?? new Date(), + data: JSON.stringify(event?.data ?? {}), + metadata: JSON.stringify(event.metadata ?? {}), + }; +} + export const commandBusNoFailWithoutHandler: Partial = { register: jest.fn(), execute: jest.fn(),