Skip to content

Commit 982b4d7

Browse files
committed
🐛 fix(triggers): evict digest buffer entries after successful batch send (#282)
In batch+digest mode, a watcher cycle fires both the per-container event (digest handler buffers) and the batch event (batch handler sends immediately). Without coordination, the same container was sent twice: once immediately via batch and again at the next digest cron flush. After a successful triggerBatch(), evict all sent containers from the digest buffer. On failure, retain them so the digest flush acts as fallback delivery.
1 parent b7dc2a2 commit 982b4d7

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

app/triggers/providers/Trigger.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4133,6 +4133,94 @@ describe('batch+digest mode', () => {
41334133
expect(triggerBatchSpy).toHaveBeenCalledWith([report.container]);
41344134
});
41354135

4136+
test('batch handler should evict sent containers from digest buffer in batch+digest mode', async () => {
4137+
let batchCallback;
4138+
vi.mocked(event.registerContainerReports).mockImplementation((cb) => {
4139+
batchCallback = cb;
4140+
return vi.fn();
4141+
});
4142+
4143+
await trigger.register('trigger', 'test', 'combined-trigger', {
4144+
...configurationValid,
4145+
mode: 'batch+digest',
4146+
});
4147+
4148+
const container = {
4149+
id: 'c1',
4150+
name: 'app',
4151+
watcher: 'test',
4152+
updateAvailable: true,
4153+
updateKind: { kind: 'tag', localValue: '1.0', remoteValue: '2.0' },
4154+
};
4155+
4156+
// Pre-populate the digest buffer (as if the digest handler already buffered it)
4157+
await trigger.handleContainerReportDigest({
4158+
container,
4159+
changed: true,
4160+
} as any);
4161+
4162+
const triggerBatchSpy = vi.spyOn(trigger, 'triggerBatch').mockResolvedValue(undefined);
4163+
4164+
// Batch handler fires for the same container
4165+
await batchCallback?.([{ container, changed: true }] as any);
4166+
4167+
expect(triggerBatchSpy).toHaveBeenCalledWith([container]);
4168+
4169+
// The digest buffer should now be empty — batch evicted the entry
4170+
const cronCallback = vi.mocked(mockCron.schedule).mock.calls[0]?.[1];
4171+
triggerBatchSpy.mockClear();
4172+
cronCallback?.();
4173+
await Promise.resolve();
4174+
4175+
// Digest flush should NOT send because the buffer was evicted
4176+
expect(triggerBatchSpy).not.toHaveBeenCalled();
4177+
});
4178+
4179+
test('batch handler should not evict from digest buffer when batch send fails', async () => {
4180+
let batchCallback;
4181+
vi.mocked(event.registerContainerReports).mockImplementation((cb) => {
4182+
batchCallback = cb;
4183+
return vi.fn();
4184+
});
4185+
4186+
await trigger.register('trigger', 'test', 'combined-trigger', {
4187+
...configurationValid,
4188+
mode: 'batch+digest',
4189+
});
4190+
4191+
const container = {
4192+
id: 'c1',
4193+
name: 'app',
4194+
watcher: 'test',
4195+
updateAvailable: true,
4196+
updateKind: { kind: 'tag', localValue: '1.0', remoteValue: '2.0' },
4197+
};
4198+
4199+
// Pre-populate the digest buffer
4200+
await trigger.handleContainerReportDigest({
4201+
container,
4202+
changed: true,
4203+
} as any);
4204+
4205+
// Batch send fails
4206+
const triggerBatchSpy = vi
4207+
.spyOn(trigger, 'triggerBatch')
4208+
.mockRejectedValue(new Error('SMTP timeout'));
4209+
4210+
await batchCallback?.([{ container, changed: true }] as any);
4211+
4212+
// Buffer should still have the entry since batch failed
4213+
triggerBatchSpy.mockResolvedValue(undefined);
4214+
storeContainer.getContainersRaw.mockReturnValue([container]);
4215+
4216+
const cronCallback = vi.mocked(mockCron.schedule).mock.calls[0]?.[1];
4217+
cronCallback?.();
4218+
await Promise.resolve();
4219+
4220+
// Digest flush SHOULD still send because batch failed — entry was retained
4221+
expect(triggerBatchSpy).toHaveBeenCalledWith([container]);
4222+
});
4223+
41364224
test('digest handler should buffer containers and flush them on cron in batch+digest mode', async () => {
41374225
let digestCallback;
41384226
vi.mocked(event.registerContainerReport).mockImplementation((cb) => {

app/triggers/providers/Trigger.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,13 @@ class Trigger extends Component {
954954
if (containersFiltered.length > 0) {
955955
this.log.debug('Run batch');
956956
await this.triggerBatch(containersFiltered);
957+
// In batch+digest mode, evict successfully-batched containers from the
958+
// digest buffer so they are not sent again at the next digest flush.
959+
if (this.digestBuffer.size > 0) {
960+
for (const container of containersFiltered) {
961+
this.digestBuffer.delete(fullName(container));
962+
}
963+
}
957964
}
958965
} catch (e: unknown) {
959966
const errorMessage = Trigger.getErrorMessage(e);

0 commit comments

Comments
 (0)