Skip to content

Commit

Permalink
[ML] Do not count interim buckets towards the total bucket count (#91288
Browse files Browse the repository at this point in the history
) (#91341)
  • Loading branch information
davidkyle committed Nov 7, 2022
1 parent 37f1518 commit 73dd560
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 12 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/91288.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91288
summary: Interim buckets should not count towards the total bucket count
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,16 @@ public void testProcessResults_ModelSnapshot() throws Exception {
}

public void testProcessResults_TimingStats() throws Exception {
ResultsBuilder resultsBuilder = new ResultsBuilder().addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000));
ResultsBuilder resultsBuilder = new ResultsBuilder().addBucket(createBucket(false, 100))
.addBucket(createBucket(false, 1000))
.addBucket(createBucket(false, 100))
.addBucket(createBucket(false, 1000))
.addBucket(createBucket(false, 100))
.addBucket(createBucket(false, 1000))
.addBucket(createBucket(false, 100))
.addBucket(createBucket(false, 1000))
.addBucket(createBucket(false, 100))
.addBucket(createBucket(false, 1000));
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process();
Expand All @@ -338,6 +338,24 @@ public void testProcessResults_TimingStats() throws Exception {
assertThat(timingStats.getExponentialAvgBucketProcessingTimeMs(), closeTo(143.244, 1e-3));
}

public void testProcessResults_InterimResultsDoNotChangeTimingStats() throws Exception {
ResultsBuilder resultsBuilder = new ResultsBuilder().addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(false, 10000));
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process();
resultProcessor.awaitCompletion();

TimingStats timingStats = resultProcessor.timingStats();
assertThat(timingStats.getBucketCount(), equalTo(1L));
assertThat(timingStats.getMinBucketProcessingTimeMs(), equalTo(10000.0));
assertThat(timingStats.getMaxBucketProcessingTimeMs(), equalTo(10000.0));
assertThat(timingStats.getAvgBucketProcessingTimeMs(), equalTo(10000.0));
assertThat(timingStats.getExponentialAvgBucketProcessingTimeMs(), closeTo(10000.0, 1e-3));
}

public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
when(renormalizer.isEnabled()).thenReturn(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,14 @@ void processResult(AutodetectResult result) {
deleteInterimRequired = false;
}

if (bucket.isInterim() == false) {
timingStatsReporter.reportBucket(bucket);
++currentRunBucketCount;
}
// persist after deleting interim results in case the new
// results are also interim
timingStatsReporter.reportBucket(bucket);
bulkResultsPersister.persistBucket(bucket).executeRequest();
bulkAnnotationsPersister.executeRequest();
++currentRunBucketCount;
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && records.isEmpty() == false) {
Expand Down Expand Up @@ -583,4 +585,10 @@ private boolean isAlive() {
void setDeleteInterimRequired(boolean deleteInterimRequired) {
this.deleteInterimRequired = deleteInterimRequired;
}

// For testing only.
// Reading currentRunBucketCount is not thread safe
long getCurrentRunBucketCount() {
return currentRunBucketCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public void testProcessResult_bucket_deleteInterimRequired() {
when(result.getBucket()).thenReturn(bucket);

processorUnderTest.processResult(result);
assertEquals(1L, processorUnderTest.getCurrentRunBucketCount());
assertFalse(processorUnderTest.isDeleteInterimRequired());

verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class));
Expand All @@ -187,6 +188,24 @@ public void testProcessResult_bucket_deleteInterimRequired() {
verify(persister).deleteInterimResults(JOB_ID);
}

public void testProcessResult_bucket_isInterim() {
when(bulkResultsPersister.persistTimingStats(any(TimingStats.class))).thenReturn(bulkResultsPersister);
when(bulkResultsPersister.persistBucket(any(Bucket.class))).thenReturn(bulkResultsPersister);
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS);
bucket.setInterim(true);
when(result.getBucket()).thenReturn(bucket);

processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
assertEquals(0L, processorUnderTest.getCurrentRunBucketCount());

verify(bulkResultsPersister, never()).persistTimingStats(any(TimingStats.class));
verify(bulkResultsPersister).persistBucket(bucket);
verify(bulkResultsPersister).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}

public void testProcessResult_records() {
AutodetectResult result = mock(AutodetectResult.class);
List<AnomalyRecord> records = Arrays.asList(
Expand Down

0 comments on commit 73dd560

Please sign in to comment.