Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ COOKIE_SECRET="89YHGASP9DYFHWEOIH098yasd8rtwepuddrw902"

PROCESS_ST_CHECKLIST_URL="https://app.process.st/workflows/CodersCamp-Test-Checklist-kTrxJoZgP-9IabhRbohIrw/run-link"
EVENT_REPOSITORY="prisma"
SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10;
SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100;
SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10
SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100
2 changes: 2 additions & 0 deletions packages/api/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ COOKIE_SECRET="786Tgiu3564Rti8oh89YG6re6"

PROCESS_ST_CHECKLIST_URL="https://app.process.st/workflows/CodersCamp-Test-Checklist-kTrxJoZgP-9IabhRbohIrw/run-link"
EVENT_REPOSITORY="prisma"
SUBSCRIPTION_QUEUE_MAX_RETRY_COUNT=10
SUBSCRIPTION_QUEUE_WAITING_TIME_ON_RETRY_MS=100
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-ur
import { ApplicationEvent } from '@/module/application-command-events';
import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry';
import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager';

// fixme: example for automation, implement later
@Injectable()
Expand All @@ -17,7 +18,7 @@ export class LearningMaterialsUrlWasGeneratedEventHandler implements OnApplicati
.subscription('SendEmailWhenLearningMaterialsUrlWasGenerated_Automation_v1')
.onEvent<LearningMaterialsUrlWasGenerated>(
'LearningMaterialsUrlWasGenerated',
this.onLearningMaterialsUrlWasGenerated,
LearningMaterialsUrlWasGeneratedEventHandler.onLearningMaterialsUrlWasGenerated,
)
.build();
await this.eventsSubscription.start();
Expand All @@ -27,5 +28,8 @@ export class LearningMaterialsUrlWasGeneratedEventHandler implements OnApplicati
await this.eventsSubscription.stop();
}

onLearningMaterialsUrlWasGenerated(_event: ApplicationEvent<LearningMaterialsUrlWasGenerated>) {}
private static onLearningMaterialsUrlWasGenerated(
_context: PrismaTransactionContext,
_event: ApplicationEvent<LearningMaterialsUrlWasGenerated>,
) {}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';

