[ML] Removing old per-partition normalization code (#32816)

Per-partition normalization is an old, undocumented feature that was
never used by clients. It has been superseded by per-partition maximum
scoring.

To maintain wire compatibility with nodes prior to 6.5, it is still
necessary to read and write the old wire format.
edsavage committed Aug 15, 2018
1 parent 7ff37ae commit 8ce1ab3
Showing 29 changed files with 99 additions and 596 deletions.
AnalysisConfig.java
@@ -64,7 +64,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
private static final ParseField OVERLAPPING_BUCKETS = new ParseField("overlapping_buckets");
private static final ParseField RESULT_FINALIZATION_WINDOW = new ParseField("result_finalization_window");
private static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields");
private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization");

public static final String ML_CATEGORY_FIELD = "mlcategory";
public static final Set<String> AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD));
@@ -98,7 +97,6 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
parser.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS);
parser.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW);
parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
parser.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION);

return parser;
}
@@ -117,12 +115,11 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
private final Boolean overlappingBuckets;
private final Long resultFinalizationWindow;
private final Boolean multivariateByFields;
private final boolean usePerPartitionNormalization;

private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, String summaryCountFieldName,
List<Detector> detectors, List<String> influencers, Boolean overlappingBuckets, Long resultFinalizationWindow,
Boolean multivariateByFields, boolean usePerPartitionNormalization) {
Boolean multivariateByFields) {
this.detectors = detectors;
this.bucketSpan = bucketSpan;
this.latency = latency;
@@ -134,7 +131,6 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
this.overlappingBuckets = overlappingBuckets;
this.resultFinalizationWindow = resultFinalizationWindow;
this.multivariateByFields = multivariateByFields;
this.usePerPartitionNormalization = usePerPartitionNormalization;
}

public AnalysisConfig(StreamInput in) throws IOException {
@@ -165,7 +161,12 @@ public AnalysisConfig(StreamInput in) throws IOException {
}
}

usePerPartitionNormalization = in.readBoolean();
// BWC for removed per-partition normalization
// Version check is temporarily against the latest to satisfy CI tests
// TODO change to V_6_5_0 after successful backport to 6.x
if (in.getVersion().before(Version.V_7_0_0_alpha1)) {
in.readBoolean();
}
}

@Override
@@ -195,7 +196,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
}

out.writeBoolean(usePerPartitionNormalization);
// BWC for removed per-partition normalization
// Version check is temporarily against the latest to satisfy CI tests
// TODO change to V_6_5_0 after successful backport to 6.x
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
out.writeBoolean(false);
}
}

/**
@@ -299,10 +305,6 @@ public Boolean getMultivariateByFields() {
return multivariateByFields;
}

public boolean getUsePerPartitionNormalization() {
return usePerPartitionNormalization;
}

/**
* Return the set of fields required by the analysis.
* These are the influencer fields, metric field, partition field,
@@ -403,9 +405,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (multivariateByFields != null) {
builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
}
if (usePerPartitionNormalization) {
builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization);
}
builder.endObject();
return builder;
}
@@ -416,7 +415,6 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
AnalysisConfig that = (AnalysisConfig) o;
return Objects.equals(latency, that.latency) &&
usePerPartitionNormalization == that.usePerPartitionNormalization &&
Objects.equals(bucketSpan, that.bucketSpan) &&
Objects.equals(categorizationFieldName, that.categorizationFieldName) &&
Objects.equals(categorizationFilters, that.categorizationFilters) &&
@@ -434,7 +432,7 @@ public int hashCode() {
return Objects.hash(
bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency,
summaryCountFieldName, detectors, influencers, overlappingBuckets, resultFinalizationWindow,
multivariateByFields, usePerPartitionNormalization
multivariateByFields
);
}

@@ -453,7 +451,6 @@ public static class Builder {
private Boolean overlappingBuckets;
private Long resultFinalizationWindow;
private Boolean multivariateByFields;
private boolean usePerPartitionNormalization = false;

public Builder(List<Detector> detectors) {
setDetectors(detectors);
@@ -472,7 +469,6 @@ public Builder(AnalysisConfig analysisConfig) {
this.overlappingBuckets = analysisConfig.overlappingBuckets;
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
this.multivariateByFields = analysisConfig.multivariateByFields;
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
}

public void setDetectors(List<Detector> detectors) {
@@ -535,10 +531,6 @@ public void setMultivariateByFields(Boolean multivariateByFields) {
this.multivariateByFields = multivariateByFields;
}

public void setUsePerPartitionNormalization(boolean usePerPartitionNormalization) {
this.usePerPartitionNormalization = usePerPartitionNormalization;
}

/**
* Checks the configuration is valid
* <ol>
@@ -571,16 +563,11 @@ public AnalysisConfig build() {

overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors);

if (usePerPartitionNormalization) {
checkDetectorsHavePartitionFields(detectors);
checkNoInfluencersAreSet(influencers);
}

verifyNoInconsistentNestedFieldNames();

return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
latency, summaryCountFieldName, detectors, influencers, overlappingBuckets,
resultFinalizationWindow, multivariateByFields, usePerPartitionNormalization);
resultFinalizationWindow, multivariateByFields);
}

private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() {
@@ -704,23 +691,6 @@ private void verifyCategorizationFiltersAreValidRegex() {
}
}

private static void checkDetectorsHavePartitionFields(List<Detector> detectors) {
for (Detector detector : detectors) {
if (!Strings.isNullOrEmpty(detector.getPartitionFieldName())) {
return;
}
}
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD));
}

private static void checkNoInfluencersAreSet(List<String> influencers) {
if (!influencers.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS));
}
}

private static boolean isValidRegex(String exp) {
try {
Pattern.compile(exp);
Messages.java
@@ -130,10 +130,6 @@ public final class Messages {
"over_field_name cannot be used with function ''{0}''";
public static final String JOB_CONFIG_OVERLAPPING_BUCKETS_INCOMPATIBLE_FUNCTION =
"Overlapping buckets cannot be used with function ''{0}''";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS =
"A job configured with Per-Partition Normalization cannot use influencers";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD =
"If the job is configured with Per-Partition Normalization enabled a detector must have a partition field";
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''";
public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED =
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " +
ElasticsearchMappings.java
@@ -227,23 +227,6 @@ private static void addResultsMapping(XContentBuilder builder) throws IOExceptio
.startObject(Bucket.SCHEDULED_EVENTS.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()

.startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName())
.field(TYPE, NESTED)
@@ -328,7 +311,7 @@ private static void addTermFields(XContentBuilder builder, Collection<String> te
}

private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException {

// Forecast Output
builder.startObject(Forecast.FORECAST_LOWER.getPreferredName())
.field(TYPE, DOUBLE)
@@ -370,7 +353,7 @@ private static void addForecastFieldsToMapping(XContentBuilder builder) throws I
.field(TYPE, LONG)
.endObject();
}

/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder
Bucket.java
@@ -25,7 +25,6 @@
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Bucket Result POJO
@@ -43,7 +42,6 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucket_influencers");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PARTITION_SCORES = new ParseField("partition_scores");
public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events");

// Used for QueryPage
@@ -58,6 +56,19 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ConstructingObjectParser<Bucket, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<Bucket, Void> LENIENT_PARSER = createParser(true);

/**
* Read and discard the old (prior to 6.5) perPartitionNormalization values
*/
public static Bucket readOldPerPartitionNormalization(StreamInput in) throws IOException {
in.readString();
in.readString();
in.readDouble();
in.readDouble();
in.readDouble();

return null;
}

