Skip to content

Commit

Permalink
Only resetting metrics if metrics have not been collected for metrics…
Browse files Browse the repository at this point in the history
…_reset_interval
  • Loading branch information
ymao1 committed Apr 29, 2024
1 parent 89b7a65 commit 70b962e
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 4 deletions.
188 changes: 188 additions & 0 deletions x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,194 @@ describe('createAggregator', () => {
clock.restore();
});
});

test('does not reset count when configured metrics reset interval expires if metrics have been reset via reset$ event', async () => {
  // Builds the aggregated stat expected once `success` claims have succeeded
  // out of `total` polling cycles. In every expectation of this test the
  // single duration bucket (100) counts the successes and each success
  // contributes one duration value of 10.
  const expectedStat = (success: number, total: number, totalErrors: number) => ({
    key: 'task_claim',
    value: {
      success,
      total,
      total_errors: totalErrors,
      duration: { counts: [success], values: [100] },
      duration_values: Array.from({ length: success }, () => 10),
    },
  });

  // [success, total, total_errors] expected after each emitted event, in order.
  const expectedCounts: Array<[number, number, number]> = [
    // first batch of events, emitted right after the reset$ event
    [1, 1, 0],
    [2, 2, 0],
    [3, 3, 0],
    [4, 4, 0],
    [4, 5, 1],
    [5, 6, 1],
    // reset interval fired here but stats should not clear
    [6, 7, 1],
    [6, 8, 2],
    [6, 9, 3],
    [7, 10, 3],
    [8, 11, 3],
    // reset interval fired here and stats should have cleared
    [1, 1, 0],
  ];

  const reset$ = new Subject<boolean>();
  const clock = sinon.useFakeTimers();
  clock.tick(0);

  const events1 = [
    taskClaimSuccessEvent,
    taskClaimSuccessEvent,
    taskClaimSuccessEvent,
    taskClaimSuccessEvent,
    taskClaimFailureEvent,
    taskClaimSuccessEvent,
  ];
  const events2 = [
    taskClaimSuccessEvent,
    taskClaimFailureEvent,
    taskClaimFailureEvent,
    taskClaimSuccessEvent,
    taskClaimSuccessEvent,
  ];
  const events$ = new Subject<TaskLifecycleEvent>();

  const taskClaimAggregator = createAggregator({
    key: 'task_claim',
    events$,
    config: {
      ...config,
      metrics_reset_interval: 50,
    },
    reset$,
    eventFilter: (event: TaskLifecycleEvent) => isTaskPollingCycleEvent(event),
    metricsAggregator: new TaskClaimMetricsAggregator(),
  });

  // One emission per event in both batches plus the final post-reset event.
  const expectedEmissions = events1.length + events2.length + 1;

  return new Promise<void>((resolve) => {
    taskClaimAggregator
      .pipe(
        // skip initial metric which is just initialized data which
        // ensures we don't stall on combineLatest
        skip(1),
        take(expectedEmissions),
        bufferCount(expectedEmissions)
      )
      .subscribe((metrics: Array<AggregatedStat<TaskClaimMetric>>) => {
        expectedCounts.forEach(([success, total, totalErrors], index) => {
          expect(metrics[index]).toEqual(expectedStat(success, total, totalErrors));
        });
        resolve();
      });

    // reset$ event 10ms after the aggregator was created
    clock.tick(10);
    reset$.next(true);
    events1.forEach((event) => {
      events$.next(event);
    });

    // the 50ms reset interval fires now, only 40ms after the reset$ event,
    // so the counts should carry over into the second batch
    clock.tick(40);
    events2.forEach((event) => {
      events$.next(event);
    });

    // the interval fires again, now more than 50ms after the reset$ event,
    // so the counts should be cleared before the final event
    clock.tick(50);
    events$.next(taskClaimSuccessEvent);

    clock.restore();
  });
});
});

describe('with TaskRunMetricsAggregator', () => {
Expand Down
31 changes: 27 additions & 4 deletions x-pack/plugins/task_manager/server/metrics/create_aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,31 @@ export function createAggregator<T extends JsonValue>({
metricsAggregator,
}: CreateMetricsAggregatorOpts<T>): AggregatedStatProvider<T> {
if (reset$) {
let lastResetTime: Date = new Date();
// Resets the aggregators either when the reset interval has passed or
// a reset$ event is received
merge(
interval(config.metrics_reset_interval).pipe(map(() => true)),
reset$.pipe(map(() => true))
).subscribe(() => {
metricsAggregator.reset();
interval(config.metrics_reset_interval).pipe(
map(() => {
if (intervalHasPassedSince(lastResetTime, config.metrics_reset_interval)) {
lastResetTime = new Date();
return true;
}

return false;
})
),
reset$.pipe(
map((value: boolean) => {
// keep track of the last time we reset due to collection
lastResetTime = new Date();
return true;
})
)
).subscribe((shouldReset: boolean) => {
if (shouldReset) {
metricsAggregator.reset();
}
});
}

Expand All @@ -57,3 +75,8 @@ export function createAggregator<T extends JsonValue>({
})
);
}

/**
 * Reports whether at least `intervalInMs` milliseconds have elapsed between
 * `date` and the current time.
 *
 * The comparison is inclusive: a timer that fires exactly `intervalInMs`
 * after `date` counts as the interval having passed. With a strict comparison
 * a timer firing precisely on schedule would never satisfy the check on its
 * first tick after `date`, delaying the reset by a whole extra interval.
 *
 * @param date - the reference point in time (e.g. the last reset time)
 * @param intervalInMs - the interval length in milliseconds
 * @returns `true` when the elapsed time is greater than or equal to
 * `intervalInMs`; `false` otherwise (including when `date` is in the future)
 */
function intervalHasPassedSince(date: Date, intervalInMs: number): boolean {
  const elapsedMs = Date.now() - date.valueOf();
  return elapsedMs >= intervalInMs;
}

0 comments on commit 70b962e

Please sign in to comment.