Skip to content

Commit

Permalink
use nestjs events to notify about config updates
Browse files Browse the repository at this point in the history
  • Loading branch information
danieldietzler committed Mar 15, 2024
1 parent abedfd1 commit 8bfd030
Show file tree
Hide file tree
Showing 18 changed files with 99 additions and 68 deletions.
31 changes: 31 additions & 0 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"@nestjs/common": "^10.2.2",
"@nestjs/config": "^3.0.0",
"@nestjs/core": "^10.2.2",
"@nestjs/event-emitter": "^2.0.4",
"@nestjs/platform-express": "^10.2.2",
"@nestjs/platform-socket.io": "^10.2.2",
"@nestjs/schedule": "^4.0.0",
Expand Down
4 changes: 2 additions & 2 deletions server/src/domain/domain.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import { TrashService } from './trash';
import { UserService } from './user';

const providers: Provider[] = [
APIKeyService,
ActivityService,
AlbumService,
APIKeyService,
AssetService,
AuditService,
AuthService,
Expand All @@ -39,8 +39,8 @@ const providers: Provider[] = [
LibraryService,
MediaService,
MetadataService,
PersonService,
PartnerService,
PersonService,
SearchService,
ServerInfoService,
SharedLinkService,
Expand Down
4 changes: 2 additions & 2 deletions server/src/domain/job/job.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
JobItem,
JobStatus,
} from '../repositories';
import { FeatureFlag, SystemConfigCore } from '../system-config/system-config.core';
import { FeatureFlag } from '../system-config/system-config.core';
import { JobCommand, JobName, QueueName } from './job.constants';
import { JobService } from './job.service';

Expand Down Expand Up @@ -230,7 +230,7 @@ describe(JobService.name, () => {
it('should subscribe to config changes', async () => {
await sut.init(makeMockHandlers(JobStatus.FAILED));

SystemConfigCore.create(newSystemConfigRepositoryMock(false)).config$.next({
sut.onConfigUpdate({
job: {
[QueueName.BACKGROUND_TASK]: { concurrency: 10 },
[QueueName.SMART_SEARCH]: { concurrency: 10 },
Expand Down
24 changes: 13 additions & 11 deletions server/src/domain/job/job.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AssetType } from '@app/infra/entities';
import { AssetType, SystemConfig } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { mapAsset } from '../asset';
import {
ClientEvent,
Expand Down Expand Up @@ -165,18 +166,19 @@ export class JobService {
}
});
}
}

this.configCore.config$.subscribe((config) => {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
concurrency = config.job[queueName].concurrency;
}
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
this.jobRepository.setConcurrency(queueName, concurrency);
@OnEvent('config_update')
onConfigUpdate({ job }: SystemConfig) {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
concurrency = job[queueName].concurrency;
}
});
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
this.jobRepository.setConcurrency(queueName, concurrency);
}
}

private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
Expand Down
3 changes: 1 addition & 2 deletions server/src/domain/library/library.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import {
JobStatus,
StorageEventType,
} from '../repositories';
import { SystemConfigCore } from '../system-config/system-config.core';
import { mapLibrary } from './library.dto';
import { LibraryService } from './library.service';

Expand Down Expand Up @@ -89,7 +88,7 @@ describe(LibraryService.name, () => {
expect(configMock.load).toHaveBeenCalled();
expect(jobMock.addCronJob).toHaveBeenCalled();

SystemConfigCore.create(newSystemConfigRepositoryMock(false)).config$.next({
sut.onConfigUpdate({
library: {
scan: {
enabled: true,
Expand Down
20 changes: 11 additions & 9 deletions server/src/domain/library/library.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AssetType, LibraryEntity, LibraryType } from '@app/infra/entities';
import { AssetType, LibraryEntity, LibraryType, SystemConfig } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { Trie } from 'mnemonist';
import { R_OK } from 'node:constants';
import { EventEmitter } from 'node:events';
Expand Down Expand Up @@ -97,16 +98,17 @@ export class LibraryService extends EventEmitter {
if (this.watchLibraries) {
await this.watchAll();
}
}

this.configCore.config$.subscribe(({ library }) => {
this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled);
@OnEvent('config_update')
onConfigUpdate({ library }: SystemConfig) {
this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled);

if (library.watch.enabled !== this.watchLibraries) {
// Watch configuration changed, update accordingly
this.watchLibraries = library.watch.enabled;
handlePromiseError(this.watchLibraries ? this.watchAll() : this.unwatchAll(), this.logger);
}
});
if (library.watch.enabled !== this.watchLibraries) {
// Watch configuration changed, update accordingly
this.watchLibraries = library.watch.enabled;
handlePromiseError(this.watchLibraries ? this.watchAll() : this.unwatchAll(), this.logger);
}
}

private async watch(id: string): Promise<boolean> {
Expand Down
13 changes: 6 additions & 7 deletions server/src/domain/metadata/metadata.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { AssetEntity, AssetType, ExifEntity } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ExifDateTime, Tags } from 'exiftool-vendored';
import { firstDateTime } from 'exiftool-vendored/dist/FirstDateTime';
import _ from 'lodash';
import { Duration } from 'luxon';
import { constants } from 'node:fs/promises';
import path from 'node:path';
import { Subscription } from 'rxjs';
import { handlePromiseError, usePagination } from '../domain.util';
import { IBaseJob, IEntityJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job';
import {
Expand Down Expand Up @@ -97,7 +97,6 @@ export class MetadataService {
private logger = new ImmichLogger(MetadataService.name);
private storageCore: StorageCore;
private configCore: SystemConfigCore;
private subscription: Subscription | null = null;

constructor(
@Inject(IAlbumRepository) private albumRepository: IAlbumRepository,
Expand Down Expand Up @@ -125,10 +124,6 @@ export class MetadataService {
}

async init() {
if (!this.subscription) {
this.subscription = this.configCore.config$.subscribe(() => handlePromiseError(this.init(), this.logger));
}

const { reverseGeocoding } = await this.configCore.getConfig();
const { enabled } = reverseGeocoding;

Expand All @@ -148,10 +143,14 @@ export class MetadataService {
}

async teardown() {
this.subscription?.unsubscribe();
await this.repository.teardown();
}

@OnEvent('config_update')
onConfigUpdate() {
handlePromiseError(this.init(), this.logger);
}

async handleLivePhotoLinking(job: IEntityJob): Promise<JobStatus> {
const { id } = job;
const [asset] = await this.assetRepository.getByIds([id], { exifInfo: true });
Expand Down
3 changes: 2 additions & 1 deletion server/src/domain/repositories/system-config.repository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SystemConfigEntity } from '@app/infra/entities';
import { SystemConfig, SystemConfigEntity } from '@app/infra/entities';

export const ISystemConfigRepository = 'ISystemConfigRepository';

Expand All @@ -7,5 +7,6 @@ export interface ISystemConfigRepository {
load(): Promise<SystemConfigEntity[]>;
readFile(filename: string): Promise<string>;
saveAll(items: SystemConfigEntity[]): Promise<SystemConfigEntity[]>;
emitUpdate(config: SystemConfig): void;
deleteKeys(keys: string[]): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
} from '@test';
import { when } from 'jest-when';
import { Stats } from 'node:fs';
import { SystemConfigCore } from '../system-config';

describe(StorageTemplateService.name, () => {
let sut: StorageTemplateService;
Expand Down Expand Up @@ -71,7 +70,7 @@ describe(StorageTemplateService.name, () => {
databaseMock,
);

SystemConfigCore.create(configMock).config$.next(defaults);
sut.onConfigUpdate(defaults);
});

describe('handleMigrationSingle', () => {
Expand Down
19 changes: 10 additions & 9 deletions server/src/domain/storage-template/storage-template.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AssetEntity, AssetPathType, AssetType, SystemConfig } from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import handlebar from 'handlebars';
import * as luxon from 'luxon';
import path from 'node:path';
Expand Down Expand Up @@ -75,7 +76,6 @@ export class StorageTemplateService {
) {
this.configCore = SystemConfigCore.create(configRepository);
this.configCore.addValidator((config) => this.validate(config));
this.configCore.config$.subscribe((config) => this.onConfig(config));
this.storageCore = StorageCore.create(
assetRepository,
moveRepository,
Expand All @@ -86,6 +86,15 @@ export class StorageTemplateService {
);
}

@OnEvent('config_update')
onConfigUpdate(config: SystemConfig) {
const template = config.storageTemplate.template;
if (!this._template || template !== this.template.raw) {
this.logger.debug(`Compiling new storage template: ${template}`);
this._template = this.compile(template);
}
}

async handleMigrationSingle({ id }: IEntityJob): Promise<JobStatus> {
const config = await this.configCore.getConfig();
const storageTemplateEnabled = config.storageTemplate.enabled;
Expand Down Expand Up @@ -279,14 +288,6 @@ export class StorageTemplateService {
}
}

private onConfig(config: SystemConfig) {
const template = config.storageTemplate.template;
if (!this._template || template !== this.template.raw) {
this.logger.debug(`Compiling new storage template: ${template}`);
this._template = this.compile(template);
}
}

private compile(template: string) {
return {
raw: template,
Expand Down
10 changes: 3 additions & 7 deletions server/src/domain/system-config/system-config.core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ import {
VideoCodec,
} from '@app/infra/entities';
import { ImmichLogger } from '@app/infra/logger';
import { BadRequestException, ForbiddenException, Injectable } from '@nestjs/common';
import { BadRequestException, ForbiddenException } from '@nestjs/common';
import { CronExpression } from '@nestjs/schedule';
import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';
import { load as loadYaml } from 'js-yaml';
import * as _ from 'lodash';
import { Subject } from 'rxjs';
import { QueueName } from '../job/job.constants';
import { ISystemConfigRepository } from '../repositories';
import { SystemConfigDto } from './dto';
Expand Down Expand Up @@ -164,14 +163,11 @@ export type FeatureFlags = Record<FeatureFlag, boolean>;

let instance: SystemConfigCore | null;

@Injectable()
export class SystemConfigCore {
private logger = new ImmichLogger(SystemConfigCore.name);
private validators: SystemConfigValidator[] = [];
private configCache: SystemConfigEntity<SystemConfigValue>[] | null = null;

public config$ = new Subject<SystemConfig>();

private constructor(private repository: ISystemConfigRepository) {}

static create(repository: ISystemConfigRepository) {
Expand Down Expand Up @@ -327,15 +323,15 @@ export class SystemConfigCore {

const config = await this.getConfig();

this.config$.next(config);
this.repository.emitUpdate(config);

return config;
}

public async refreshConfig() {
const newConfig = await this.getConfig(true);

this.config$.next(newConfig);
this.repository.emitUpdate(newConfig);
}

private async loadFromFile(filepath: string, force = false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,8 @@ describe(SystemConfigService.name, () => {

describe('refreshConfig', () => {
it('should notify the subscribers', async () => {
const changeMock = jest.fn();
const subscription = sut.config$.subscribe(changeMock);

await sut.refreshConfig();

expect(changeMock).toHaveBeenCalledWith(defaults);

subscription.unsubscribe();
expect(configMock.emitUpdate).toHaveBeenCalledWith(defaults);
});
});

Expand Down

0 comments on commit 8bfd030

Please sign in to comment.