Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ UEX_ITEMS_SYNC_ENABLED=true
UEX_LOCATIONS_SYNC_ENABLED=true
UEX_API_BASE_URL=https://uexcorp.space/api/2.0
UEX_TIMEOUT_MS=60000
# Milliseconds to wait between UEX API requests (default: 500).
# Increase if UEX rate-limits this application.
UEX_REQUEST_DELAY_MS=500
UEX_BATCH_SIZE=100
UEX_CONCURRENT_CATEGORIES=3
UEX_RETRY_ATTEMPTS=3
Expand Down
112 changes: 112 additions & 0 deletions backend/src/common/services/advisory-lock.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConflictException } from '@nestjs/common';
import { getDataSourceToken } from '@nestjs/typeorm';
import { AdvisoryLockService } from './advisory-lock.service';

describe('AdvisoryLockService', () => {
let service: AdvisoryLockService;
let mockQr: {
connect: jest.Mock;
query: jest.Mock;
release: jest.Mock;
};
let mockDataSource: { createQueryRunner: jest.Mock };

beforeEach(async () => {
mockQr = {
connect: jest.fn().mockResolvedValue(undefined),
query: jest.fn(),
release: jest.fn().mockResolvedValue(undefined),
};

mockDataSource = {
createQueryRunner: jest.fn().mockReturnValue(mockQr),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
AdvisoryLockService,
{
provide: getDataSourceToken(),
useValue: mockDataSource,
},
],
}).compile();

service = module.get<AdvisoryLockService>(AdvisoryLockService);
});

afterEach(() => {
jest.clearAllMocks();
});

describe('withLock', () => {
it('acquires lock, runs fn, releases lock and QueryRunner', async () => {
mockQr.query
.mockResolvedValueOnce([{ acquired: true }]) // pg_try_advisory_lock
.mockResolvedValueOnce([{}]); // pg_advisory_unlock

const fn = jest.fn().mockResolvedValue('result');

const result = await service.withLock('test-lock', fn);

expect(result).toBe('result');
expect(mockDataSource.createQueryRunner).toHaveBeenCalledTimes(1);

// Both queries go through the same QueryRunner
expect(mockQr.connect).toHaveBeenCalledTimes(1);
expect(mockQr.query).toHaveBeenNthCalledWith(
1,
`SELECT pg_try_advisory_lock(hashtext($1)) AS acquired`,
['test-lock'],
);
expect(fn).toHaveBeenCalledTimes(1);
expect(mockQr.query).toHaveBeenNthCalledWith(
2,
`SELECT pg_advisory_unlock(hashtext($1))`,
['test-lock'],
);
expect(mockQr.release).toHaveBeenCalledTimes(1);
});

it('throws ConflictException when lock is not acquired; fn not called; runner still released', async () => {
mockQr.query.mockResolvedValueOnce([{ acquired: false }]);

const fn = jest.fn();

await expect(service.withLock('test-lock', fn)).rejects.toThrow(
ConflictException,
);

expect(fn).not.toHaveBeenCalled();
// unlock query should NOT have been called
expect(mockQr.query).not.toHaveBeenCalledWith(
`SELECT pg_advisory_unlock(hashtext($1))`,
expect.anything(),
);
// runner must still be released
expect(mockQr.release).toHaveBeenCalled();
});

it('releases lock and runner even when fn throws', async () => {
mockQr.query
.mockResolvedValueOnce([{ acquired: true }]) // pg_try_advisory_lock
.mockResolvedValueOnce([{}]); // pg_advisory_unlock

const error = new Error('fn blew up');
const fn = jest.fn().mockRejectedValue(error);

await expect(service.withLock('test-lock', fn)).rejects.toThrow(
'fn blew up',
);

// unlock still called
expect(mockQr.query).toHaveBeenCalledWith(
`SELECT pg_advisory_unlock(hashtext($1))`,
['test-lock'],
);
// runner still released
expect(mockQr.release).toHaveBeenCalledTimes(1);
});
});
});
31 changes: 31 additions & 0 deletions backend/src/common/services/advisory-lock.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Injectable, ConflictException } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';

@Injectable()
export class AdvisoryLockService {
constructor(
@InjectDataSource()
private readonly dataSource: DataSource,
) {}

async withLock<T>(lockKey: string, fn: () => Promise<T>): Promise<T> {
const qr = this.dataSource.createQueryRunner();
await qr.connect();
try {
const [{ acquired }] = await qr.query(
`SELECT pg_try_advisory_lock(hashtext($1)) AS acquired`,
[lockKey],
);
if (!acquired)
throw new ConflictException(`Lock '${lockKey}' already held`);
try {
return await fn();
} finally {
await qr.query(`SELECT pg_advisory_unlock(hashtext($1))`, [lockKey]);
}
} finally {
await qr.release();
}
}
}
1 change: 1 addition & 0 deletions backend/src/common/services/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { AdvisoryLockService } from './advisory-lock.service';
3 changes: 2 additions & 1 deletion backend/src/modules/catalog-etl/catalog-etl.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { CatalogEtlService } from './catalog-etl.service';
import { CatalogEtlController } from './catalog-etl.controller';
import { EtlRun } from './entities/etl-run.entity';
import { EtlWarning } from './entities/etl-warning.entity';
import { AdvisoryLockService } from '../../common/services';

