Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #11196 from polyfractal/feature/aggs_2_0_diff
Aggregations: add serial differencing pipeline aggregation
- Loading branch information
Showing
11 changed files
with
750 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
67 changes: 67 additions & 0 deletions
67
...ain/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.serialdiff; | ||
|
||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; | ||
|
||
import java.io.IOException; | ||
|
||
public class SerialDiffBuilder extends PipelineAggregatorBuilder<SerialDiffBuilder> { | ||
|
||
private String format; | ||
private GapPolicy gapPolicy; | ||
private Integer lag; | ||
|
||
public SerialDiffBuilder(String name) { | ||
super(name, SerialDiffPipelineAggregator.TYPE.name()); | ||
} | ||
|
||
public SerialDiffBuilder format(String format) { | ||
this.format = format; | ||
return this; | ||
} | ||
|
||
public SerialDiffBuilder gapPolicy(GapPolicy gapPolicy) { | ||
this.gapPolicy = gapPolicy; | ||
return this; | ||
} | ||
|
||
public SerialDiffBuilder lag(Integer lag) { | ||
this.lag = lag; | ||
return this; | ||
} | ||
|
||
@Override | ||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { | ||
if (format != null) { | ||
builder.field(SerialDiffParser.FORMAT.getPreferredName(), format); | ||
} | ||
if (gapPolicy != null) { | ||
builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); | ||
} | ||
if (lag != null) { | ||
builder.field(SerialDiffParser.LAG.getPreferredName(), lag); | ||
} | ||
return builder; | ||
} | ||
|
||
} |
116 changes: 116 additions & 0 deletions
116
...main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.serialdiff; | ||
|
||
import org.elasticsearch.common.ParseField; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.search.SearchParseException; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormat; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; | ||
import org.elasticsearch.search.internal.SearchContext; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; | ||
|
||
public class SerialDiffParser implements PipelineAggregator.Parser { | ||
|
||
public static final ParseField FORMAT = new ParseField("format"); | ||
public static final ParseField GAP_POLICY = new ParseField("gap_policy"); | ||
public static final ParseField LAG = new ParseField("lag"); | ||
|
||
@Override | ||
public String type() { | ||
return SerialDiffPipelineAggregator.TYPE.name(); | ||
} | ||
|
||
@Override | ||
public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException { | ||
XContentParser.Token token; | ||
String currentFieldName = null; | ||
String[] bucketsPaths = null; | ||
String format = null; | ||
GapPolicy gapPolicy = GapPolicy.SKIP; | ||
int lag = 1; | ||
|
||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
} else if (token == XContentParser.Token.VALUE_STRING) { | ||
if (context.parseFieldMatcher().match(currentFieldName, FORMAT)) { | ||
format = parser.text(); | ||
} else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { | ||
bucketsPaths = new String[] { parser.text() }; | ||
} else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) { | ||
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); | ||
} else { | ||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else if (token == XContentParser.Token.VALUE_NUMBER) { | ||
if (context.parseFieldMatcher().match(currentFieldName, LAG)) { | ||
lag = parser.intValue(true); | ||
if (lag <= 0) { | ||
throw new SearchParseException(context, "Lag must be a positive, non-zero integer. Value supplied was" + | ||
lag + " in [" + reducerName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else { | ||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else if (token == XContentParser.Token.START_ARRAY) { | ||
if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { | ||
List<String> paths = new ArrayList<>(); | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { | ||
String path = parser.text(); | ||
paths.add(path); | ||
} | ||
bucketsPaths = paths.toArray(new String[paths.size()]); | ||
} else { | ||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else { | ||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].", | ||
parser.getTokenLocation()); | ||
} | ||
} | ||
|
||
if (bucketsPaths == null) { | ||
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() | ||
+ "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation()); | ||
} | ||
|
||
ValueFormatter formatter; | ||
if (format != null) { | ||
formatter = ValueFormat.Patternable.Number.format(format).formatter(); | ||
} else { | ||
formatter = ValueFormatter.RAW; | ||
} | ||
|
||
return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag); | ||
} | ||
|
||
} |
161 changes: 161 additions & 0 deletions
161
...g/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.serialdiff; | ||
|
||
import com.google.common.collect.EvictingQueue; | ||
import com.google.common.collect.Lists; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.search.aggregations.InternalAggregation; | ||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; | ||
import org.elasticsearch.search.aggregations.InternalAggregation.Type; | ||
import org.elasticsearch.search.aggregations.InternalAggregations; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; | ||
import org.elasticsearch.search.aggregations.pipeline.*; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; | ||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; | ||
|
||
public class SerialDiffPipelineAggregator extends PipelineAggregator { | ||
|
||
public final static Type TYPE = new Type("serial_diff"); | ||
|
||
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { | ||
@Override | ||
public SerialDiffPipelineAggregator readResult(StreamInput in) throws IOException { | ||
SerialDiffPipelineAggregator result = new SerialDiffPipelineAggregator(); | ||
result.readFrom(in); | ||
return result; | ||
} | ||
}; | ||
|
||
public static void registerStreams() { | ||
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); | ||
} | ||
|
||
private ValueFormatter formatter; | ||
private GapPolicy gapPolicy; | ||
private int lag; | ||
|
||
public SerialDiffPipelineAggregator() { | ||
} | ||
|
||
public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, | ||
int lag, Map<String, Object> metadata) { | ||
super(name, bucketsPaths, metadata); | ||
this.formatter = formatter; | ||
this.gapPolicy = gapPolicy; | ||
this.lag = lag; | ||
} | ||
|
||
@Override | ||
public Type type() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { | ||
InternalHistogram histo = (InternalHistogram) aggregation; | ||
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets(); | ||
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory(); | ||
|
||
List newBuckets = new ArrayList<>(); | ||
EvictingQueue<Double> lagWindow = EvictingQueue.create(lag); | ||
int counter = 0; | ||
|
||
for (InternalHistogram.Bucket bucket : buckets) { | ||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); | ||
InternalHistogram.Bucket newBucket = bucket; | ||
|
||
counter += 1; | ||
|
||
// Still under the initial lag period, add nothing and move on | ||
Double lagValue; | ||
if (counter <= lag) { | ||
lagValue = Double.NaN; | ||
} else { | ||
lagValue = lagWindow.peek(); // Peek here, because we rely on add'ing to always move the window | ||
} | ||
|
||
// Normalize null's to NaN | ||
if (thisBucketValue == null) { | ||
thisBucketValue = Double.NaN; | ||
} | ||
|
||
// Both have values, calculate diff and replace the "empty" bucket | ||
if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) { | ||
double diff = thisBucketValue - lagValue; | ||
|
||
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION)); | ||
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData())); | ||
newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations( | ||
aggs), bucket.getKeyed(), bucket.getFormatter()); | ||
} | ||
|
||
|
||
newBuckets.add(newBucket); | ||
lagWindow.add(thisBucketValue); | ||
|
||
} | ||
return factory.create(newBuckets, histo); | ||
} | ||
|
||
@Override | ||
public void doReadFrom(StreamInput in) throws IOException { | ||
formatter = ValueFormatterStreams.readOptional(in); | ||
gapPolicy = GapPolicy.readFrom(in); | ||
lag = in.readVInt(); | ||
} | ||
|
||
@Override | ||
public void doWriteTo(StreamOutput out) throws IOException { | ||
ValueFormatterStreams.writeOptional(formatter, out); | ||
gapPolicy.writeTo(out); | ||
out.writeVInt(lag); | ||
} | ||
|
||
public static class Factory extends PipelineAggregatorFactory { | ||
|
||
private final ValueFormatter formatter; | ||
private GapPolicy gapPolicy; | ||
private int lag; | ||
|
||
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) { | ||
super(name, TYPE.name(), bucketsPaths); | ||
this.formatter = formatter; | ||
this.gapPolicy = gapPolicy; | ||
this.lag = lag; | ||
} | ||
|
||
@Override | ||
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException { | ||
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData); | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.