private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<Bucket, Void> parser = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, ignoreUnknownFields,
a -> new Bucket((String) a[0], (Date) a[1], (long) a[2]));
@@ -82,8 +93,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
parser.declareObjectArray(Bucket::setBucketInfluencers, ignoreUnknownFields ?
BucketInfluencer.LENIENT_PARSER : BucketInfluencer.STRICT_PARSER, BUCKET_INFLUENCERS);
parser.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
parser.declareObjectArray(Bucket::setPartitionScores, ignoreUnknownFields ?
PartitionScore.LENIENT_PARSER : PartitionScore.STRICT_PARSER, PARTITION_SCORES);
parser.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
parser.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS);

@@ -100,7 +109,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
private boolean isInterim;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private List<PartitionScore> partitionScores = Collections.emptyList();
private List<String> scheduledEvents = Collections.emptyList();

public Bucket(String jobId, Date timestamp, long bucketSpan) {
@@ -120,7 +128,6 @@ public Bucket(Bucket other) {
this.isInterim = other.isInterim;
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.partitionScores = new ArrayList<>(other.partitionScores);
this.scheduledEvents = new ArrayList<>(other.scheduledEvents);
}

@@ -143,7 +150,10 @@ public Bucket(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_5_5_0)) {
in.readGenericValue();
}
partitionScores = in.readList(PartitionScore::new);
// bwc for perPartitionNormalization
if (in.getVersion().before(Version.V_6_5_0)) {
in.readList(Bucket::readOldPerPartitionNormalization);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
scheduledEvents = in.readList(StreamInput::readString);
if (scheduledEvents.isEmpty()) {
@@ -174,7 +184,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_5_5_0)) {
out.writeGenericValue(Collections.emptyMap());
}
out.writeList(partitionScores);
// bwc for perPartitionNormalization
if (out.getVersion().before(Version.V_6_5_0)) {
out.writeList(Collections.emptyList());
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(scheduledEvents);
}
@@ -195,9 +208,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Result.IS_INTERIM.getPreferredName(), isInterim);
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
if (partitionScores.isEmpty() == false) {
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
}

if (scheduledEvents.isEmpty() == false) {
builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents);
}
@@ -304,14 +315,6 @@ public void addBucketInfluencer(BucketInfluencer bucketInfluencer) {
bucketInfluencers.add(bucketInfluencer);
}

public List<PartitionScore> getPartitionScores() {
return partitionScores;
}

public void setPartitionScores(List<PartitionScore> scores) {
partitionScores = Objects.requireNonNull(scores);
}

public List<String> getScheduledEvents() {
return scheduledEvents;
}
@@ -320,24 +323,10 @@ public void setScheduledEvents(List<String> scheduledEvents) {
this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName());
}

public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();

return first.isPresent() ? first.get().getInitialRecordScore() : 0.0;
}

public double partitionAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();

return first.isPresent() ? first.get().getRecordScore() : 0.0;
}

@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records,
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents);
isInterim, bucketSpan, bucketInfluencers, processingTimeMs, scheduledEvents);
}

/**
Expand All @@ -360,7 +349,6 @@ public boolean equals(Object other) {
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
&& Objects.equals(this.partitionScores, that.partitionScores)
&& (this.processingTimeMs == that.processingTimeMs)
&& Objects.equals(this.scheduledEvents, that.scheduledEvents);
}
@@ -374,6 +362,6 @@ public boolean equals(Object other) {
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalizable() {
return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0);
return anomalyScore > 0.0;
}
}