@Module({
imports: [TypeOrmModule.forFeature([EtlRun, EtlWarning])],
controllers: [CatalogEtlController],
providers: [CatalogEtlService],
providers: [CatalogEtlService, AdvisoryLockService],
exports: [CatalogEtlService],
})
export class CatalogEtlModule {}
40 changes: 15 additions & 25 deletions backend/src/modules/catalog-etl/catalog-etl.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { ConflictException } from '@nestjs/common';
import { getLoggerToken } from 'nestjs-pino';
import { DataSource } from 'typeorm';
import { CatalogEtlService } from './catalog-etl.service';
import { EtlRun } from './entities/etl-run.entity';
import { EtlWarning } from './entities/etl-warning.entity';
import { EtlStep } from './interfaces/etl-step.interface';
import { AdvisoryLockService } from '../../common/services';

function buildMockRun(overrides: Partial<EtlRun> = {}): EtlRun {
const run = new EtlRun();
Expand All @@ -25,7 +25,7 @@ describe('CatalogEtlService', () => {
let service: CatalogEtlService;
let mockEtlRunRepository: Record<string, jest.Mock>;
let mockEtlWarningRepository: Record<string, jest.Mock>;
let mockDataSource: { query: jest.Mock };
let mockAdvisoryLockService: { withLock: jest.Mock };

beforeEach(async () => {
mockEtlRunRepository = {
Expand All @@ -41,8 +41,10 @@ describe('CatalogEtlService', () => {
find: jest.fn(),
};

mockDataSource = {
query: jest.fn(),
mockAdvisoryLockService = {
withLock: jest
.fn()
.mockImplementation((_key: string, fn: () => Promise<unknown>) => fn()),
};

const module: TestingModule = await Test.createTestingModule({
Expand All @@ -66,8 +68,8 @@ describe('CatalogEtlService', () => {
useValue: mockEtlWarningRepository,
},
{
provide: DataSource,
useValue: mockDataSource,
provide: AdvisoryLockService,
useValue: mockAdvisoryLockService,
},
],
}).compile();
Expand Down Expand Up @@ -101,11 +103,6 @@ describe('CatalogEtlService', () => {
step2,
];

// Lock acquired
mockDataSource.query
.mockResolvedValueOnce([{ acquired: true }]) // pg_try_advisory_lock
.mockResolvedValueOnce([{}]); // pg_advisory_unlock

const initialRun = buildMockRun({ stepsTotal: 2 });
mockEtlRunRepository.create.mockReturnValue(initialRun);
mockEtlRunRepository.save
Expand All @@ -119,6 +116,10 @@ describe('CatalogEtlService', () => {

const result = await service.runEtl();

expect(mockAdvisoryLockService.withLock).toHaveBeenCalledWith(
'catalog_etl',
expect.any(Function),
);
expect(result.status).toBe('completed');
expect(result.stepsSucceeded).toBe(2);
expect(result.stepsFailed).toBe(0);
Expand All @@ -141,10 +142,6 @@ describe('CatalogEtlService', () => {
failingStep,
];

mockDataSource.query
.mockResolvedValueOnce([{ acquired: true }])
.mockResolvedValueOnce([{}]);

const initialRun = buildMockRun({ stepsTotal: 2 });
mockEtlRunRepository.create.mockReturnValue(initialRun);
mockEtlRunRepository.save
Expand Down Expand Up @@ -186,10 +183,6 @@ describe('CatalogEtlService', () => {
failingStep2,
];

mockDataSource.query
.mockResolvedValueOnce([{ acquired: true }])
.mockResolvedValueOnce([{}]);

const initialRun = buildMockRun({ stepsTotal: 2 });
mockEtlRunRepository.create.mockReturnValue(initialRun);
mockEtlRunRepository.save
Expand All @@ -211,10 +204,6 @@ describe('CatalogEtlService', () => {

it('no steps registered → status no_steps', async () => {
// ETL_STEPS remains [] (default)
mockDataSource.query
.mockResolvedValueOnce([{ acquired: true }])
.mockResolvedValueOnce([{}]);

const initialRun = buildMockRun({ stepsTotal: 0 });
mockEtlRunRepository.create.mockReturnValue(initialRun);
mockEtlRunRepository.save
Expand All @@ -232,12 +221,13 @@ describe('CatalogEtlService', () => {
});

it('concurrent lock rejection → throws 409 ConflictException', async () => {
mockDataSource.query.mockResolvedValue([{ acquired: false }]);
mockAdvisoryLockService.withLock.mockRejectedValueOnce(
new ConflictException("Lock 'catalog_etl' already held"),
);

const runPromise = service.runEtl();

await expect(runPromise).rejects.toThrow(ConflictException);
await expect(runPromise).rejects.toThrow('ETL run already in progress');

// Repository should never be touched when lock not acquired
expect(mockEtlRunRepository.create).not.toHaveBeenCalled();
Expand Down
Loading
Loading