Skip to content

Commit bacbc1c

Browse files
committed
✨ feat(triggers): backend-driven update queue with per-trigger concurrency limit
- Add 'queued' status and phase to container update operation model - API handler creates operation as 'queued' immediately on update request - Docker trigger uses pLimit(1) semaphore to serialize concurrent updates - Executor transitions queued→in-progress instead of inserting duplicate - Container list API attaches both queued and in-progress operations - UI recognizes backend 'queued' status for "Queued" display
1 parent 56580bf commit bacbc1c

17 files changed

+190
-34
lines changed

app/api/container-actions.test.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ vi.mock('../configuration', () => ({
5656
getVersion: vi.fn(() => 'test-version'),
5757
}));
5858

59+
vi.mock('../store/update-operation', () => ({
60+
insertOperation: vi.fn((op) => ({ id: op.id || 'op-mock', ...op })),
61+
updateOperation: vi.fn(),
62+
getOperationById: vi.fn(),
63+
getOperationsByContainerName: vi.fn(() => []),
64+
getInProgressOperationByContainerName: vi.fn(),
65+
getInProgressOperationByContainerId: vi.fn(),
66+
getActiveOperationByContainerName: vi.fn(),
67+
getActiveOperationByContainerId: vi.fn(),
68+
}));
69+
5970
vi.mock('../log', () => ({
6071
default: { child: vi.fn(() => ({ info: vi.fn(), warn: vi.fn(), debug: vi.fn() })) },
6172
}));

