Skip to content

Commit

Permalink
[ML] Add audit warning for 1000 categories found early in job (#51146)
Browse files Browse the repository at this point in the history
If 1000 different category definitions are created for a job in
the first 100 buckets it processes then an audit warning will now
be created.  (This will cause a yellow warning triangle in the
ML UI's jobs list.)

Such a large number of categories suggests that the field that
categorization is working on is not well suited to the ML
categorization functionality.
  • Loading branch information
droberts195 committed Jan 17, 2020
1 parent 0c3bf45 commit 5f12fc1
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public final class Messages {
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
// Audit warning text for a job that produces a large number of categories while it has processed few buckets.
// Placeholders: {0} is the number of categories observed, {1} is the number of buckets seen so far.
public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
" This suggests an inappropriate categorization_field_name has been chosen.";

public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class AutodetectResultProcessor {

private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);

// The excessive-category audit warning is only considered while the job's total bucket count
// (prior runs plus the current run) is still below this value.
static final long EARLY_BUCKET_THRESHOLD = 100;
// Receiving the category definition whose ID equals this value, while still below the bucket
// threshold above, triggers the audit warning.
static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;

private final Client client;
private final AnomalyDetectionAuditor auditor;
private final String jobId;
Expand All @@ -87,7 +90,9 @@ public class AutodetectResultProcessor {
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
private long priorRunsBucketCount;
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
private final JobResultsPersister.Builder bulkResultsPersister;
private boolean deleteInterimRequired;

Expand Down Expand Up @@ -122,6 +127,7 @@ public AutodetectResultProcessor(Client client,
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
this.deleteInterimRequired = true;
this.priorRunsBucketCount = timingStats.getBucketCount();
}

public void process() {
Expand All @@ -140,7 +146,7 @@ public void process() {
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
}
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);

} catch (Exception e) {
failed = true;
Expand All @@ -166,15 +172,15 @@ public void process() {
}

private void readResults() {
bucketCount = 0;
currentRunBucketCount = 0;
try {
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
}
} catch (Exception e) {
if (isAlive() == false) {
Expand Down Expand Up @@ -212,7 +218,7 @@ void processResult(AutodetectResult result) {
// results are also interim
timingStatsReporter.reportBucket(bucket);
bulkResultsPersister.persistBucket(bucket).executeRequest();
++bucketCount;
++currentRunBucketCount;
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
Expand All @@ -224,7 +230,7 @@ void processResult(AutodetectResult result) {
}
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
processCategoryDefinition(categoryDefinition);
}
ModelPlot modelPlot = result.getModelPlot();
if (modelPlot != null) {
Expand Down Expand Up @@ -308,6 +314,22 @@ void processResult(AutodetectResult result) {
}
}

/**
 * Persists a category definition and, at most once per processor instance, raises an audit
 * warning if the category with ID {@link #EXCESSIVE_EARLY_CATEGORY_COUNT} arrives before the job
 * has processed {@link #EARLY_BUCKET_THRESHOLD} buckets in total.
 *
 * @param categoryDefinition the category definition parsed from the autodetect output
 */
private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
    persister.persistCategoryDefinition(categoryDefinition, this::isAlive);

    // Only the definition whose ID hits the limit exactly can trigger the warning.
    if (categoryDefinition.getCategoryId() != EXCESSIVE_EARLY_CATEGORY_COUNT) {
        return;
    }

    // Buckets seen over the whole life of the job, not just since this processor was created.
    final long bucketsSeenSoFar = priorRunsBucketCount + currentRunBucketCount;
    if (bucketsSeenSoFar >= EARLY_BUCKET_THRESHOLD) {
        return;
    }

    if (excessiveCategoryWarningIssued) {
        return;
    }

    // Category definitions are written before the bucket they belong to, so the bucket currently
    // being produced is one more than the number counted so far.
    final long bucketNumberForMessage = 1L + bucketsSeenSoFar;
    auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
        bucketNumberForMessage));

    // The flag lives only in this instance: closing and reopening the job, or moving it to another
    // node, resets it, so the audit message could be generated more than once. That is harmless, and
    // unlikely in the (best practice) case where the initial lookback covers more than 100 buckets.
    excessiveCategoryWarningIssued = true;
}

private void processModelSizeStats(ModelSizeStats modelSizeStats) {
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testProcess() throws TimeoutException {
verify(persister).commitStateWrites(JOB_ID);
}

public void testProcessResult_bucket() throws Exception {
public void testProcessResult_bucket() {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutodetectResult result = mock(AutodetectResult.class);
Expand All @@ -150,7 +150,7 @@ public void testProcessResult_bucket() throws Exception {
verify(persister, never()).deleteInterimResults(JOB_ID);
}

public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
public void testProcessResult_bucket_deleteInterimRequired() {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutodetectResult result = mock(AutodetectResult.class);
Expand All @@ -167,7 +167,7 @@ public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
verify(persister).deleteInterimResults(JOB_ID);
}

public void testProcessResult_records() throws Exception {
public void testProcessResult_records() {
AutodetectResult result = mock(AutodetectResult.class);
List<AnomalyRecord> records =
Arrays.asList(
Expand All @@ -183,7 +183,7 @@ public void testProcessResult_records() throws Exception {
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
}

public void testProcessResult_influencers() throws Exception {
public void testProcessResult_influencers() {
AutodetectResult result = mock(AutodetectResult.class);
List<Influencer> influencers =
Arrays.asList(
Expand All @@ -199,9 +199,10 @@ public void testProcessResult_influencers() throws Exception {
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
}

public void testProcessResult_categoryDefinition() throws Exception {
public void testProcessResult_categoryDefinition() {
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(categoryDefinition.getCategoryId()).thenReturn(1L);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);

processorUnderTest.setDeleteInterimRequired(false);
Expand All @@ -212,7 +213,66 @@ public void testProcessResult_categoryDefinition() throws Exception {
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
}

public void testProcessResult_flushAcknowledgement() throws Exception {
/**
 * Feeds the same run of category IDs (passing the excessive threshold) through the processor several
 * times before any bucket has been processed, and checks that every definition is persisted while
 * the audit warning is issued exactly once.
 */
public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
    final int passes = 3;
    final int categoriesPerPass = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;

    processorUnderTest.setDeleteInterimRequired(false);

    // A single mocked result is re-stubbed with a fresh category definition for every call.
    AutodetectResult categoryResult = mock(AutodetectResult.class);
    for (int pass = 0; pass < passes; ++pass) {
        for (int id = 1; id <= categoriesPerPass; ++id) {
            CategoryDefinition definition = new CategoryDefinition(JOB_ID);
            definition.setCategoryId(id);
            when(categoryResult.getCategoryDefinition()).thenReturn(definition);

            processorUnderTest.processResult(categoryResult);
        }
    }

    final int expectedPersistCalls = passes * categoriesPerPass;
    // No buckets were processed, so the message reports bucket number 1.
    final String expectedWarning = Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
        AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1);

    verify(bulkBuilder, never()).executeRequest();
    verify(persister, times(expectedPersistCalls)).persistCategoryDefinition(any(CategoryDefinition.class), any());
    verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
    verify(auditor).warning(eq(JOB_ID), eq(expectedWarning));
}

/**
 * Processes more buckets than the early-bucket threshold first, then feeds category IDs that pass the
 * excessive threshold, and checks that no audit warning is issued.
 */
public void testProcessResult_highCategoryDefinitionCountLateOn() {
    final int passes = 3;
    final int categoriesPerPass = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
    final int bucketsBeforeCategories = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;

    processorUnderTest.setDeleteInterimRequired(false);

    when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
    when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);

    // Push the processor past the early-bucket threshold before any category definition arrives.
    AutodetectResult bucketResult = mock(AutodetectResult.class);
    for (int bucketIndex = 0; bucketIndex < bucketsBeforeCategories; ++bucketIndex) {
        Date bucketTime = new Date(bucketIndex * 1000 + 1000000);
        when(bucketResult.getBucket()).thenReturn(new Bucket(JOB_ID, bucketTime, BUCKET_SPAN_MS));
        processorUnderTest.processResult(bucketResult);
    }

    // Now replay the category IDs several times; the threshold ID is seen on every pass.
    AutodetectResult categoryResult = mock(AutodetectResult.class);
    for (int pass = 0; pass < passes; ++pass) {
        for (int id = 1; id <= categoriesPerPass; ++id) {
            CategoryDefinition definition = new CategoryDefinition(JOB_ID);
            definition.setCategoryId(id);
            when(categoryResult.getCategoryDefinition()).thenReturn(definition);
            processorUnderTest.processResult(categoryResult);
        }
    }

    final int expectedPersistCalls = passes * categoriesPerPass;

    verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
    verify(bulkBuilder, times(bucketsBeforeCategories)).persistBucket(any(Bucket.class));
    verify(bulkBuilder, times(bucketsBeforeCategories)).executeRequest();
    verify(persister, times(expectedPersistCalls)).persistCategoryDefinition(any(CategoryDefinition.class), any());
    verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
    verify(auditor, never()).warning(eq(JOB_ID), anyString());
}

public void testProcessResult_flushAcknowledgement() {
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
Expand All @@ -228,12 +288,13 @@ public void testProcessResult_flushAcknowledgement() throws Exception {
verify(bulkBuilder).executeRequest();
}

public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception {
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(categoryDefinition.getCategoryId()).thenReturn(1L);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);

processorUnderTest.setDeleteInterimRequired(false);
Expand All @@ -248,7 +309,7 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws E
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
}

public void testProcessResult_modelPlot() throws Exception {
public void testProcessResult_modelPlot() {
AutodetectResult result = mock(AutodetectResult.class);
ModelPlot modelPlot = mock(ModelPlot.class);
when(result.getModelPlot()).thenReturn(modelPlot);
Expand All @@ -260,7 +321,7 @@ public void testProcessResult_modelPlot() throws Exception {
verify(bulkBuilder).persistModelPlot(modelPlot);
}

public void testProcessResult_modelSizeStats() throws Exception {
public void testProcessResult_modelSizeStats() {
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
Expand All @@ -273,7 +334,7 @@ public void testProcessResult_modelSizeStats() throws Exception {
verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
}

public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception {
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
TimeValue delay = TimeValue.timeValueSeconds(5);
// Set up schedule delay time
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
Expand Down Expand Up @@ -313,7 +374,7 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exc
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
}

public void testProcessResult_modelSnapshot() throws Exception {
public void testProcessResult_modelSnapshot() {
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
.setSnapshotId("a_snapshot_id")
Expand All @@ -337,7 +398,7 @@ public void testProcessResult_modelSnapshot() throws Exception {
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
}

public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception {
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
Expand All @@ -354,7 +415,7 @@ public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws E
verify(renormalizer).renormalize(quantiles);
}

public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception {
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
Expand Down

0 comments on commit 5f12fc1

Please sign in to comment.