Skip to content

Commit

Permalink
refactor(server): event emits
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasm91 committed Jun 27, 2024
1 parent 922430d commit 58b5ea6
Show file tree
Hide file tree
Showing 25 changed files with 219 additions and 171 deletions.
36 changes: 26 additions & 10 deletions server/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BullModule } from '@nestjs/bullmq';
import { Module, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE, ModuleRef } from '@nestjs/core';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
import { TypeOrmModule } from '@nestjs/typeorm';
Expand All @@ -12,15 +12,15 @@ import { bullConfig, bullQueues, clsConfig, immichAppConfig } from 'src/config';
import { controllers } from 'src/controllers';
import { databaseConfig } from 'src/database.config';
import { entities } from 'src/entities';
import { IEventRepository } from 'src/interfaces/event.interface';
import { AuthGuard } from 'src/middleware/auth.guard';
import { ErrorInterceptor } from 'src/middleware/error.interceptor';
import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor';
import { HttpExceptionFilter } from 'src/middleware/http-exception.filter';
import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
import { repositories } from 'src/repositories';
import { services } from 'src/services';
import { ApiService } from 'src/services/api.service';
import { MicroservicesService } from 'src/services/microservices.service';
import { setupEventHandlers } from 'src/utils/events';
import { otelConfig } from 'src/utils/instrumentation';