app/api/container-actions.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { type Container, clearDetectedUpdateState } from '../model/container.js'
99
import { getContainerActionsCounter } from '../prometheus/container-actions.js';
1010
import * as registry from '../registry/index.js';
1111
import * as storeContainer from '../store/container.js';
12+
import * as updateOperationStore from '../store/update-operation.js';
1213
import Trigger from '../triggers/providers/Trigger.js';
1314
import { recordAuditEvent } from './audit-events.js';
1415
import { findDockerTriggerForContainer, NO_DOCKER_TRIGGER_FOUND_ERROR } from './docker-trigger.js';
@@ -215,6 +216,14 @@ async function updateContainer(req: Request, res: Response) {
215216
const operationId = crypto.randomUUID();
216217
getContainerActionsCounter()?.inc({ action: 'container-update' });
217218

219+
updateOperationStore.insertOperation({
220+
id: operationId,
221+
containerId: container.id,
222+
containerName: container.name,
223+
status: 'queued',
224+
phase: 'queued',
225+
});
226+
218227
void (async () => {
219228
try {
220229
await trigger.trigger(container, { operationId });

app/api/container.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,11 @@ vi.mock('../store/audit', () => ({
6262

6363
vi.mock('../store/update-operation', () => ({
6464
getOperationsByContainerName: (...args: unknown[]) => mockGetOperationsByContainerName(...args),
65+
getOperationById: vi.fn(() => undefined),
6566
getInProgressOperationByContainerName: vi.fn(() => undefined),
6667
getInProgressOperationByContainerId: vi.fn(() => undefined),
68+
getActiveOperationByContainerName: vi.fn(() => undefined),
69+
getActiveOperationByContainerId: vi.fn(() => undefined),
6770
}));
6871

6972
vi.mock('../registry', () => ({
@@ -1432,6 +1435,8 @@ describe('Container Router', () => {
14321435
getOperationsByContainerName: vi.fn(() => []),
14331436
getInProgressOperationByContainerName: vi.fn(() => undefined),
14341437
getInProgressOperationByContainerId: vi.fn(() => undefined),
1438+
getActiveOperationByContainerName: vi.fn(() => undefined),
1439+
getActiveOperationByContainerId: vi.fn(() => undefined),
14351440
},
14361441
},
14371442
agentApi: {

app/api/container/crud-context.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ export interface UpdateOperationStoreApi {
2929
getOperationsByContainerName: (containerName: string) => unknown[];
3030
getInProgressOperationByContainerName: (containerName: string) => unknown | undefined;
3131
getInProgressOperationByContainerId: (containerId: string) => unknown | undefined;
32+
getActiveOperationByContainerName: (containerName: string) => unknown | undefined;
33+
getActiveOperationByContainerId: (containerId: string) => unknown | undefined;
3234
}
3335

3436
export interface ServerConfiguration {

app/api/container/crud.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ function createHarness(options: { containers?: any[] } = {}) {
158158
getOperationsByContainerName: vi.fn(() => []),
159159
getInProgressOperationByContainerName: vi.fn(() => undefined),
160160
getInProgressOperationByContainerId: vi.fn(() => undefined),
161+
getActiveOperationByContainerName: vi.fn(() => undefined),
162+
getActiveOperationByContainerId: vi.fn(() => undefined),
161163
},
162164
getServerConfiguration: vi.fn(() => ({ feature: { delete: true } })),
163165
getAgent: vi.fn(),
@@ -2370,7 +2372,7 @@ describe('api/container/crud', () => {
23702372
const harness = createHarness({
23712373
containers: [createContainer({ id: 'c1', name: 'edge-api' })],
23722374
});
2373-
harness.deps.updateOperationStore.getInProgressOperationByContainerName.mockReturnValue({
2375+
harness.deps.updateOperationStore.getActiveOperationByContainerName.mockReturnValue({
23742376
id: 'op-1',
23752377
status: 'in-progress',
23762378
phase: 'old-stopped',
@@ -2383,7 +2385,7 @@ describe('api/container/crud', () => {
23832385
const singleRes = callGetContainer(harness.handlers, 'c1');
23842386

23852387
expect(
2386-
harness.deps.updateOperationStore.getInProgressOperationByContainerName,
2388+
harness.deps.updateOperationStore.getActiveOperationByContainerName,
23872389
).toHaveBeenCalledWith('edge-api');
23882390
expect(listRes.json).toHaveBeenCalledWith(
23892391
expect.objectContaining({
@@ -2415,7 +2417,7 @@ describe('api/container/crud', () => {
24152417
const harness = createHarness({
24162418
containers: [createContainer({ id: 'new-c1', name: 'edge-api' })],
24172419
});
2418-
harness.deps.updateOperationStore.getInProgressOperationByContainerId.mockImplementation(
2420+
harness.deps.updateOperationStore.getActiveOperationByContainerId.mockImplementation(
24192421
(id: string) =>
24202422
id === 'new-c1'
24212423
? {
@@ -2435,10 +2437,10 @@ describe('api/container/crud', () => {
24352437
const singleRes = callGetContainer(harness.handlers, 'new-c1');
24362438

24372439
expect(
2438-
harness.deps.updateOperationStore.getInProgressOperationByContainerId,
2440+
harness.deps.updateOperationStore.getActiveOperationByContainerId,
24392441
).toHaveBeenCalledWith('new-c1');
24402442
expect(
2441-
harness.deps.updateOperationStore.getInProgressOperationByContainerName,
2443+
harness.deps.updateOperationStore.getActiveOperationByContainerName,
24422444
).not.toHaveBeenCalled();
24432445
expect(listRes.json).toHaveBeenCalledWith(
24442446
expect.objectContaining({
@@ -2470,7 +2472,7 @@ describe('api/container/crud', () => {
24702472
const harness = createHarness({
24712473
containers: [createContainer({ id: 'c1', name: 'edge-api' })],
24722474
});
2473-
harness.deps.updateOperationStore.getInProgressOperationByContainerName.mockReturnValue({
2475+
harness.deps.updateOperationStore.getActiveOperationByContainerName.mockReturnValue({
24742476
id: 'op-1',
24752477
status: 'success',
24762478
phase: 'complete',

app/api/container/handlers/list.test.ts

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ function createMockContext(operation?: unknown): CrudHandlerContext {
1111
getOperationsByContainerName: vi.fn(),
1212
getInProgressOperationByContainerName: vi.fn().mockReturnValue(operation),
1313
getInProgressOperationByContainerId: vi.fn(),
14+
getActiveOperationByContainerName: vi.fn().mockReturnValue(operation),
15+
getActiveOperationByContainerId: vi.fn(),
1416
},
1517
getServerConfiguration: vi.fn(),
1618
getAgent: vi.fn(),
@@ -112,21 +114,17 @@ describe('attachInProgressUpdateOperation', () => {
112114
};
113115
const context = createMockContext();
114116
(
115-
context.updateOperationStore.getInProgressOperationByContainerId as ReturnType<typeof vi.fn>
117+
context.updateOperationStore.getActiveOperationByContainerId as ReturnType<typeof vi.fn>
116118
).mockReturnValue(byIdResult);
117119
(
118-
context.updateOperationStore.getInProgressOperationByContainerName as ReturnType<typeof vi.fn>
120+
context.updateOperationStore.getActiveOperationByContainerName as ReturnType<typeof vi.fn>
119121
).mockReturnValue(byNameResult);
120122

121123
const result = attachInProgressUpdateOperation(context, container);
122124

123125
expect(result.updateOperation?.id).toBe('op-by-id');
124-
expect(context.updateOperationStore.getInProgressOperationByContainerId).toHaveBeenCalledWith(
125-
'c1',
126-
);
127-
expect(
128-
context.updateOperationStore.getInProgressOperationByContainerName,
129-
).not.toHaveBeenCalled();
126+
expect(context.updateOperationStore.getActiveOperationByContainerId).toHaveBeenCalledWith('c1');
127+
expect(context.updateOperationStore.getActiveOperationByContainerName).not.toHaveBeenCalled();
130128
});
131129

132130
test('does not attach name-matched operation that belongs to a different container ID (#256)', () => {
@@ -142,10 +140,10 @@ describe('attachInProgressUpdateOperation', () => {
142140
};
143141
const context = createMockContext();
144142
(
145-
context.updateOperationStore.getInProgressOperationByContainerId as ReturnType<typeof vi.fn>
143+
context.updateOperationStore.getActiveOperationByContainerId as ReturnType<typeof vi.fn>
146144
).mockImplementation((id: string) => (id === 'host1-abc' ? operationForA : undefined));
147145
(
148-
context.updateOperationStore.getInProgressOperationByContainerName as ReturnType<typeof vi.fn>
146+
context.updateOperationStore.getActiveOperationByContainerName as ReturnType<typeof vi.fn>
149147
).mockReturnValue(operationForA);
150148

151149
const resultA = attachInProgressUpdateOperation(context, containerA);
@@ -166,10 +164,10 @@ describe('attachInProgressUpdateOperation', () => {
166164
};
167165
const context = createMockContext();
168166
(
169-
context.updateOperationStore.getInProgressOperationByContainerId as ReturnType<typeof vi.fn>
167+
context.updateOperationStore.getActiveOperationByContainerId as ReturnType<typeof vi.fn>
170168
).mockReturnValue(undefined);
171169
(
172-
context.updateOperationStore.getInProgressOperationByContainerName as ReturnType<typeof vi.fn>
170+
context.updateOperationStore.getActiveOperationByContainerName as ReturnType<typeof vi.fn>
173171
).mockReturnValue(legacyOperation);
174172

175173
const result = attachInProgressUpdateOperation(context, container);

app/api/container/handlers/list.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ export function attachInProgressUpdateOperation(
8383
context: CrudHandlerContext,
8484
container: Container,
8585
): Container {
86-
const byId = context.updateOperationStore.getInProgressOperationByContainerId(container.id);
86+
const byId = context.updateOperationStore.getActiveOperationByContainerId(container.id);
8787
// Name-based fallback only for legacy operations that predate the containerId field.
8888
const byName = byId
8989
? undefined
90-
: context.updateOperationStore.getInProgressOperationByContainerName(container.name);
90+
: context.updateOperationStore.getActiveOperationByContainerName(container.name);
9191
const isLegacyOperation =
9292
byName && typeof byName === 'object' && !('containerId' in (byName as Record<string, unknown>));
9393
const matched = byId ?? (isLegacyOperation ? byName : undefined);

app/api/container/handlers/release-notes.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ function createMockContext(overrides: Partial<CrudHandlerContext> = {}): CrudHan
3535
getOperationsByContainerName: vi.fn(),
3636
getInProgressOperationByContainerName: vi.fn(),
3737
getInProgressOperationByContainerId: vi.fn(),
38+
getActiveOperationByContainerName: vi.fn(),
39+
getActiveOperationByContainerId: vi.fn(),
3840
},
3941
getServerConfiguration: vi.fn(),
4042
getAgent: vi.fn(),

app/model/container-update-operation.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export const CONTAINER_UPDATE_OPERATION_STATUSES = [
2+
'queued',
23
'in-progress',
34
'succeeded',
45
'rolled-back',
@@ -8,6 +9,7 @@ export const CONTAINER_UPDATE_OPERATION_STATUSES = [
89
export type ContainerUpdateOperationStatus = (typeof CONTAINER_UPDATE_OPERATION_STATUSES)[number];
910

1011
export const CONTAINER_UPDATE_OPERATION_PHASES = [
12+
'queued',
1113
'pulling',
1214
'pull-failed',
1315
'prepare',

app/store/update-operation.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ type UpdateOperationQuery =
4444
| { 'data.id': string }
4545
| { 'data.containerName': string }
4646
| { 'data.containerName': string; 'data.status': ContainerUpdateOperationStatus }
47+
| { 'data.containerId': string }
4748
| { 'data.containerId': string; 'data.status': ContainerUpdateOperationStatus }
49+
| { 'data.newContainerId': string }
4850
| { 'data.newContainerId': string; 'data.status': ContainerUpdateOperationStatus };
4951

5052
interface UpdateOperationCollection {
@@ -170,6 +172,17 @@ export function insertOperation(operation: InsertUpdateOperationInput): UpdateOp
170172
return operationToSave;
171173
}
172174

175+
/**
176+
* Return a single operation by its unique ID.
177+
*/
178+
export function getOperationById(id: string): UpdateOperation | undefined {
179+
if (!updateOperationCollection || !id) {
180+
return undefined;
181+
}
182+
183+
return updateOperationCollection.findOne({ 'data.id': id })?.data;
184+
}
185+
173186
/**
174187
* Update an operation by id.
175188
*/
@@ -254,6 +267,58 @@ export function getInProgressOperationByContainerId(
254267
return operations.at(0);
255268
}
256269

270+
const ACTIVE_STATUSES = ['in-progress', 'queued'] as const;
271+
272+
/**
273+
* Return the latest active (in-progress OR queued) operation for a container name.
274+
*/
275+
export function getActiveOperationByContainerName(
276+
containerName: string,
277+
): UpdateOperation | undefined {
278+
if (!updateOperationCollection) {
279+
return undefined;
280+
}
281+
282+
const operations = updateOperationCollection
283+
.find({ 'data.containerName': containerName })
284+
.filter((item) => (ACTIVE_STATUSES as readonly string[]).includes(item.data.status))
285+
.map((item) => item.data)
286+
.sort((a, b) => getOperationTimestamp(b) - getOperationTimestamp(a));
287+
288+
return operations.at(0);
289+
}
290+
291+
/**
292+
* Return the latest active (in-progress OR queued) operation for a container ID.
293+
*/
294+
export function getActiveOperationByContainerId(containerId: string): UpdateOperation | undefined {
295+
if (!updateOperationCollection || !containerId) {
296+
return undefined;
297+
}
298+
299+
const operationsById = new Map<string, UpdateOperation>();
300+
301+
for (const document of updateOperationCollection.find({ 'data.containerId': containerId })) {
302+
if ((ACTIVE_STATUSES as readonly string[]).includes(document.data.status)) {
303+
operationsById.set(document.data.id, document.data);
304+
}
305+
}
306+
307+
for (const document of updateOperationCollection.find({
308+
'data.newContainerId': containerId,
309+
})) {
310+
if ((ACTIVE_STATUSES as readonly string[]).includes(document.data.status)) {
311+
operationsById.set(document.data.id, document.data);
312+
}
313+
}
314+
315+
const operations = [...operationsById.values()].sort(
316+
(a, b) => getOperationTimestamp(b) - getOperationTimestamp(a),
317+
);
318+
319+
return operations.at(0);
320+
}
321+
257322
export function getOperationsByContainerName(containerName: string): UpdateOperation[] {
258323
if (!updateOperationCollection) {
259324
return [];

0 commit comments

Comments
 (0)