import { ApplicationEvent } from '@/module/application-command-events';
import {
Expand All @@ -10,23 +9,21 @@ import { UserRegistrationWasStarted } from '@/module/events/user-registration-wa
import { ApplicationCommandFactory } from '@/write/shared/application/application-command.factory';
import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry';
import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager';

@Injectable()
export class UserRegistrationWasStartedEventHandler implements OnApplicationBootstrap, OnModuleDestroy {
private eventsSubscription: EventsSubscription;

constructor(
private readonly commandBus: CommandBus,
private readonly commandFactory: ApplicationCommandFactory,
private readonly eventsSubscriptionsFactory: EventsSubscriptionsRegistry,
) {}

async onApplicationBootstrap() {
this.eventsSubscription = this.eventsSubscriptionsFactory
.subscription('WhenUserRegistrationWasStartedThenRequestEmailConfirmation_Automation_v1')
.onEvent<UserRegistrationWasStarted>('UserRegistrationWasStarted', (event) =>
this.onUserRegistrationWasStarted(event),
)
.onEvent<UserRegistrationWasStarted>('UserRegistrationWasStarted', this.onUserRegistrationWasStarted.bind(this))
.build();
await this.eventsSubscription.start();
}
Expand All @@ -35,7 +32,10 @@ export class UserRegistrationWasStartedEventHandler implements OnApplicationBoot
await this.eventsSubscription.stop();
}

async onUserRegistrationWasStarted(event: ApplicationEvent<UserRegistrationWasStarted>) {
async onUserRegistrationWasStarted(
context: PrismaTransactionContext,
event: ApplicationEvent<UserRegistrationWasStarted>,
) {
const command = this.commandFactory.applicationCommand((idGenerator) => ({
class: RequestEmailConfirmationApplicationCommand,
...requestEmailConfirmationCommand({
Expand All @@ -46,6 +46,6 @@ export class UserRegistrationWasStartedEventHandler implements OnApplicationBoot
metadata: { correlationId: event.metadata.correlationId, causationId: event.id },
}));

await this.commandBus.execute(command);
await context.executeCommand(command);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import {Module, OnApplicationBootstrap, OnModuleDestroy} from '@nestjs/common';
import { Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';

import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-url-was-generated.domain-event';
import { ApplicationEvent } from '@/module/application-command-events';
import { TaskWasCompleted } from '@/module/events/task-was-completed.domain-event';
import { TaskWasUncompleted } from '@/module/events/task-was-uncompleted-event.domain-event';
import {
EventsSubscription,
PrismaTransactionManager,
} from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry';
import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager';
import { SharedModule } from '@/write/shared/shared.module';

import { CourseProgressRestController } from './course-progress.rest-controller';
Expand Down Expand Up @@ -40,46 +38,46 @@ export class CourseProgressReadModule implements OnApplicationBootstrap, OnModul
await this.eventsSubscription.stop();
}

private static async onInitialPosition(_position: number, context: { transaction: PrismaTransactionManager }) {
await context.transaction.courseProgress.deleteMany({});
private static onInitialPosition(context: PrismaTransactionContext, _position: number) {
return context.executeWithTransaction((prisma) => prisma.courseProgress.deleteMany({}));
}

private static async onLearningMaterialsUrlWasGenerated(
private static onLearningMaterialsUrlWasGenerated(
context: PrismaTransactionContext,
event: ApplicationEvent<LearningMaterialsUrlWasGenerated>,
context: { transaction: PrismaTransactionManager },
) {
await context.transaction.courseProgress.upsert({
where: { learningMaterialsId: event.data.learningMaterialsId },
create: {
learningMaterialsId: event.data.learningMaterialsId,
learningMaterialsCompletedTasks: 0,
courseUserId: event.data.courseUserId,
},
update: { courseUserId: event.data.courseUserId },
});
return context.executeWithTransaction((prisma) =>
prisma.courseProgress.upsert({
where: { learningMaterialsId: event.data.learningMaterialsId },
create: {
learningMaterialsId: event.data.learningMaterialsId,
learningMaterialsCompletedTasks: 0,
courseUserId: event.data.courseUserId,
},
update: { courseUserId: event.data.courseUserId },
}),
);
}

private static async onTaskWasCompleted(
event: ApplicationEvent<TaskWasCompleted>,
context: { transaction: PrismaTransactionManager },
) {
await context.transaction.courseProgress.upsert(
CourseProgressReadModule.courseProgressStateUpdate({
learningMaterialsId: event.data.learningMaterialsId,
completedTasks: 'increment',
}),
private static onTaskWasCompleted(context: PrismaTransactionContext, event: ApplicationEvent<TaskWasCompleted>) {
return context.executeWithTransaction((prisma) =>
prisma.courseProgress.upsert(
CourseProgressReadModule.courseProgressStateUpdate({
learningMaterialsId: event.data.learningMaterialsId,
completedTasks: 'increment',
}),
),
);
}

private static async onTaskWasUncompleted(
event: ApplicationEvent<TaskWasUncompleted>,
context: { transaction: PrismaTransactionManager },
) {
await context.transaction.courseProgress.upsert(
CourseProgressReadModule.courseProgressStateUpdate({
learningMaterialsId: event.data.learningMaterialsId,
completedTasks: 'decrement',
}),
private static onTaskWasUncompleted(context: PrismaTransactionContext, event: ApplicationEvent<TaskWasUncompleted>) {
return context.executeWithTransaction((prisma) =>
prisma.courseProgress.upsert(
CourseProgressReadModule.courseProgressStateUpdate({
learningMaterialsId: event.data.learningMaterialsId,
completedTasks: 'decrement',
}),
),
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import {Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';
import { Module, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common';

import { LearningMaterialsUrlWasGenerated } from '@/events/learning-materials-url-was-generated.domain-event';
import { ApplicationEvent } from '@/module/application-command-events';
import {
EventsSubscription,
PrismaTransactionManager,
} from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscription } from '@/write/shared/application/events-subscription/events-subscription';
import { EventsSubscriptionsRegistry } from '@/write/shared/application/events-subscription/events-subscriptions-registry';
import { PrismaTransactionContext } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager';
import { SharedModule } from '@/write/shared/shared.module';

import { LearningMaterialsRestController } from './learning-materials.rest-controller';
Expand All @@ -23,10 +21,10 @@ export class LearningMaterialsReadModule implements OnApplicationBootstrap, OnMo
async onApplicationBootstrap() {
this.eventsSubscription = this.eventsSubscriptionsFactory
.subscription('LearningMaterials_ReadModel_v1')
.onInitialPosition(this.onInitialPosition)
.onInitialPosition(LearningMaterialsReadModule.onInitialPosition)
.onEvent<LearningMaterialsUrlWasGenerated>(
'LearningMaterialsUrlWasGenerated',
this.onLearningMaterialsUrlWasGenerated,
LearningMaterialsReadModule.onLearningMaterialsUrlWasGenerated,
)
.build();
await this.eventsSubscription.start();
Expand All @@ -36,20 +34,22 @@ export class LearningMaterialsReadModule implements OnApplicationBootstrap, OnMo
await this.eventsSubscription.stop();
}

async onInitialPosition(_position: number, context: { transaction: PrismaTransactionManager }) {
await context.transaction.learningMaterials.deleteMany({});
private static onInitialPosition(context: PrismaTransactionContext, _position: number) {
return context.executeWithTransaction((prisma) => prisma.learningMaterials.deleteMany({}));
}

async onLearningMaterialsUrlWasGenerated(
private static onLearningMaterialsUrlWasGenerated(
context: PrismaTransactionContext,
event: ApplicationEvent<LearningMaterialsUrlWasGenerated>,
context: { transaction: PrismaTransactionManager },
) {
await context.transaction.learningMaterials.create({
data: {
id: event.data.learningMaterialsId,
courseUserId: event.data.courseUserId,
url: event.data.materialsUrl,
},
});
return context.executeWithTransaction((prisma) =>
prisma.learningMaterials.create({
data: {
id: event.data.learningMaterialsId,
courseUserId: event.data.courseUserId,
url: event.data.materialsUrl,
},
}),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class ApproveEmailConfirmationApplicationCommandHandler

await this.applicationService.execute<EmailConfirmationDomainEvent>(
eventStream,
{ causationId: command.id, correlationId: command.metadata.correlationId },
command,
approveEmailConfirmation(command),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class RequestEmailConfirmationApplicationCommandHandler

await this.applicationService.execute<EmailConfirmationDomainEvent>(
eventStream,
{ causationId: command.id, correlationId: command.metadata.correlationId },
command,
requestEmailConfirmation(command),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ export class CompleteTaskCommandHandler implements ICommandHandler<CompleteTaskA
async execute(command: CompleteTaskApplicationCommand): Promise<void> {
const eventStream = EventStreamName.from('LearningMaterialsTasks', command.data.learningMaterialsId);

await this.applicationService.execute<TaskWasCompleted>(
eventStream,
{
causationId: command.id,
correlationId: command.metadata.correlationId,
},
(pastEvents) => completeTask(pastEvents, command),
await this.applicationService.execute<TaskWasCompleted>(eventStream, command, (pastEvents) =>
completeTask(pastEvents, command),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ export class GenerateLearningMaterialsUrlCommandHandler

const eventStream = EventStreamName.from('LearningMaterialsUrl', command.data.courseUserId);

await this.applicationService.execute<LearningMaterialsUrlDomainEvent>(
eventStream,
{ causationId: command.id, correlationId: command.metadata.correlationId },
(pastEvents) => generateLearningMaterialsUrl(pastEvents, command, learningMaterials.url, learningMaterials.id),
await this.applicationService.execute<LearningMaterialsUrlDomainEvent>(eventStream, command, (pastEvents) =>
generateLearningMaterialsUrl(pastEvents, command, learningMaterials.url, learningMaterials.id),
);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { ApplicationEvent } from '@/module/application-command-events';
import { ApplicationCommand, ApplicationEvent } from '@/module/application-command-events';
import { DomainEvent } from '@/module/domain.event';

import { EventStreamName } from './event-stream-name.value-object';

export const APPLICATION_SERVICE = Symbol('APPLICATION_SERVICE');

export type ApplicationExecutionContext = { correlationId: string; causationId?: string };

export type DomainLogic<EventType extends DomainEvent> = (
pastEvents: EventType[],
) => EventType[] | Promise<EventType[]>;
Expand All @@ -15,7 +13,7 @@ export type EventStream<EventType extends ApplicationEvent = ApplicationEvent> =
export interface ApplicationService {
execute<DomainEventType extends DomainEvent>(
streamName: EventStreamName,
context: ApplicationExecutionContext,
command: ApplicationCommand,
domainLogic: DomainLogic<DomainEventType>,
): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export type StorableEvent<
> = Omit<ApplicationEvent<DomainEventType, EventMetadata>, 'globalOrder' | 'streamVersion' | 'streamName'>;

export type ReadAllFilter = { streamCategory?: string; eventTypes?: string[]; fromGlobalPosition?: number };

export interface EventRepository {
read(streamName: EventStreamName): Promise<EventStream>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
SubscriptionId,
SubscriptionOptions,
} from '@/write/shared/application/events-subscription/events-subscription';
import { PrismaTransactionManagerFactory } from '@/write/shared/application/prisma-transaction-manager/prisma-transaction-manager-factory';

export interface NeedsEventOrPositionHandlers {
onInitialPosition(handle: OnPositionFn): MoreEventHandlersOrBuild;
Expand All @@ -35,6 +36,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv
private readonly options: SubscriptionOptions,
private readonly positionHandlers: PositionHandler[] = [],
private readonly eventHandlers: ApplicationEventHandler[] = [],
private readonly transactionManagerFactory: PrismaTransactionManagerFactory,
) {}

onInitialPosition(handle: OnPositionFn): MoreEventHandlersOrBuild {
Expand All @@ -51,6 +53,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv
this.options,
[...this.positionHandlers, handlerToRegister],
this.eventHandlers,
this.transactionManagerFactory,
);
}

Expand All @@ -71,6 +74,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv
this.options,
this.positionHandlers,
[...this.eventHandlers, handlerToRegister],
this.transactionManagerFactory,
);
}

Expand All @@ -85,6 +89,7 @@ export class SubscriptionBuilder implements NeedsEventOrPositionHandlers, MoreEv
this.prismaService,
this.eventRepository,
this.eventEmitter,
this.transactionManagerFactory,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { SharedModule } from '@/write/shared/shared.module';

import { eventEmitterRootModule } from '../../../../../event-emitter.root-module';
import { ApplicationEventBus } from '../application.event-bus';
import { SubscriptionOptions } from './events-subscription';
import { OnEventFn, OnPositionFn, SubscriptionOptions } from './events-subscription';

export async function initTestEventsSubscription() {
const app = await initWriteTestModule({
Expand Down Expand Up @@ -101,9 +101,9 @@ class EventsSubscriptionConcurrencyTestFixtureImpl implements EventsSubscription
export async function initEventsSubscriptionConcurrencyTestFixture(options: SubscriptionOptions['queue']) {
const fixtureBase = await initTestEventsSubscription();

const onInitialPosition = jest.fn();
const onSampleDomainEvent = jest.fn();
const onAnotherSampleDomainEvent = jest.fn();
const onInitialPosition: jest.MockedFunction<OnPositionFn> = jest.fn();
const onSampleDomainEvent: jest.MockedFunction<OnEventFn<SampleDomainEvent>> = jest.fn();
const onAnotherSampleDomainEvent: jest.MockedFunction<OnEventFn<AnotherSampleDomainEvent>> = jest.fn();

const sut = fixtureBase.eventsSubscriptions
.subscription(fixtureBase.randomUuid(), { queue: options })
Expand Down
Loading