diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java index 5badd073c66c7..135ad755359e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java @@ -64,7 +64,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable { private static final ParseField OVERLAPPING_BUCKETS = new ParseField("overlapping_buckets"); private static final ParseField RESULT_FINALIZATION_WINDOW = new ParseField("result_finalization_window"); private static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields"); - private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization"); public static final String ML_CATEGORY_FIELD = "mlcategory"; public static final Set AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD)); @@ -98,7 +97,6 @@ private static ConstructingObjectParser createPars parser.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS); parser.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW); parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS); - parser.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION); return parser; } @@ -117,12 +115,11 @@ private static ConstructingObjectParser createPars private final Boolean overlappingBuckets; private final Long resultFinalizationWindow; private final Boolean multivariateByFields; - private final boolean usePerPartitionNormalization; private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List categorizationFilters, CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, 
String summaryCountFieldName, List detectors, List influencers, Boolean overlappingBuckets, Long resultFinalizationWindow, - Boolean multivariateByFields, boolean usePerPartitionNormalization) { + Boolean multivariateByFields) { this.detectors = detectors; this.bucketSpan = bucketSpan; this.latency = latency; @@ -134,7 +131,6 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis this.overlappingBuckets = overlappingBuckets; this.resultFinalizationWindow = resultFinalizationWindow; this.multivariateByFields = multivariateByFields; - this.usePerPartitionNormalization = usePerPartitionNormalization; } public AnalysisConfig(StreamInput in) throws IOException { @@ -165,7 +161,12 @@ public AnalysisConfig(StreamInput in) throws IOException { } } - usePerPartitionNormalization = in.readBoolean(); + // BWC for removed per-partition normalization + // Version check is temporarily against the latest to satisfy CI tests + // TODO change to V_6_5_0 after successful backport to 6.x + if (in.getVersion().before(Version.V_7_0_0_alpha1)) { + in.readBoolean(); + } } @Override @@ -195,7 +196,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } - out.writeBoolean(usePerPartitionNormalization); + // BWC for removed per-partition normalization + // Version check is temporarily against the latest to satisfy CI tests + // TODO change to V_6_5_0 after successful backport to 6.x + if (out.getVersion().before(Version.V_7_0_0_alpha1)) { + out.writeBoolean(false); + } } /** @@ -299,10 +305,6 @@ public Boolean getMultivariateByFields() { return multivariateByFields; } - public boolean getUsePerPartitionNormalization() { - return usePerPartitionNormalization; - } - /** * Return the set of fields required by the analysis. 
* These are the influencer fields, metric field, partition field, @@ -403,9 +405,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (multivariateByFields != null) { builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields); } - if (usePerPartitionNormalization) { - builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization); - } builder.endObject(); return builder; } @@ -416,7 +415,6 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; AnalysisConfig that = (AnalysisConfig) o; return Objects.equals(latency, that.latency) && - usePerPartitionNormalization == that.usePerPartitionNormalization && Objects.equals(bucketSpan, that.bucketSpan) && Objects.equals(categorizationFieldName, that.categorizationFieldName) && Objects.equals(categorizationFilters, that.categorizationFilters) && @@ -434,7 +432,7 @@ public int hashCode() { return Objects.hash( bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency, summaryCountFieldName, detectors, influencers, overlappingBuckets, resultFinalizationWindow, - multivariateByFields, usePerPartitionNormalization + multivariateByFields ); } @@ -453,7 +451,6 @@ public static class Builder { private Boolean overlappingBuckets; private Long resultFinalizationWindow; private Boolean multivariateByFields; - private boolean usePerPartitionNormalization = false; public Builder(List detectors) { setDetectors(detectors); @@ -472,7 +469,6 @@ public Builder(AnalysisConfig analysisConfig) { this.overlappingBuckets = analysisConfig.overlappingBuckets; this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow; this.multivariateByFields = analysisConfig.multivariateByFields; - this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization; } public void setDetectors(List detectors) { @@ -535,10 +531,6 @@ public void 
setMultivariateByFields(Boolean multivariateByFields) { this.multivariateByFields = multivariateByFields; } - public void setUsePerPartitionNormalization(boolean usePerPartitionNormalization) { - this.usePerPartitionNormalization = usePerPartitionNormalization; - } - /** * Checks the configuration is valid *
    @@ -571,16 +563,11 @@ public AnalysisConfig build() { overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors); - if (usePerPartitionNormalization) { - checkDetectorsHavePartitionFields(detectors); - checkNoInfluencersAreSet(influencers); - } - verifyNoInconsistentNestedFieldNames(); return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency, summaryCountFieldName, detectors, influencers, overlappingBuckets, - resultFinalizationWindow, multivariateByFields, usePerPartitionNormalization); + resultFinalizationWindow, multivariateByFields); } private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() { @@ -704,23 +691,6 @@ private void verifyCategorizationFiltersAreValidRegex() { } } - private static void checkDetectorsHavePartitionFields(List detectors) { - for (Detector detector : detectors) { - if (!Strings.isNullOrEmpty(detector.getPartitionFieldName())) { - return; - } - } - throw ExceptionsHelper.badRequestException(Messages.getMessage( - Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD)); - } - - private static void checkNoInfluencersAreSet(List influencers) { - if (!influencers.isEmpty()) { - throw ExceptionsHelper.badRequestException(Messages.getMessage( - Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS)); - } - } - private static boolean isValidRegex(String exp) { try { Pattern.compile(exp); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 289839b9342bc..7411115bda358 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -130,10 +130,6 @@ public final class Messages { "over_field_name cannot be used with 
function ''{0}''"; public static final String JOB_CONFIG_OVERLAPPING_BUCKETS_INCOMPATIBLE_FUNCTION = "Overlapping buckets cannot be used with function ''{0}''"; - public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS = - "A job configured with Per-Partition Normalization cannot use influencers"; - public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD = - "If the job is configured with Per-Partition Normalization enabled a detector must have a partition field"; public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''"; public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED = "Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " + diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index bc420c658d0b5..316417f4b23aa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -227,23 +227,6 @@ private static void addResultsMapping(XContentBuilder builder) throws IOExceptio .startObject(Bucket.SCHEDULED_EVENTS.getPreferredName()) .field(TYPE, KEYWORD) .endObject() - .startObject(Bucket.PARTITION_SCORES.getPreferredName()) - .field(TYPE, NESTED) - .startObject(PROPERTIES) - .startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - .startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName()) - .field(TYPE, KEYWORD) - .endObject() - .startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName()) - .field(TYPE, DOUBLE) - .endObject() - 
.startObject(AnomalyRecord.PROBABILITY.getPreferredName()) - .field(TYPE, DOUBLE) - .endObject() - .endObject() - .endObject() .startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName()) .field(TYPE, NESTED) @@ -328,7 +311,7 @@ private static void addTermFields(XContentBuilder builder, Collection te } private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException { - + // Forecast Output builder.startObject(Forecast.FORECAST_LOWER.getPreferredName()) .field(TYPE, DOUBLE) @@ -370,7 +353,7 @@ private static void addForecastFieldsToMapping(XContentBuilder builder) throws I .field(TYPE, LONG) .endObject(); } - + /** * AnomalyRecord fields to be added under the 'properties' section of the mapping * @param builder Add properties to this builder diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Bucket.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Bucket.java index 8a88232a559d4..8a7fe2395b4e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Bucket.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Bucket.java @@ -25,7 +25,6 @@ import java.util.Date; import java.util.List; import java.util.Objects; -import java.util.Optional; /** * Bucket Result POJO @@ -43,7 +42,6 @@ public class Bucket implements ToXContentObject, Writeable { public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucket_influencers"); public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms"); - public static final ParseField PARTITION_SCORES = new ParseField("partition_scores"); public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events"); // Used for QueryPage @@ -58,6 +56,19 @@ public class Bucket implements ToXContentObject, Writeable { public static final ConstructingObjectParser 
STRICT_PARSER = createParser(false); public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + /** + * Read and discard the old (prior to 6.5) perPartitionNormalization values + */ + public static Bucket readOldPerPartitionNormalization(StreamInput in) throws IOException { + in.readString(); + in.readString(); + in.readDouble(); + in.readDouble(); + in.readDouble(); + + return null; + } + private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { ConstructingObjectParser parser = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, ignoreUnknownFields, a -> new Bucket((String) a[0], (Date) a[1], (long) a[2])); @@ -82,8 +93,6 @@ private static ConstructingObjectParser createParser(boolean ignor parser.declareObjectArray(Bucket::setBucketInfluencers, ignoreUnknownFields ? BucketInfluencer.LENIENT_PARSER : BucketInfluencer.STRICT_PARSER, BUCKET_INFLUENCERS); parser.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS); - parser.declareObjectArray(Bucket::setPartitionScores, ignoreUnknownFields ?
- PartitionScore.LENIENT_PARSER : PartitionScore.STRICT_PARSER, PARTITION_SCORES); parser.declareString((bucket, s) -> {}, Result.RESULT_TYPE); parser.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS); @@ -100,7 +109,6 @@ private static ConstructingObjectParser createParser(boolean ignor private boolean isInterim; private List bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to private long processingTimeMs; - private List partitionScores = Collections.emptyList(); private List scheduledEvents = Collections.emptyList(); public Bucket(String jobId, Date timestamp, long bucketSpan) { @@ -120,7 +128,6 @@ public Bucket(Bucket other) { this.isInterim = other.isInterim; this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers); this.processingTimeMs = other.processingTimeMs; - this.partitionScores = new ArrayList<>(other.partitionScores); this.scheduledEvents = new ArrayList<>(other.scheduledEvents); } @@ -143,7 +150,10 @@ public Bucket(StreamInput in) throws IOException { if (in.getVersion().before(Version.V_5_5_0)) { in.readGenericValue(); } - partitionScores = in.readList(PartitionScore::new); + // bwc for perPartitionNormalization + if (in.getVersion().before(Version.V_6_5_0)) { + in.readList(Bucket::readOldPerPartitionNormalization); + } if (in.getVersion().onOrAfter(Version.V_6_2_0)) { scheduledEvents = in.readList(StreamInput::readString); if (scheduledEvents.isEmpty()) { @@ -174,7 +184,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_5_5_0)) { out.writeGenericValue(Collections.emptyMap()); } - out.writeList(partitionScores); + // bwc for perPartitionNormalization + if (out.getVersion().before(Version.V_6_5_0)) { + out.writeList(Collections.emptyList()); + } if (out.getVersion().onOrAfter(Version.V_6_2_0)) { out.writeStringList(scheduledEvents); } @@ -195,9 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws 
builder.field(Result.IS_INTERIM.getPreferredName(), isInterim); builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers); builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs); - if (partitionScores.isEmpty() == false) { - builder.field(PARTITION_SCORES.getPreferredName(), partitionScores); - } + if (scheduledEvents.isEmpty() == false) { builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents); } @@ -304,14 +315,6 @@ public void addBucketInfluencer(BucketInfluencer bucketInfluencer) { bucketInfluencers.add(bucketInfluencer); } - public List getPartitionScores() { - return partitionScores; - } - - public void setPartitionScores(List scores) { - partitionScores = Objects.requireNonNull(scores); - } - public List getScheduledEvents() { return scheduledEvents; } @@ -320,24 +323,10 @@ public void setScheduledEvents(List scheduledEvents) { this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName()); } - public double partitionInitialAnomalyScore(String partitionValue) { - Optional first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue())) - .findFirst(); - - return first.isPresent() ? first.get().getInitialRecordScore() : 0.0; - } - - public double partitionAnomalyScore(String partitionValue) { - Optional first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue())) - .findFirst(); - - return first.isPresent() ? 
first.get().getRecordScore() : 0.0; - } - @Override public int hashCode() { return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records, - isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents); + isInterim, bucketSpan, bucketInfluencers, processingTimeMs, scheduledEvents); } /** @@ -360,7 +349,6 @@ public boolean equals(Object other) { && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore) && Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim) && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) - && Objects.equals(this.partitionScores, that.partitionScores) && (this.processingTimeMs == that.processingTimeMs) && Objects.equals(this.scheduledEvents, that.scheduledEvents); } @@ -374,6 +362,6 @@ public boolean equals(Object other) { * @return true if the bucket should be normalized or false otherwise */ public boolean isNormalizable() { - return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0); + return anomalyScore > 0.0; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/PartitionScore.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/PartitionScore.java deleted file mode 100644 index 3d0acc8fde667..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/PartitionScore.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.core.ml.job.results; - -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; - -import java.io.IOException; -import java.util.Objects; - -public class PartitionScore implements ToXContentObject, Writeable { - public static final ParseField PARTITION_SCORE = new ParseField("partition_score"); - - private final String partitionFieldValue; - private final String partitionFieldName; - private final double initialRecordScore; - private double recordScore; - private double probability; - - public static final ConstructingObjectParser STRICT_PARSER = createParser(false); - public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); - - private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { - ConstructingObjectParser parser = new ConstructingObjectParser<>(PARTITION_SCORE.getPreferredName(), - ignoreUnknownFields, a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3], (Double) a[4])); - - parser.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_NAME); - parser.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE); - parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.INITIAL_RECORD_SCORE); - parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.RECORD_SCORE); - parser.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.PROBABILITY); - - return parser; - } - - public PartitionScore(String fieldName, String fieldValue, double initialRecordScore, double recordScore, double 
probability) { - partitionFieldName = fieldName; - partitionFieldValue = fieldValue; - this.initialRecordScore = initialRecordScore; - this.recordScore = recordScore; - this.probability = probability; - } - - public PartitionScore(StreamInput in) throws IOException { - partitionFieldName = in.readString(); - partitionFieldValue = in.readString(); - initialRecordScore = in.readDouble(); - recordScore = in.readDouble(); - probability = in.readDouble(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(partitionFieldName); - out.writeString(partitionFieldValue); - out.writeDouble(initialRecordScore); - out.writeDouble(recordScore); - out.writeDouble(probability); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName); - builder.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); - builder.field(AnomalyRecord.INITIAL_RECORD_SCORE.getPreferredName(), initialRecordScore); - builder.field(AnomalyRecord.RECORD_SCORE.getPreferredName(), recordScore); - builder.field(AnomalyRecord.PROBABILITY.getPreferredName(), probability); - builder.endObject(); - return builder; - } - - public double getInitialRecordScore() { - return initialRecordScore; - } - - public double getRecordScore() { - return recordScore; - } - - public void setRecordScore(double recordScore) { - this.recordScore = recordScore; - } - - public String getPartitionFieldName() { - return partitionFieldName; - } - - public String getPartitionFieldValue() { - return partitionFieldValue; - } - - public double getProbability() { - return probability; - } - - public void setProbability(double probability) { - this.probability = probability; - } - - @Override - public int hashCode() { - return Objects.hash(partitionFieldName, partitionFieldValue, probability, 
initialRecordScore, recordScore); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof PartitionScore == false) { - return false; - } - - PartitionScore that = (PartitionScore) other; - - // id is excluded from the test as it is generated by the datastore - return Objects.equals(this.partitionFieldValue, that.partitionFieldValue) - && Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability) - && (this.initialRecordScore == that.initialRecordScore) && (this.recordScore == that.recordScore); - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index fb9a697ac4644..63c4278e541d4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -81,7 +81,6 @@ public final class ReservedFieldNames { Bucket.EVENT_COUNT.getPreferredName(), Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(), Bucket.PROCESSING_TIME_MS.getPreferredName(), - Bucket.PARTITION_SCORES.getPreferredName(), Bucket.SCHEDULED_EVENTS.getPreferredName(), BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetBucketActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetBucketActionResponseTests.java index 4fbb7a9249641..ffd21834e583c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetBucketActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetBucketActionResponseTests.java @@ -11,7 +11,6 @@ 
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; -import org.elasticsearch.xpack.core.ml.job.results.PartitionScore; import java.util.ArrayList; import java.util.Collections; @@ -53,15 +52,6 @@ protected Response createTestInstance() { if (randomBoolean()) { bucket.setInterim(randomBoolean()); } - if (randomBoolean()) { - int size = randomInt(10); - List partitionScores = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - partitionScores.add(new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), - randomDouble(), randomDouble(), randomDouble())); - } - bucket.setPartitionScores(partitionScores); - } if (randomBoolean()) { bucket.setProcessingTimeMs(randomLong()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java index 50c60a31427d6..d691124a90a43 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java @@ -97,11 +97,8 @@ public static AnalysisConfig.Builder createRandomized() { builder.setResultFinalizationWindow(randomNonNegativeLong()); } - boolean usePerPartitionNormalisation = randomBoolean(); - builder.setUsePerPartitionNormalization(usePerPartitionNormalisation); - if (!usePerPartitionNormalisation) { // influencers can't be used with per partition normalisation - builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false))); - } + builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false))); + return builder; } @@ -690,40 +687,15 @@ public void testVerify_GivenEmptyCategorizationFilter() { 
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY), e.getMessage()); } - - public void testCheckDetectorsHavePartitionFields() { - AnalysisConfig.Builder config = createValidConfig(); - config.setUsePerPartitionNormalization(true); - - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, config::build); - - assertEquals(Messages.getMessage(Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD), e.getMessage()); - } - public void testCheckDetectorsHavePartitionFields_doesntThrowWhenValid() { AnalysisConfig.Builder config = createValidConfig(); Detector.Builder builder = new Detector.Builder(config.build().getDetectors().get(0)); builder.setPartitionFieldName("pField"); config.build().getDetectors().set(0, builder.build()); - config.setUsePerPartitionNormalization(true); config.build(); } - public void testCheckNoInfluencersAreSet() { - - AnalysisConfig.Builder config = createValidConfig(); - Detector.Builder builder = new Detector.Builder(config.build().getDetectors().get(0)); - builder.setPartitionFieldName("pField"); - config.build().getDetectors().set(0, builder.build()); - config.setInfluencers(Arrays.asList("inf1", "inf2")); - config.setUsePerPartitionNormalization(true); - - ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, config::build); - - assertEquals(Messages.getMessage(Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS), e.getMessage()); - } - public void testVerify_GivenCategorizationFiltersContainInvalidRegex() { AnalysisConfig.Builder config = createValidCategorizationConfig(); config.setCategorizationFilters(Arrays.asList("foo", "(")); @@ -756,7 +728,7 @@ private static AnalysisConfig.Builder createValidCategorizationConfig() { @Override protected AnalysisConfig mutateInstance(AnalysisConfig instance) { AnalysisConfig.Builder builder = new AnalysisConfig.Builder(instance); - switch (between(0, 11)) { + switch 
(between(0, 10)) { case 0: List detectors = new ArrayList<>(instance.getDetectors()); Detector.Builder detector = new Detector.Builder(); @@ -832,7 +804,6 @@ protected AnalysisConfig mutateInstance(AnalysisConfig instance) { List influencers = new ArrayList<>(instance.getInfluencers()); influencers.add(randomAlphaOfLengthBetween(5, 10)); builder.setInfluencers(influencers); - builder.setUsePerPartitionNormalization(false); break; case 8: if (instance.getOverlappingBuckets() == null) { @@ -855,13 +826,6 @@ protected AnalysisConfig mutateInstance(AnalysisConfig instance) { builder.setMultivariateByFields(instance.getMultivariateByFields() == false); } break; - case 11: - boolean usePerPartitionNormalization = instance.getUsePerPartitionNormalization() == false; - builder.setUsePerPartitionNormalization(usePerPartitionNormalization); - if (usePerPartitionNormalization) { - builder.setInfluencers(Collections.emptyList()); - } - break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f2fdea45ebe24..af7cb4242f1e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -384,8 +384,8 @@ public Collection createComponents(Client client, ClusterService cluster autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) -> new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op - normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization, - executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); + normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> + new MultiplyingNormalizerProcess(settings, 1.0); } NormalizerFactory 
normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 200cb08512572..0094eba97cecb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -61,7 +61,6 @@ public class AutodetectBuilder { public static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput"; public static final String MODEL_CONFIG_ARG = "--modelconfig="; public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState="; - public static final String PER_PARTITION_NORMALIZATION = "--perPartitionNormalization"; private static final String CONF_EXTENSION = ".conf"; static final String JOB_ID_ARG = "--jobid="; @@ -207,10 +206,6 @@ List buildAutodetectCommand() { if (Boolean.TRUE.equals(analysisConfig.getMultivariateByFields())) { command.add(MULTIVARIATE_BY_FIELDS_ARG); } - - if (analysisConfig.getUsePerPartitionNormalization()) { - command.add(PER_PARTITION_NORMALIZATION); - } } // Input is always length encoded diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 063ab3b49d146..fa05c2e63ee11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -499,7 +499,7 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams 
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, - renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); + renormalizerExecutorService); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java index 7ef23cb513b7f..9b54d01de5fa6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizable.java @@ -16,12 +16,10 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER; -import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE; - public class BucketNormalizable extends Normalizable { - private static final List CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, PARTITION_SCORE); + private static final List CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER); private final Bucket bucket; @@ -117,11 +115,6 @@ public List getChildren(ChildType type) { .map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex())) .collect(Collectors.toList())); break; - case PARTITION_SCORE: - children.addAll(bucket.getPartitionScores().stream() - .map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex())) - .collect(Collectors.toList())); - break; default: throw new IllegalArgumentException("Invalid type: " + 
type); } @@ -135,8 +128,6 @@ public boolean setMaxChildrenScore(ChildType childrenType, double maxScore) { double oldScore = bucket.getAnomalyScore(); bucket.setAnomalyScore(maxScore); return maxScore != oldScore; - case PARTITION_SCORE: - return false; default: throw new IllegalArgumentException("Invalid type: " + childrenType); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index c96a3b48fe1d8..60f52d3f44288 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -38,20 +38,19 @@ public NativeNormalizerProcessFactory(Environment env, Settings settings, Native @Override public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, - boolean perPartitionNormalization, ExecutorService executorService) { + ExecutorService executorService) { ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, jobId, true, false, true, true, false, false); - createNativeProcess(jobId, quantilesState, processPipes, bucketSpan, perPartitionNormalization); + createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); return new NativeNormalizerProcess(jobId, settings, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), executorService); } - private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan, - boolean perPartitionNormalization) { + private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan) { try { - List 
command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan, perPartitionNormalization).build(); + List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java index 7efadf2961308..0d88372de17e4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizable.java @@ -11,7 +11,7 @@ import java.util.Objects; public abstract class Normalizable implements ToXContentObject { - public enum ChildType {BUCKET_INFLUENCER, RECORD, PARTITION_SCORE}; + public enum ChildType {BUCKET_INFLUENCER, RECORD}; private final String indexName; private boolean hadBigNormalizedUpdate; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java index 2d4e2135478f3..22e7d3ba99598 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Normalizer.java @@ -46,15 +46,14 @@ public Normalizer(String jobId, NormalizerProcessFactory processFactory, Executo * and normalizes the given results. * * @param bucketSpan If null the default is used - * @param perPartitionNormalization Is normalization per partition (rather than per job)? 
* @param results Will be updated with the normalized results * @param quantilesState The state to be used to seed the system change * normalizer */ - public void normalize(Integer bucketSpan, boolean perPartitionNormalization, + public void normalize(Integer bucketSpan, List results, String quantilesState) { NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan, - perPartitionNormalization, executorService); + executorService); NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler(); Future resultsHandlerFuture = executorService.submit(() -> { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilder.java index 5630a75127506..37ffd92a3d72f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilder.java @@ -29,15 +29,12 @@ public class NormalizerBuilder { private final String jobId; private final String quantilesState; private final Integer bucketSpan; - private final boolean perPartitionNormalization; - public NormalizerBuilder(Environment env, String jobId, String quantilesState, Integer bucketSpan, - boolean perPartitionNormalization) { + public NormalizerBuilder(Environment env, String jobId, String quantilesState, Integer bucketSpan) { this.env = env; this.jobId = jobId; this.quantilesState = quantilesState; this.bucketSpan = bucketSpan; - this.perPartitionNormalization = perPartitionNormalization; } /** @@ -49,9 +46,6 @@ public List build() throws IOException { command.add(NORMALIZE_PATH); addIfNotNull(bucketSpan, AutodetectBuilder.BUCKET_SPAN_ARG, command); command.add(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG); - if (perPartitionNormalization) { - 
command.add(AutodetectBuilder.PER_PARTITION_NORMALIZATION); - } if (quantilesState != null) { Path quantilesStateFilePath = AutodetectBuilder.writeNormalizerInitState(jobId, quantilesState, env); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcessFactory.java index bdb63b778971e..cf08190a99392 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcessFactory.java @@ -17,6 +17,5 @@ public interface NormalizerProcessFactory { * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process * @return The process */ - NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, boolean perPartitionNormalization, - ExecutorService executorService); + NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, ExecutorService executorService); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java deleted file mode 100644 index 91b2a7a505e35..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/PartitionScoreNormalizable.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.job.process.normalizer; - -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.ml.job.results.PartitionScore; - -import java.io.IOException; -import java.util.Objects; - - -public class PartitionScoreNormalizable extends AbstractLeafNormalizable { - private final PartitionScore score; - - public PartitionScoreNormalizable(PartitionScore score, String indexName) { - super(indexName); - this.score = Objects.requireNonNull(score); - } - - @Override - public String getId() { - throw new UnsupportedOperationException("PartitionScore has no ID as it should not be persisted outside of the owning bucket"); - } - - @Override - public Level getLevel() { - return Level.PARTITION; - } - - @Override - public String getPartitionFieldName() { - return score.getPartitionFieldName(); - } - - @Override - public String getPartitionFieldValue() { - return score.getPartitionFieldValue(); - } - - @Override - public String getPersonFieldName() { - return null; - } - - @Override - public String getPersonFieldValue() { - return null; - } - - @Override - public String getFunctionName() { - return null; - } - - @Override - public String getValueFieldName() { - return null; - } - - @Override - public double getProbability() { - return score.getProbability(); - } - - @Override - public double getNormalizedScore() { - return score.getRecordScore(); - } - - @Override - public void setNormalizedScore(double normalizedScore) { - score.setRecordScore(normalizedScore); - } - - @Override - public void setParentScore(double parentScore) { - // Do nothing as it is not holding the parent score. 
- } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return score.toXContent(builder, params); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index cfb5660c911b5..c2ef2fab7f8a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -79,12 +79,12 @@ private long getNormalizationWindowOrDefault(Job job) { * Update the anomaly score field on all previously persisted buckets * and all contained records */ - public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) { + public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs) { Normalizer normalizer = normalizerFactory.create(jobId); int[] counts = {0, 0}; - updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); - updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); - updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization); + updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts); + updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts); + updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts); // The updates will have been persisted in batches throughout the renormalization // process - this call just catches any leftovers @@ -94,7 +94,7 @@ public void update(String quantilesState, long endBucketEpochMs, long windowExte } private void 
updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs, - long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + long windowExtensionMs, int[] counts) { BatchedDocumentsIterator> bucketsIterator = jobResultsProvider.newBatchedBucketsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) @@ -114,14 +114,14 @@ private void updateBuckets(Normalizer normalizer, String quantilesState, long en if (current.result.isNormalizable()) { bucketsToRenormalize.add(new BucketNormalizable(current.result, current.index)); if (bucketsToRenormalize.size() >= TARGET_BUCKETS_TO_RENORMALIZE) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts); bucketsToRenormalize.clear(); } } } } if (!bucketsToRenormalize.isEmpty()) { - normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization); + normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts); } } @@ -130,8 +130,8 @@ private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionM } private void normalizeBuckets(Normalizer normalizer, List normalizableBuckets, - String quantilesState, int[] counts, boolean perPartitionNormalization) { - normalizer.normalize(bucketSpan, perPartitionNormalization, normalizableBuckets, quantilesState); + String quantilesState, int[] counts) { + normalizer.normalize(bucketSpan, normalizableBuckets, quantilesState); for (BucketNormalizable bucketNormalizable : normalizableBuckets) { if (bucketNormalizable.hadBigNormalizedUpdate()) { @@ -144,7 +144,7 @@ private void normalizeBuckets(Normalizer normalizer, List no } private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs, - long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + long 
windowExtensionMs, int[] counts) { BatchedDocumentsIterator> recordsIterator = jobResultsProvider.newBatchedRecordsIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); @@ -160,14 +160,14 @@ private void updateRecords(Normalizer normalizer, String quantilesState, long en List asNormalizables = records.stream() .map(recordResultIndex -> new RecordNormalizable(recordResultIndex.result, recordResultIndex.index)) .collect(Collectors.toList()); - normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); + normalizer.normalize(bucketSpan, asNormalizables, quantilesState); persistChanged(counts, asNormalizables); } } private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs, - long windowExtensionMs, int[] counts, boolean perPartitionNormalization) { + long windowExtensionMs, int[] counts) { BatchedDocumentsIterator> influencersIterator = jobResultsProvider.newBatchedInfluencersIterator(jobId) .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs) .includeInterim(false); @@ -183,7 +183,7 @@ private void updateInfluencers(Normalizer normalizer, String quantilesState, lon List asNormalizables = influencers.stream() .map(influencerResultIndex -> new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index)) .collect(Collectors.toList()); - normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState); + normalizer.normalize(bucketSpan, asNormalizables, quantilesState); persistChanged(counts, asNormalizables); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java index 057ed18cd696f..7db66387db8e0 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java @@ -26,7 +26,6 @@ public class ShortCircuitingRenormalizer implements Renormalizer { private final String jobId; private final ScoresUpdater scoresUpdater; private final ExecutorService executorService; - private final boolean isPerPartitionNormalization; private final Deque quantilesDeque = new ConcurrentLinkedDeque<>(); private final Deque latchDeque = new ConcurrentLinkedDeque<>(); /** @@ -34,12 +33,10 @@ public class ShortCircuitingRenormalizer implements Renormalizer { */ private final Semaphore semaphore = new Semaphore(1); - public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService, - boolean isPerPartitionNormalization) { + public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService) { this.jobId = jobId; this.scoresUpdater = scoresUpdater; this.executorService = executorService; - this.isPerPartitionNormalization = isPerPartitionNormalization; } @Override @@ -161,8 +158,7 @@ private void doRenormalizations() { jobId, latestBucketTimeMs, earliestBucketTimeMs); windowExtensionMs = 0; } - scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs, - isPerPartitionNormalization); + scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs); latch.countDown(); latch = null; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java index 0f83106441185..325ad52864bfa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java 
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java @@ -56,7 +56,6 @@ public void testBuildAutodetectCommand() { acBuilder.setSummaryCountFieldName("summaryField"); acBuilder.setOverlappingBuckets(true); acBuilder.setMultivariateByFields(true); - acBuilder.setUsePerPartitionNormalization(true); job.setAnalysisConfig(acBuilder); DataDescription.Builder dd = new DataDescription.Builder(); @@ -66,7 +65,7 @@ public void testBuildAutodetectCommand() { job.setDataDescription(dd); List command = autodetectBuilder(job.build()).buildAutodetectCommand(); - assertEquals(13, command.size()); + assertEquals(12, command.size()); assertTrue(command.contains(AutodetectBuilder.AUTODETECT_PATH)); assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "120")); assertTrue(command.contains(AutodetectBuilder.LATENCY_ARG + "360")); @@ -80,8 +79,6 @@ public void testBuildAutodetectCommand() { assertTrue(command.contains(AutodetectBuilder.TIME_FIELD_ARG + "tf")); assertTrue(command.contains(AutodetectBuilder.JOB_ID_ARG + "unit-test-job")); - assertTrue(command.contains(AutodetectBuilder.PER_PARTITION_NORMALIZATION)); - int expectedPersistInterval = 10800 + AutodetectBuilder.calculateStaggeringInterval(job.getId()); assertTrue(command.contains(AutodetectBuilder.PERSIST_INTERVAL_ARG + expectedPersistInterval)); int expectedMaxQuantileInterval = 21600 + AutodetectBuilder.calculateStaggeringInterval(job.getId()); @@ -116,4 +113,4 @@ public void testBuildAutodetectCommand_givenPersistModelState() { private AutodetectBuilder autodetectBuilder(Job job) { return new AutodetectBuilder(job, filesToDelete, logger, env, settings, nativeController, processPipes); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index c5a459f70f3f2..05b6bc7209b87 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -172,25 +172,6 @@ public void testProcessResult_records() { verifyNoMoreInteractions(persister); } - public void testProcessResult_records_isPerPartitionNormalization() { - JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); - when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); - - AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder); - context.deleteInterimRequired = false; - AutodetectResult result = mock(AutodetectResult.class); - AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123); - record1.setPartitionFieldValue("pValue"); - AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123); - record2.setPartitionFieldValue("pValue"); - List records = Arrays.asList(record1, record2); - when(result.getRecords()).thenReturn(records); - processorUnderTest.processResult(context, result); - - verify(bulkBuilder, times(1)).persistRecords(records); - verify(bulkBuilder, never()).executeRequest(); - verifyNoMoreInteractions(persister); - } public void testProcessResult_influencers() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java index 4436fcc7026fe..f2cd6572db69d 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/BucketNormalizableTests.java @@ -9,10 +9,8 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; -import org.elasticsearch.xpack.core.ml.job.results.PartitionScore; import org.junit.Before; -import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -46,11 +44,6 @@ public void setUpBucket() { AnomalyRecord record2 = new AnomalyRecord("foo", bucket.getTimestamp(), 600); record2.setRecordScore(2.0); bucket.setRecords(Arrays.asList(record1, record2)); - - List partitionScores = new ArrayList<>(); - partitionScores.add(new PartitionScore("pf1", "pv1", 0.3, 0.2, 0.1)); - partitionScores.add(new PartitionScore("pf1", "pv2", 0.5, 0.4, 0.01)); - bucket.setPartitionScores(partitionScores); } public void testIsContainerOnly() { @@ -106,15 +99,11 @@ public void testGetChildren() { BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME); List children = bn.getChildren(); - assertEquals(4, children.size()); + assertEquals(2, children.size()); assertTrue(children.get(0) instanceof BucketInfluencerNormalizable); assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON); assertTrue(children.get(1) instanceof BucketInfluencerNormalizable); assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON); - assertTrue(children.get(2) instanceof PartitionScoreNormalizable); - assertEquals(0.2, children.get(2).getNormalizedScore(), EPSILON); - assertTrue(children.get(3) instanceof PartitionScoreNormalizable); - assertEquals(0.4, children.get(3).getNormalizedScore(), EPSILON); } public void testGetChildren_GivenTypeBucketInfluencer() { @@ -132,7 +121,6 @@ public void 
testSetMaxChildrenScore_GivenDifferentScores() { BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME); assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0)); - assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 42.0)); assertEquals(95.0, bucket.getAnomalyScore(), EPSILON); } @@ -141,7 +129,6 @@ public void testSetMaxChildrenScore_GivenSameScores() { BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME); assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0)); - assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 2.0)); assertEquals(88.0, bucket.getAnomalyScore(), EPSILON); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilderTests.java index 64e595fd5a043..5d9c183c738e4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerBuilderTests.java @@ -21,11 +21,10 @@ public void testBuildNormalizerCommand() throws IOException { Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()); String jobId = "unit-test-job"; - List command = new NormalizerBuilder(env, jobId, null, 300, true).build(); - assertEquals(4, command.size()); + List command = new NormalizerBuilder(env, jobId, null, 300).build(); + assertEquals(3, command.size()); assertTrue(command.contains("./normalize")); assertTrue(command.contains(AutodetectBuilder.BUCKET_SPAN_ARG + "300")); assertTrue(command.contains(AutodetectBuilder.LENGTH_ENCODED_INPUT_ARG)); - 
assertTrue(command.contains(AutodetectBuilder.PER_PARTITION_NORMALIZATION)); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java index 1b34226e33640..d06146ad53fdb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java @@ -49,7 +49,7 @@ public void testNormalize() throws IOException, InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(1); try { NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class); - when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false), + when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR)); Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool); @@ -58,7 +58,7 @@ public void testNormalize() throws IOException, InterruptedException { bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE)); List asNormalizables = Arrays.asList(new BucketNormalizable(bucket, INDEX_NAME)); - normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE); + normalizer.normalize(BUCKET_SPAN, asNormalizables, QUANTILES_STATE); assertEquals(1, asNormalizables.size()); assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java index 2acaf97359477..9836cf93718e5 
100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java @@ -33,7 +33,6 @@ import java.util.List; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; @@ -95,7 +94,7 @@ public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException buckets.add(bucket); givenProviderReturnsBuckets(buckets); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(0); verifyNothingWasUpdated(); @@ -113,7 +112,7 @@ public void testUpdate_GivenTwoBucketsOnlyOneUpdated() throws IOException { givenProviderReturnsBuckets(buckets); givenProviderReturnsRecords(new ArrayDeque<>()); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(1); verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any()); @@ -129,7 +128,7 @@ public void testUpdate_GivenSingleBucketWithAnomalyScoreAndNoRecords() throws IO givenProviderReturnsBuckets(buckets); givenProviderReturnsRecords(new ArrayDeque<>()); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(1); verifyBucketWasUpdated(1); @@ -150,7 +149,7 @@ public void testUpdate_GivenSingleBucketAndRecords() throws IOException { givenProviderReturnsBuckets(buckets); givenProviderReturnsRecords(records); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(2); verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any()); @@ -176,7 +175,7 @@ public void 
testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() thro givenProviderReturnsBuckets(batch1, batch2); givenProviderReturnsRecords(new ArrayDeque<>()); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(1); @@ -212,7 +211,7 @@ public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondN recordIter.requireIncludeInterim(false); when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(2); } @@ -224,7 +223,7 @@ public void testUpdate_GivenInfluencerWithBigChange() throws IOException { influencers.add(influencer); givenProviderReturnsInfluencers(influencers); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(1); verify(jobRenormalizedResultsPersister, times(1)).updateResults(any()); @@ -253,7 +252,7 @@ public void testUpdate_GivenShutdown() throws IOException { givenProviderReturnsRecords(records); scoresUpdater.shutdown(); - scoresUpdater.update(QUANTILES_STATE, 3600, 0, false); + scoresUpdater.update(QUANTILES_STATE, 3600, 0); verifyNormalizerWasInvoked(0); verify(jobRenormalizedResultsPersister, never()).updateBucket(any()); @@ -272,7 +271,7 @@ public void testDefaultRenormalizationWindowBasedOnTime() throws IOException { givenProviderReturnsRecords(new ArrayDeque<>()); givenProviderReturnsNoInfluencers(); - scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false); + scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0); verifyNormalizerWasInvoked(1); verifyBucketWasUpdated(1); @@ -289,7 +288,7 @@ public void testManualRenormalizationWindow() throws IOException { givenProviderReturnsRecords(new ArrayDeque<>()); givenProviderReturnsNoInfluencers(); - scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false); + 
scoresUpdater.update(QUANTILES_STATE, 90000000L, 0); verifyNormalizerWasInvoked(1); verifyBucketWasUpdated(1); @@ -307,7 +306,7 @@ public void testManualRenormalizationWindow_GivenExtension() throws IOException givenProviderReturnsRecords(new ArrayDeque<>()); givenProviderReturnsNoInfluencers(); - scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false); + scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000); verifyNormalizerWasInvoked(1); verifyBucketWasUpdated(1); @@ -339,7 +338,7 @@ private void givenNormalizerRaisesBigChangeFlag() { doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - List normalizables = (List) invocationOnMock.getArguments()[2]; + List normalizables = (List) invocationOnMock.getArguments()[1]; for (Normalizable normalizable : normalizables) { normalizable.raiseBigChangeFlag(); for (Normalizable child : normalizable.getChildren()) { @@ -348,7 +347,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { } return null; } - }).when(normalizer).normalize(anyInt(), anyBoolean(), anyList(), anyString()); + }).when(normalizer).normalize(anyInt(), anyList(), anyString()); } private void givenProviderReturnsBuckets(Deque batch1, Deque batch2) { @@ -416,7 +415,7 @@ private void givenProviderReturnsInfluencers(Deque influencers) { private void verifyNormalizerWasInvoked(int times) throws IOException { int bucketSpan = job.getAnalysisConfig() == null ? 
0 : ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue(); verify(normalizer, times(times)).normalize( - eq(bucketSpan), eq(false), anyListOf(Normalizable.class), + eq(bucketSpan), anyListOf(Normalizable.class), eq(QUANTILES_STATE)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java index 769d3657b0497..e10d7191acc42 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java @@ -18,7 +18,6 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -43,10 +42,7 @@ public void setUpMocks() { public void testNormalize() throws InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(10); try { - boolean isPerPartitionNormalization = randomBoolean(); - - ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool, - isPerPartitionNormalization); + ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool); // Blast through many sets of quantiles in quick succession, faster than the normalizer can process them for (int i = 1; i < TEST_SIZE / 2; ++i) { @@ -61,7 +57,7 @@ public void testNormalize() throws InterruptedException { renormalizer.waitUntilIdle(); ArgumentCaptor stateCaptor = ArgumentCaptor.forClass(String.class); - verify(scoresUpdater, atLeastOnce()).update(stateCaptor.capture(), anyLong(), anyLong(), 
eq(isPerPartitionNormalization)); + verify(scoresUpdater, atLeastOnce()).update(stateCaptor.capture(), anyLong(), anyLong()); List quantilesUsed = stateCaptor.getAllValues(); assertFalse(quantilesUsed.isEmpty()); @@ -91,7 +87,7 @@ public void testNormalize() throws InterruptedException { public void testIsEnabled_GivenNormalizationWindowIsZero() { ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); when(scoresUpdater.getNormalizationWindow()).thenReturn(0L); - ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean()); + ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null); assertThat(renormalizer.isEnabled(), is(false)); } @@ -99,7 +95,7 @@ public void testIsEnabled_GivenNormalizationWindowIsZero() { public void testIsEnabled_GivenNormalizationWindowGreaterThanZero() { ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); when(scoresUpdater.getNormalizationWindow()).thenReturn(1L); - ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean()); + ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null); assertThat(renormalizer.isEnabled(), is(true)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java index 966501db43ff6..65343b0a068ac 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecordTests; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; -import org.elasticsearch.xpack.core.ml.job.results.PartitionScore; 
import java.io.IOException; import java.util.ArrayList; @@ -61,15 +60,6 @@ public Bucket createTestInstance(String jobId) { if (randomBoolean()) { bucket.setInterim(randomBoolean()); } - if (randomBoolean()) { - int size = randomInt(10); - List partitionScores = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - partitionScores.add(new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomDouble(), - randomDouble(), randomDouble())); - } - bucket.setPartitionScores(partitionScores); - } if (randomBoolean()) { bucket.setProcessingTimeMs(randomLong()); } @@ -235,15 +225,6 @@ public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() { assertFalse(bucket.isNormalizable()); } - public void testIsNormalizable_GivenAnomalyScoreIsZeroAndPartitionsScoresAreNonZero() { - Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123)); - bucket.setAnomalyScore(0.0); - bucket.setPartitionScores(Collections.singletonList(new PartitionScore("n", "v", 50.0, 40.0, 0.01))); - - assertTrue(bucket.isNormalizable()); - } - public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() { Bucket bucket = new Bucket("foo", new Date(123), 123); bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123)); @@ -260,35 +241,7 @@ public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero assertTrue(bucket.isNormalizable()); } - public void testPartitionAnomalyScore() { - List pScore = new ArrayList<>(); - pScore.add(new PartitionScore("pf", "pv1", 11.0, 10.0, 0.1)); - pScore.add(new PartitionScore("pf", "pv3", 51.0, 50.0, 0.1)); - pScore.add(new PartitionScore("pf", "pv4", 61.0, 60.0, 0.1)); - pScore.add(new PartitionScore("pf", "pv2", 41.0, 40.0, 0.1)); - - Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.setPartitionScores(pScore); - - double initialAnomalyScore = 
bucket.partitionInitialAnomalyScore("pv1"); - assertEquals(11.0, initialAnomalyScore, 0.001); - double anomalyScore = bucket.partitionAnomalyScore("pv1"); - assertEquals(10.0, anomalyScore, 0.001); - initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv2"); - assertEquals(41.0, initialAnomalyScore, 0.001); - anomalyScore = bucket.partitionAnomalyScore("pv2"); - assertEquals(40.0, anomalyScore, 0.001); - initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv3"); - assertEquals(51.0, initialAnomalyScore, 0.001); - anomalyScore = bucket.partitionAnomalyScore("pv3"); - assertEquals(50.0, anomalyScore, 0.001); - initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv4"); - assertEquals(61.0, initialAnomalyScore, 0.001); - anomalyScore = bucket.partitionAnomalyScore("pv4"); - assertEquals(60.0, anomalyScore, 0.001); - } - - public void testId() { + public void testId() { Bucket bucket = new Bucket("foo", new Date(123), 60L); assertEquals("foo_bucket_123_60", bucket.getId()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/PartitionScoreTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/PartitionScoreTests.java deleted file mode 100644 index 74c3934c532dd..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/PartitionScoreTests.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.job.results; - -import org.elasticsearch.common.io.stream.Writeable.Reader; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.test.AbstractSerializingTestCase; -import org.elasticsearch.xpack.core.ml.job.results.PartitionScore; - -import java.io.IOException; - -import static org.hamcrest.Matchers.containsString; - -public class PartitionScoreTests extends AbstractSerializingTestCase { - - @Override - protected PartitionScore createTestInstance() { - return new PartitionScore(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomDouble(), randomDouble(), - randomDouble()); - } - - @Override - protected Reader instanceReader() { - return PartitionScore::new; - } - - @Override - protected PartitionScore doParseInstance(XContentParser parser) { - return PartitionScore.STRICT_PARSER.apply(parser, null); - } - - public void testStrictParser() throws IOException { - String json = "{\"partition_field_name\":\"field_1\", \"partition_field_value\":\"x\", \"initial_record_score\": 3," + - " \"record_score\": 3, \"probability\": 0.001, \"foo\":\"bar\"}"; - try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> PartitionScore.STRICT_PARSER.apply(parser, null)); - - assertThat(e.getMessage(), containsString("unknown field [foo]")); - } - } - - public void testLenientParser() throws IOException { - String json = "{\"partition_field_name\":\"field_1\", \"partition_field_value\":\"x\", \"initial_record_score\": 3," + - " \"record_score\": 3, \"probability\": 0.001, \"foo\":\"bar\"}"; - try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { - PartitionScore.LENIENT_PARSER.apply(parser, null); - } - } -}