Skip to content

Commit 6b5c1f7

Browse files
committed
šŸ› fix(notifications): preserve update-applied success events across rename race (#290)
1 parent c9f21a7 commit 6b5c1f7

File tree

12 files changed

+318
-20
lines changed

12 files changed

+318
-20
lines changed

ā€Žapp/agent/AgentClient.test.tsā€Ž

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,29 @@ describe('AgentClient', () => {
954954
expect(event.emitContainerUpdateApplied).toHaveBeenCalledWith('local_nginx');
955955
});
956956

957+
test('should emit update-applied payload with agent context when agent sends object payload', async () => {
958+
await client.handleEvent('dd:update-applied', {
959+
containerName: 'local_nginx',
960+
container: {
961+
id: 'c1',
962+
name: 'nginx',
963+
watcher: 'local',
964+
updateAvailable: true,
965+
updateKind: { kind: 'tag', semverDiff: 'major' },
966+
},
967+
});
968+
969+
expect(event.emitContainerUpdateApplied).toHaveBeenCalledWith({
970+
containerName: 'local_nginx',
971+
container: expect.objectContaining({
972+
id: 'c1',
973+
name: 'nginx',
974+
watcher: 'local',
975+
agent: 'test-agent',
976+
}),
977+
});
978+
});
979+
957980
test('should ignore update-applied when data is an empty string', async () => {
958981
await client.handleEvent('dd:update-applied', '');
959982

@@ -1369,6 +1392,40 @@ describe('AgentClient', () => {
13691392
});
13701393
});
13711394

1395+
describe('getWatcher', () => {
1396+
test('should fetch watcher detail from the agent', async () => {
1397+
axios.get.mockResolvedValue({
1398+
data: {
1399+
id: 'docker.local',
1400+
type: 'docker',
1401+
name: 'local',
1402+
configuration: { cron: '0 * * * *' },
1403+
metadata: { nextRunAt: '2026-04-09T13:00:00.000Z' },
1404+
},
1405+
});
1406+
1407+
const result = await client.getWatcher('docker', 'local');
1408+
1409+
expect(axios.get).toHaveBeenCalledWith(
1410+
expect.stringContaining('/api/watchers/docker/local'),
1411+
expect.any(Object),
1412+
);
1413+
expect(result).toEqual({
1414+
id: 'docker.local',
1415+
type: 'docker',
1416+
name: 'local',
1417+
configuration: { cron: '0 * * * *' },
1418+
metadata: { nextRunAt: '2026-04-09T13:00:00.000Z' },
1419+
});
1420+
});
1421+
1422+
test('should throw when fetching watcher detail fails', async () => {
1423+
axios.get.mockRejectedValue(new Error('watcher fetch failed'));
1424+
1425+
await expect(client.getWatcher('docker', 'local')).rejects.toThrow('watcher fetch failed');
1426+
});
1427+
});
1428+
13721429
describe('watchContainer', () => {
13731430
test('should post to watcher container endpoint and process report', async () => {
13741431
const report = { container: { id: 'c1' } };

ā€Žapp/agent/AgentClient.tsā€Ž

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { StringDecoder } from 'node:string_decoder';
44
import axios, { type AxiosRequestConfig } from 'axios';
55
import type { Logger } from 'pino';
66
import type {
7+
ContainerUpdateAppliedEventPayload,
78
ContainerUpdateFailedEventPayload,
89
SecurityAlertEventPayload,
910
SecurityAlertSummary,
@@ -51,9 +52,12 @@ interface AgentClientRuntimeInfo {
5152
}
5253

5354
interface AgentComponentDescriptor {
55+
id?: string;
5456
type: string;
5557
name: string;
5658
configuration: Record<string, unknown>;
59+
agent?: string;
60+
metadata?: Record<string, unknown>;
5761
}
5862

5963
interface AgentRuntimeAckPayload {
@@ -90,6 +94,17 @@ const INITIAL_SSE_RECONNECT_DELAY_MS = 1_000;
9094
const MAX_SSE_RECONNECT_DELAY_MS = 60_000;
9195
const REMOTE_UPDATE_TRIGGER_TYPES = new Set(['docker', 'dockercompose']);
9296

97+
function isContainerUpdateAppliedEventPayload(
98+
data: unknown,
99+
): data is ContainerUpdateAppliedEventPayload {
100+
if (!data || typeof data !== 'object') {
101+
return false;
102+
}
103+
104+
const containerName = (data as { containerName?: unknown }).containerName;
105+
return typeof containerName === 'string' && containerName.length > 0;
106+
}
107+
93108
export class AgentClient {
94109
public name: string;
95110
public config: AgentClientConfig;
@@ -604,6 +619,17 @@ export class AgentClient {
604619
case 'dd:update-applied':
605620
if (typeof data === 'string' && data.length > 0) {
606621
await emitContainerUpdateApplied(data);
622+
} else if (isContainerUpdateAppliedEventPayload(data)) {
623+
await emitContainerUpdateApplied({
624+
containerName: data.containerName,
625+
container:
626+
data.container && typeof data.container === 'object'
627+
? {
628+
...data.container,
629+
agent: this.name,
630+
}
631+
: undefined,
632+
});
607633
}
608634
return;
609635
case 'dd:update-failed': {
@@ -742,6 +768,21 @@ export class AgentClient {
742768
}
743769
}
744770

771+
async getWatcher(watcherType: string, watcherName: string) {
772+
try {
773+
const response = await axios.get<AgentComponentDescriptor>(
774+
`${this.baseUrl}/api/watchers/${encodeURIComponent(watcherType)}/${encodeURIComponent(watcherName)}`,
775+
this.axiosOptions,
776+
);
777+
return response.data;
778+
} catch (error: unknown) {
779+
this.log.error(
780+
`Error fetching watcher on agent: ${sanitizeLogParam(getErrorMessage(error))}`,
781+
);
782+
throw error;
783+
}
784+
}
785+
745786
async watch(watcherType: string, watcherName: string) {
746787
try {
747788
const response = await axios.post<ContainerReport[]>(

ā€Žapp/agent/api/event.test.tsā€Ž

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,28 @@ describe('agent API event', () => {
381381
expect(payload).toContain('"local_nginx"');
382382
});
383383

384+
test('update-applied handler should send object payloads to connected clients', () => {
385+
eventApi.subscribeEvents(req, res);
386+
res.write.mockClear();
387+
eventApi.initEvents();
388+
389+
const updateAppliedHandler = event.registerContainerUpdateApplied.mock.calls[0][0];
390+
updateAppliedHandler({
391+
containerName: 'local_nginx',
392+
container: {
393+
id: 'c1',
394+
name: 'nginx',
395+
watcher: 'local',
396+
},
397+
});
398+
399+
expect(res.write).toHaveBeenCalled();
400+
const payload = res.write.mock.calls[0][0];
401+
expect(payload).toContain('dd:update-applied');
402+
expect(payload).toContain('"containerName":"local_nginx"');
403+
expect(payload).toContain('"name":"nginx"');
404+
});
405+
384406
test('update-failed handler should send SSE to connected clients', () => {
385407
eventApi.subscribeEvents(req, res);
386408
res.write.mockClear();

ā€Žapp/agent/api/event.tsā€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,8 @@ export function initEvents() {
263263
event.registerWatcherSnapshot((payload: event.WatcherSnapshotEventPayload) =>
264264
sendSseEvent('dd:watcher-snapshot', sanitizeWatcherSnapshotPayloadForAgentSse(payload)),
265265
);
266-
event.registerContainerUpdateApplied((containerName: string) =>
267-
sendSseEvent('dd:update-applied', containerName),
266+
event.registerContainerUpdateApplied((payload: event.ContainerUpdateAppliedEvent) =>
267+
sendSseEvent('dd:update-applied', payload),
268268
);
269269
event.registerContainerUpdateFailed((payload: event.ContainerUpdateFailedEventPayload) =>
270270
sendSseEvent('dd:update-failed', payload),

ā€Žapp/event/audit-subscriptions.test.tsā€Ž

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
import type {
99
AgentDisconnectedEventPayload,
1010
ContainerLifecycleEventPayload,
11+
ContainerUpdateAppliedEvent,
1112
ContainerUpdateFailedEventPayload,
1213
SecurityAlertEventPayload,
1314
} from './index.js';
@@ -54,7 +55,7 @@ function setupAuditSubscriptions(): {
5455

5556
const registrars: AuditSubscriptionRegistrars = {
5657
registerContainerReport: registerOrdered<ContainerReport>(() => {}),
57-
registerContainerUpdateApplied: registerOrdered<string>(() => {}),
58+
registerContainerUpdateApplied: registerOrdered<ContainerUpdateAppliedEvent>(() => {}),
5859
registerContainerUpdateFailed: registerOrdered<ContainerUpdateFailedEventPayload>(() => {}),
5960
registerSecurityAlert: registerOrdered<SecurityAlertEventPayload>((handler) => {
6061
handlers.securityAlert = handler;

ā€Žapp/event/audit-subscriptions.tsā€Ž

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as auditStore from '../store/audit.js';
44
import type {
55
AgentDisconnectedEventPayload,
66
ContainerLifecycleEventPayload,
7+
ContainerUpdateAppliedEvent,
78
ContainerUpdateFailedEventPayload,
89
SecurityAlertEventPayload,
910
} from './index.js';
@@ -26,7 +27,7 @@ type EventRegistrarFn<TPayload> = (handler: (payload: TPayload) => void) => void
2627

2728
export interface AuditSubscriptionRegistrars {
2829
registerContainerReport: OrderedEventRegistrarFn<ContainerReport>;
29-
registerContainerUpdateApplied: OrderedEventRegistrarFn<string>;
30+
registerContainerUpdateApplied: OrderedEventRegistrarFn<ContainerUpdateAppliedEvent>;
3031
registerContainerUpdateFailed: OrderedEventRegistrarFn<ContainerUpdateFailedEventPayload>;
3132
registerSecurityAlert: OrderedEventRegistrarFn<SecurityAlertEventPayload>;
3233
registerAgentDisconnected: OrderedEventRegistrarFn<AgentDisconnectedEventPayload>;
@@ -38,6 +39,22 @@ export interface AuditSubscriptionRegistrars {
3839
const securityAlertAuditSeenAt = new Map<string, number>();
3940
const agentDisconnectedAuditSeenAt = new Map<string, number>();
4041

42+
function getContainerUpdateAppliedEventContainerName(
43+
payload: ContainerUpdateAppliedEvent,
44+
): string | undefined {
45+
if (typeof payload === 'string') {
46+
return payload || undefined;
47+
}
48+
49+
if (!payload || typeof payload !== 'object') {
50+
return undefined;
51+
}
52+
53+
return typeof payload.containerName === 'string' && payload.containerName !== ''
54+
? payload.containerName
55+
: undefined;
56+
}
57+
4158
function pruneAuditDedupeCache(
4259
cache: Map<string, number>,
4360
now: number,
@@ -83,12 +100,16 @@ export function registerAuditLogSubscriptions(registrars: AuditSubscriptionRegis
83100
}
84101
}, AUDIT_HANDLER_OPTIONS);
85102

86-
registrars.registerContainerUpdateApplied(async (containerId: string) => {
103+
registrars.registerContainerUpdateApplied(async (payload) => {
104+
const containerName = getContainerUpdateAppliedEventContainerName(payload);
105+
if (!containerName) {
106+
return;
107+
}
87108
auditStore.insertAudit({
88109
id: '',
89110
timestamp: new Date().toISOString(),
90111
action: 'update-applied',
91-
containerName: containerId,
112+
containerName,
92113
status: 'success',
93114
});
94115
getAuditCounter()?.inc({ action: 'update-applied' });

ā€Žapp/event/index.audit.test.tsā€Ž

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ describe('event default audit listeners', () => {
101101
expect(mockInc).toHaveBeenCalledWith({ action: 'update-applied' });
102102
});
103103

104+
test('should record update-applied audits from object payloads', async () => {
105+
const event = await loadEventModule();
106+
107+
await event.emitContainerUpdateApplied({
108+
containerName: 'container-456',
109+
container: { id: 'c1', name: 'nginx', watcher: 'local' },
110+
});
111+
112+
expect(mockInsertAudit).toHaveBeenCalledWith(
113+
expect.objectContaining({
114+
action: 'update-applied',
115+
containerName: 'container-456',
116+
status: 'success',
117+
}),
118+
);
119+
expect(mockInc).toHaveBeenCalledWith({ action: 'update-applied' });
120+
});
121+
104122
test('should record update-failed audits', async () => {
105123
const event = await loadEventModule();
106124

ā€Žapp/event/index.tsā€Ž

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ export interface ContainerUpdateFailedEventPayload {
6969
error: string;
7070
}
7171

72+
export interface ContainerUpdateAppliedEventPayload {
73+
containerName: string;
74+
container?: Container | Record<string, unknown>;
75+
}
76+
77+
export type ContainerUpdateAppliedEvent = string | ContainerUpdateAppliedEventPayload;
78+
7279
export interface SecurityAlertSummary {
7380
unknown: number;
7481
low: number;
@@ -112,7 +119,10 @@ export type ContainerLifecycleEventPayload = Partial<Omit<Container, 'image'>> &
112119
const containerReportHandlers = new Map<number, OrderedEventHandler<ContainerReport>>();
113120
const containerReportsHandlers = new Map<number, OrderedEventHandler<ContainerReport[]>>();
114121
const watcherSnapshotHandlers = new Map<number, OrderedEventHandler<WatcherSnapshotEventPayload>>();
115-
const containerUpdateAppliedHandlers = new Map<number, OrderedEventHandler<string>>();
122+
const containerUpdateAppliedHandlers = new Map<
123+
number,
124+
OrderedEventHandler<ContainerUpdateAppliedEvent>
125+
>();
116126
const containerUpdateFailedHandlers = new Map<
117127
number,
118128
OrderedEventHandler<ContainerUpdateFailedEventPayload>
@@ -229,18 +239,35 @@ export function registerContainerReport(
229239

230240
/**
231241
* Emit ContainerUpdateApplied event.
232-
* @param containerId
242+
* @param payload
233243
*/
234-
export async function emitContainerUpdateApplied(containerId: string): Promise<void> {
235-
await emitOrderedHandlers(containerUpdateAppliedHandlers, containerId);
244+
export async function emitContainerUpdateApplied(
245+
payload: ContainerUpdateAppliedEvent,
246+
): Promise<void> {
247+
await emitOrderedHandlers(containerUpdateAppliedHandlers, payload);
248+
}
249+
250+
export function getContainerUpdateAppliedEventContainerName(
251+
payload: ContainerUpdateAppliedEvent,
252+
): string | undefined {
253+
if (typeof payload === 'string') {
254+
return payload || undefined;
255+
}
256+
257+
if (!payload || typeof payload !== 'object') {
258+
return undefined;
259+
}
260+
261+
const containerName = payload.containerName;
262+
return typeof containerName === 'string' && containerName !== '' ? containerName : undefined;
236263
}
237264

238265
/**
239266
* Register to ContainerUpdateApplied event.
240267
* @param handler
241268
*/
242269
export function registerContainerUpdateApplied(
243-
handler: OrderedEventHandlerFn<string>,
270+
handler: OrderedEventHandlerFn<ContainerUpdateAppliedEvent>,
244271
options: EventHandlerRegistrationOptions = {},
245272
): () => void {
246273
return registerOrderedEventHandler(containerUpdateAppliedHandlers, handler, options);

0 commit comments

Comments
Ā (0)