[ML] Removing old per-partition normalization code #32816

Merged: 7 commits, Aug 15, 2018
Changes from 4 commits
@@ -64,7 +64,6 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
private static final ParseField OVERLAPPING_BUCKETS = new ParseField("overlapping_buckets");
private static final ParseField RESULT_FINALIZATION_WINDOW = new ParseField("result_finalization_window");
private static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields");
private static final ParseField USER_PER_PARTITION_NORMALIZATION = new ParseField("use_per_partition_normalization");

public static final String ML_CATEGORY_FIELD = "mlcategory";
public static final Set<String> AUTO_CREATED_FIELDS = new HashSet<>(Collections.singletonList(ML_CATEGORY_FIELD));
@@ -98,7 +97,6 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
parser.declareBoolean(Builder::setOverlappingBuckets, OVERLAPPING_BUCKETS);
parser.declareLong(Builder::setResultFinalizationWindow, RESULT_FINALIZATION_WINDOW);
parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
parser.declareBoolean(Builder::setUsePerPartitionNormalization, USER_PER_PARTITION_NORMALIZATION);

return parser;
}
@@ -117,12 +115,11 @@ private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createPars
private final Boolean overlappingBuckets;
private final Long resultFinalizationWindow;
private final Boolean multivariateByFields;
private final boolean usePerPartitionNormalization;

private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, String summaryCountFieldName,
List<Detector> detectors, List<String> influencers, Boolean overlappingBuckets, Long resultFinalizationWindow,
Boolean multivariateByFields, boolean usePerPartitionNormalization) {
Boolean multivariateByFields) {
this.detectors = detectors;
this.bucketSpan = bucketSpan;
this.latency = latency;
@@ -134,7 +131,6 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
this.overlappingBuckets = overlappingBuckets;
this.resultFinalizationWindow = resultFinalizationWindow;
this.multivariateByFields = multivariateByFields;
this.usePerPartitionNormalization = usePerPartitionNormalization;
}

public AnalysisConfig(StreamInput in) throws IOException {
@@ -164,8 +160,6 @@ public AnalysisConfig(StreamInput in) throws IOException {
}
}
}

usePerPartitionNormalization = in.readBoolean();
Contributor: Here we need to check that if we are reading from an older node we consume the boolean (although we do nothing with it).
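A minimal sketch of the read-side guard this comment asks for, assuming the removal ships in 6.5.0 (the same Version.V_6_5_0 boundary the writeTo method below checks); illustrative, not the committed code:

if (in.getVersion().before(Version.V_6_5_0)) {
    // Nodes before 6.5.0 still send the old use_per_partition_normalization
    // flag; consume the boolean so the stream stays aligned, then ignore it.
    in.readBoolean();
}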

}

@Override
@@ -194,8 +188,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_5_0)) {
out.writeBoolean(false);
}

out.writeBoolean(usePerPartitionNormalization);
Contributor: And here we need to check that if we are writing to an older node we write a false.

}

/**
@@ -299,10 +291,6 @@ public Boolean getMultivariateByFields() {
return multivariateByFields;
}

public boolean getUsePerPartitionNormalization() {
return usePerPartitionNormalization;
}

/**
* Return the set of fields required by the analysis.
* These are the influencer fields, metric field, partition field,
@@ -403,9 +391,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (multivariateByFields != null) {
builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
}
if (usePerPartitionNormalization) {
builder.field(USER_PER_PARTITION_NORMALIZATION.getPreferredName(), usePerPartitionNormalization);
}
builder.endObject();
return builder;
}
@@ -416,7 +401,6 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
AnalysisConfig that = (AnalysisConfig) o;
return Objects.equals(latency, that.latency) &&
usePerPartitionNormalization == that.usePerPartitionNormalization &&
Objects.equals(bucketSpan, that.bucketSpan) &&
Objects.equals(categorizationFieldName, that.categorizationFieldName) &&
Objects.equals(categorizationFilters, that.categorizationFilters) &&
@@ -434,7 +418,7 @@ public int hashCode() {
return Objects.hash(
bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency,
summaryCountFieldName, detectors, influencers, overlappingBuckets, resultFinalizationWindow,
multivariateByFields, usePerPartitionNormalization
multivariateByFields
);
}

@@ -453,7 +437,6 @@ public static class Builder {
private Boolean overlappingBuckets;
private Long resultFinalizationWindow;
private Boolean multivariateByFields;
private boolean usePerPartitionNormalization = false;

public Builder(List<Detector> detectors) {
setDetectors(detectors);
@@ -472,7 +455,6 @@ public Builder(AnalysisConfig analysisConfig) {
this.overlappingBuckets = analysisConfig.overlappingBuckets;
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
this.multivariateByFields = analysisConfig.multivariateByFields;
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
}

public void setDetectors(List<Detector> detectors) {
@@ -535,10 +517,6 @@ public void setMultivariateByFields(Boolean multivariateByFields) {
this.multivariateByFields = multivariateByFields;
}

public void setUsePerPartitionNormalization(boolean usePerPartitionNormalization) {
this.usePerPartitionNormalization = usePerPartitionNormalization;
}

/**
* Checks the configuration is valid
* <ol>
@@ -571,16 +549,11 @@ public AnalysisConfig build() {

overlappingBuckets = verifyOverlappingBucketsConfig(overlappingBuckets, detectors);

if (usePerPartitionNormalization) {
checkDetectorsHavePartitionFields(detectors);
checkNoInfluencersAreSet(influencers);
}

verifyNoInconsistentNestedFieldNames();

return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
latency, summaryCountFieldName, detectors, influencers, overlappingBuckets,
resultFinalizationWindow, multivariateByFields, usePerPartitionNormalization);
resultFinalizationWindow, multivariateByFields);
}

private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() {
Expand Down Expand Up @@ -704,23 +677,6 @@ private void verifyCategorizationFiltersAreValidRegex() {
}
}

private static void checkDetectorsHavePartitionFields(List<Detector> detectors) {
for (Detector detector : detectors) {
if (!Strings.isNullOrEmpty(detector.getPartitionFieldName())) {
return;
}
}
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD));
}

private static void checkNoInfluencersAreSet(List<String> influencers) {
if (!influencers.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS));
}
}

private static boolean isValidRegex(String exp) {
try {
Pattern.compile(exp);
@@ -130,10 +130,6 @@ public final class Messages {
"over_field_name cannot be used with function ''{0}''";
public static final String JOB_CONFIG_OVERLAPPING_BUCKETS_INCOMPATIBLE_FUNCTION =
"Overlapping buckets cannot be used with function ''{0}''";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS =
"A job configured with Per-Partition Normalization cannot use influencers";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD =
"If the job is configured with Per-Partition Normalization enabled a detector must have a partition field";
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "Unknown function ''{0}''";
public static final String JOB_CONFIG_UPDATE_ANALYSIS_LIMITS_MODEL_MEMORY_LIMIT_CANNOT_BE_DECREASED =
"Invalid update value for analysis_limits: model_memory_limit cannot be decreased below current usage; " +
@@ -227,23 +227,6 @@ private static void addResultsMapping(XContentBuilder builder) throws IOExceptio
.startObject(Bucket.SCHEDULED_EVENTS.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.PROBABILITY.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()

.startObject(Bucket.BUCKET_INFLUENCERS.getPreferredName())
.field(TYPE, NESTED)
@@ -328,7 +311,7 @@ private static void addTermFields(XContentBuilder builder, Collection<String> te
}

private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException {

// Forecast Output
builder.startObject(Forecast.FORECAST_LOWER.getPreferredName())
.field(TYPE, DOUBLE)
Expand Down Expand Up @@ -370,7 +353,7 @@ private static void addForecastFieldsToMapping(XContentBuilder builder) throws I
.field(TYPE, LONG)
.endObject();
}

/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder
@@ -25,7 +25,6 @@
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Bucket Result POJO
@@ -43,7 +42,6 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucket_influencers");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PARTITION_SCORES = new ParseField("partition_scores");
public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events");

// Used for QueryPage
@@ -82,8 +80,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
parser.declareObjectArray(Bucket::setBucketInfluencers, ignoreUnknownFields ?
BucketInfluencer.LENIENT_PARSER : BucketInfluencer.STRICT_PARSER, BUCKET_INFLUENCERS);
parser.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
parser.declareObjectArray(Bucket::setPartitionScores, ignoreUnknownFields ?
PartitionScore.LENIENT_PARSER : PartitionScore.STRICT_PARSER, PARTITION_SCORES);
parser.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
parser.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS);

@@ -100,7 +96,6 @@ private static ConstructingObjectParser<Bucket, Void> createParser(boolean ignor
private boolean isInterim;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private List<PartitionScore> partitionScores = Collections.emptyList();
private List<String> scheduledEvents = Collections.emptyList();

public Bucket(String jobId, Date timestamp, long bucketSpan) {
Expand All @@ -120,7 +115,6 @@ public Bucket(Bucket other) {
this.isInterim = other.isInterim;
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.partitionScores = new ArrayList<>(other.partitionScores);
this.scheduledEvents = new ArrayList<>(other.scheduledEvents);
}

Expand All @@ -143,7 +137,6 @@ public Bucket(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_5_5_0)) {
in.readGenericValue();
}
partitionScores = in.readList(PartitionScore::new);
Contributor: I believe we can get away without doing anything for BWC for the buckets because they are not being transferred between nodes. But I would like @droberts195 to confirm as well.

Contributor: I think we do need to consider BWC for these lists. If you look at the implementation of readList() and writeList(), they start by reading/writing the list length. So we need to write an empty list to versions before 6.5, and read a list of something. We can replace PartitionScore::new with a function in Bucket that reads the same stuff that PartitionScore::new read but just discards it.

Contributor dimitris-athanasiou (Aug 14, 2018): That is true when there is a transport client, which I didn't think of in the first place. So, yes, we'll need to do the trick of reading the scores. There is another place where I'm doing this: https://github.com/elastic/elasticsearch/blob/6.x/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java#L253. You can take a look and follow a similar approach. Note we only need that code in the 6.x branch.

Author: Thanks @dimitris-athanasiou! That all makes sense.
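A sketch of the read-and-discard approach the thread converges on, for the 6.x branch only. The PartitionScore wire format (two strings followed by three doubles) is assumed here from the fields referenced elsewhere in this diff, so treat this as illustrative rather than the committed change:

// In Bucket(StreamInput in): nodes before 6.5 still send the list, so consume it.
if (in.getVersion().before(Version.V_6_5_0)) {
    in.readList(Bucket::readDiscardedPartitionScore);
}

// In writeTo(StreamOutput out): nodes before 6.5 still expect a list, so write an empty one.
if (out.getVersion().before(Version.V_6_5_0)) {
    out.writeList(Collections.emptyList());
}

// Reads the same fields PartitionScore::new read, then throws them away.
private static Void readDiscardedPartitionScore(StreamInput in) throws IOException {
    in.readString();   // partition_field_name
    in.readString();   // partition_field_value
    in.readDouble();   // initial_record_score
    in.readDouble();   // record_score
    in.readDouble();   // probability
    return null;
}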

if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
scheduledEvents = in.readList(StreamInput::readString);
if (scheduledEvents.isEmpty()) {
@@ -174,7 +167,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_5_5_0)) {
out.writeGenericValue(Collections.emptyMap());
}
out.writeList(partitionScores);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(scheduledEvents);
}
@@ -195,9 +187,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Result.IS_INTERIM.getPreferredName(), isInterim);
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
if (partitionScores.isEmpty() == false) {
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
}

if (scheduledEvents.isEmpty() == false) {
builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents);
}
@@ -304,14 +294,6 @@ public void addBucketInfluencer(BucketInfluencer bucketInfluencer) {
bucketInfluencers.add(bucketInfluencer);
}

public List<PartitionScore> getPartitionScores() {
return partitionScores;
}

public void setPartitionScores(List<PartitionScore> scores) {
partitionScores = Objects.requireNonNull(scores);
}

public List<String> getScheduledEvents() {
return scheduledEvents;
}
@@ -320,24 +302,10 @@ public void setScheduledEvents(List<String> scheduledEvents) {
this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName());
}

public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();

return first.isPresent() ? first.get().getInitialRecordScore() : 0.0;
}

public double partitionAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();

return first.isPresent() ? first.get().getRecordScore() : 0.0;
}

@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records,
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents);
isInterim, bucketSpan, bucketInfluencers, processingTimeMs, scheduledEvents);
}

/**
@@ -360,7 +328,6 @@ public boolean equals(Object other) {
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
&& Objects.equals(this.partitionScores, that.partitionScores)
&& (this.processingTimeMs == that.processingTimeMs)
&& Objects.equals(this.scheduledEvents, that.scheduledEvents);
}
Expand All @@ -374,6 +341,6 @@ public boolean equals(Object other) {
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalizable() {
return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0);
return anomalyScore > 0.0;
}
}