const common = [...services, ...repositories];
Expand Down Expand Up @@ -50,23 +50,39 @@ const imports = [
controllers: [...controllers],
providers: [...common, ...middleware],
})
export class ApiModule implements OnModuleInit {
constructor(private service: ApiService) {}
export class ApiModule implements OnModuleInit, OnModuleDestroy {
constructor(
private moduleRef: ModuleRef,
@Inject(IEventRepository) private eventRepository: IEventRepository,
) {}

async onModuleInit() {
await this.service.init();
setupEventHandlers(this.moduleRef);
await this.eventRepository.emit('onBootstrapEvent', 'api');
}

async onModuleDestroy() {
await this.eventRepository.emit('onShutdownEvent');
}
}

@Module({
imports: [...imports],
providers: [...common, SchedulerRegistry],
})
export class MicroservicesModule implements OnModuleInit {
constructor(private service: MicroservicesService) {}
export class MicroservicesModule implements OnModuleInit, OnModuleDestroy {
constructor(
private moduleRef: ModuleRef,
@Inject(IEventRepository) private eventRepository: IEventRepository,
) {}

async onModuleInit() {
await this.service.init();
setupEventHandlers(this.moduleRef);
await this.eventRepository.emit('onBootstrapEvent', 'microservices');
}

async onModuleDestroy() {
await this.eventRepository.emit('onShutdownEvent');
}
}

Expand Down
8 changes: 6 additions & 2 deletions server/src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces';
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { ServerAsyncEvent, ServerEvent } from 'src/interfaces/event.interface';
import { ServerEvent } from 'src/interfaces/event.interface';
import { Metadata } from 'src/middleware/auth.guard';
import { setUnion } from 'src/utils/set';

// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
Expand Down Expand Up @@ -129,9 +130,12 @@ export interface GenerateSqlQueries {
/** Decorator to enable versioning/tracking of generated Sql */
export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options);

export const OnServerEvent = (event: ServerEvent | ServerAsyncEvent, options?: OnEventOptions) =>
export const OnServerEvent = (event: ServerEvent, options?: OnEventOptions) =>
OnEvent(event, { suppressErrors: false, ...options });

export type HandlerOptions = { priority: number };
export const EventHandlerOptions = (options: HandlerOptions) => SetMetadata(Metadata.EVENT_HANDLER_OPTIONS, options);

type LifecycleRelease = 'NEXT_RELEASE' | string;
type LifecycleMetadata = {
addedAt?: LifecycleRelease;
Expand Down
32 changes: 20 additions & 12 deletions server/src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-i

export const IEventRepository = 'IEventRepository';

type MaybePromise<T> = Promise<T> | T;

const noop = () => {};
const dummyHandlers = {
onBootstrapEvent: noop as (app: 'api' | 'microservices') => MaybePromise<void>,
onShutdownEvent: noop as () => MaybePromise<void>,
onConfigUpdateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
onConfigValidateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
};

export type SystemConfigUpdate = { newConfig: SystemConfig; oldConfig: SystemConfig };
export type EventHandlers = typeof dummyHandlers;
export type EmitEvent = keyof EventHandlers;
export type EmitEventHandler<T extends EmitEvent> = (...args: Parameters<EventHandlers[T]>) => MaybePromise<void>;
export const events = Object.keys(dummyHandlers) as EmitEvent[];
export type OnEvents = Partial<EventHandlers>;

export enum ClientEvent {
UPLOAD_SUCCESS = 'on_upload_success',
USER_DELETE = 'on_user_delete',
Expand Down Expand Up @@ -44,15 +61,10 @@ export interface ServerEventMap {
[ServerEvent.WEBSOCKET_CONNECT]: { userId: string };
}

export enum ServerAsyncEvent {
CONFIG_VALIDATE = 'config.validate',
}

export interface ServerAsyncEventMap {
[ServerAsyncEvent.CONFIG_VALIDATE]: { newConfig: SystemConfig; oldConfig: SystemConfig };
}

export interface IEventRepository {
on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void;
emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void>;

/**
* Send to connected clients for a specific user
*/
Expand All @@ -65,8 +77,4 @@ export interface IEventRepository {
* Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent`
*/
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]): boolean;
/**
* Notify and wait for responses from listeners in this process. Subscribe to an event with `@OnServerEvent`
*/
serverSendAsync<E extends keyof ServerAsyncEventMap>(event: E, data: ServerAsyncEventMap[E]): Promise<any>;
}
1 change: 1 addition & 0 deletions server/src/middleware/auth.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export enum Metadata {
ADMIN_ROUTE = 'admin_route',
SHARED_ROUTE = 'shared_route',
API_KEY_SECURITY = 'api_key',
EVENT_HANDLER_OPTIONS = 'event_handler_options',
}

type AdminRoute = { admin?: true };
Expand Down
21 changes: 16 additions & 5 deletions server/src/repositories/event.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import {
import { Server, Socket } from 'socket.io';
import {
ClientEventMap,
EmitEvent,
EmitEventHandler,
IEventRepository,
ServerAsyncEventMap,
ServerEvent,
ServerEventMap,
} from 'src/interfaces/event.interface';
Expand All @@ -27,6 +28,8 @@ import { Instrumentation } from 'src/utils/instrumentation';
})
@Injectable()
export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, IEventRepository {
private emitHandlers: Partial<Record<EmitEvent, EmitEventHandler<EmitEvent>[]>> = {};

@WebSocketServer()
private server?: Server;

Expand Down Expand Up @@ -71,6 +74,18 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
await client.leave(client.nsp.name);
}

on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void {
const handlers: EmitEventHandler<EmitEvent>[] = this.emitHandlers[event] || [];
this.emitHandlers[event] = [...handlers, handler];
}

async emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void> {
const handlers = this.emitHandlers[event] || [];
for (const handler of handlers) {
await handler(...args);
}
}

clientSend<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]) {
this.server?.to(userId).emit(event, data);
}
Expand All @@ -84,8 +99,4 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
this.server?.serverSideEmit(event, data);
return this.eventEmitter.emit(event, data);
}

serverSendAsync<E extends keyof ServerAsyncEventMap, R = any[]>(event: E, data: ServerAsyncEventMap[E]): Promise<R> {
return this.eventEmitter.emitAsync(event, data) as Promise<R>;
}
}
17 changes: 0 additions & 17 deletions server/src/services/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ import { join } from 'node:path';
import { ONE_HOUR, WEB_ROOT } from 'src/constants';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { AuthService } from 'src/services/auth.service';
import { DatabaseService } from 'src/services/database.service';
import { JobService } from 'src/services/job.service';
import { ServerInfoService } from 'src/services/server-info.service';
import { SharedLinkService } from 'src/services/shared-link.service';
import { StorageService } from 'src/services/storage.service';
import { SystemConfigService } from 'src/services/system-config.service';
import { VersionService } from 'src/services/version.service';
import { OpenGraphTags } from 'src/utils/misc';

Expand Down Expand Up @@ -39,12 +35,8 @@ const render = (index: string, meta: OpenGraphTags) => {
export class ApiService {
constructor(
private authService: AuthService,
private configService: SystemConfigService,
private jobService: JobService,
private serverService: ServerInfoService,
private sharedLinkService: SharedLinkService,
private storageService: StorageService,
private databaseService: DatabaseService,
private versionService: VersionService,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
Expand All @@ -61,15 +53,6 @@ export class ApiService {
await this.jobService.handleNightlyJobs();
}

async init() {
await this.databaseService.init();
await this.configService.init();
this.storageService.init();
await this.serverService.init();
await this.versionService.init();
this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`);
}

ssr(excludePaths: string[]) {
let index = '';
try {
Expand Down
Loading

0 comments on commit 58b5ea6

Please sign in to comment.