Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Aggregations: Adds cumulative sum aggregation
This adds a new pipeline aggregation, the cumulative sum aggregation. This is a parent aggregation which must be specified as a sub-aggregation to a histogram or date_histogram aggregation. It will add a new aggregation to each bucket containing the sum of a specified metrics over this and all previous buckets.
- Loading branch information
Showing
9 changed files
with
573 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
48 changes: 48 additions & 0 deletions
48
...va/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.cumulativesum; | ||
|
||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; | ||
|
||
import java.io.IOException; | ||
|
||
public class CumulativeSumBuilder extends PipelineAggregatorBuilder<CumulativeSumBuilder> { | ||
|
||
private String format; | ||
|
||
public CumulativeSumBuilder(String name) { | ||
super(name, CumulativeSumPipelineAggregator.TYPE.name()); | ||
} | ||
|
||
public CumulativeSumBuilder format(String format) { | ||
this.format = format; | ||
return this; | ||
} | ||
|
||
@Override | ||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { | ||
if (format != null) { | ||
builder.field(CumulativeSumParser.FORMAT.getPreferredName(), format); | ||
} | ||
return builder; | ||
} | ||
|
||
} |
96 changes: 96 additions & 0 deletions
96
...ava/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.cumulativesum; | ||
|
||
import org.elasticsearch.common.ParseField; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.search.SearchParseException; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormat; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; | ||
import org.elasticsearch.search.internal.SearchContext; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class CumulativeSumParser implements PipelineAggregator.Parser { | ||
|
||
public static final ParseField FORMAT = new ParseField("format"); | ||
public static final ParseField GAP_POLICY = new ParseField("gap_policy"); | ||
public static final ParseField UNIT = new ParseField("unit"); | ||
|
||
@Override | ||
public String type() { | ||
return CumulativeSumPipelineAggregator.TYPE.name(); | ||
} | ||
|
||
@Override | ||
public PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentParser parser, SearchContext context) throws IOException { | ||
XContentParser.Token token; | ||
String currentFieldName = null; | ||
String[] bucketsPaths = null; | ||
String format = null; | ||
|
||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
} else if (token == XContentParser.Token.VALUE_STRING) { | ||
if (FORMAT.match(currentFieldName)) { | ||
format = parser.text(); | ||
} else if (BUCKETS_PATH.match(currentFieldName)) { | ||
bucketsPaths = new String[] { parser.text() }; | ||
} else { | ||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else if (token == XContentParser.Token.START_ARRAY) { | ||
if (BUCKETS_PATH.match(currentFieldName)) { | ||
List<String> paths = new ArrayList<>(); | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { | ||
String path = parser.text(); | ||
paths.add(path); | ||
} | ||
bucketsPaths = paths.toArray(new String[paths.size()]); | ||
} else { | ||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" | ||
+ currentFieldName + "].", parser.getTokenLocation()); | ||
} | ||
} else { | ||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].", | ||
parser.getTokenLocation()); | ||
} | ||
} | ||
|
||
if (bucketsPaths == null) { | ||
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() | ||
+ "] for derivative aggregation [" + pipelineAggregatorName + "]", parser.getTokenLocation()); | ||
} | ||
|
||
ValueFormatter formatter = null; | ||
if (format != null) { | ||
formatter = ValueFormat.Patternable.Number.format(format).formatter(); | ||
} | ||
|
||
return new CumulativeSumPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter); | ||
} | ||
|
||
} |
146 changes: 146 additions & 0 deletions
146
...ticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.search.aggregations.pipeline.cumulativesum; | ||
|
||
import com.google.common.collect.Lists; | ||
|
||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.search.aggregations.AggregatorFactory; | ||
import org.elasticsearch.search.aggregations.InternalAggregation; | ||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; | ||
import org.elasticsearch.search.aggregations.InternalAggregation.Type; | ||
import org.elasticsearch.search.aggregations.InternalAggregations; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator; | ||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; | ||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; | ||
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter; | ||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; | ||
|
||
public class CumulativeSumPipelineAggregator extends PipelineAggregator { | ||
|
||
public final static Type TYPE = new Type("cumulative_sum"); | ||
|
||
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { | ||
@Override | ||
public CumulativeSumPipelineAggregator readResult(StreamInput in) throws IOException { | ||
CumulativeSumPipelineAggregator result = new CumulativeSumPipelineAggregator(); | ||
result.readFrom(in); | ||
return result; | ||
} | ||
}; | ||
|
||
public static void registerStreams() { | ||
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); | ||
} | ||
|
||
private ValueFormatter formatter; | ||
|
||
public CumulativeSumPipelineAggregator() { | ||
} | ||
|
||
public CumulativeSumPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, | ||
Map<String, Object> metadata) { | ||
super(name, bucketsPaths, metadata); | ||
this.formatter = formatter; | ||
} | ||
|
||
@Override | ||
public Type type() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { | ||
InternalHistogram histo = (InternalHistogram) aggregation; | ||
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets(); | ||
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory(); | ||
|
||
List newBuckets = new ArrayList<>(); | ||
double sum = 0; | ||
for (InternalHistogram.Bucket bucket : buckets) { | ||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.INSERT_ZEROS); | ||
sum += thisBucketValue; | ||
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), | ||
AGGREGATION_TRANFORM_FUNCTION)); | ||
aggs.add(new InternalSimpleValue(name(), sum, formatter, new ArrayList<PipelineAggregator>(), metaData())); | ||
InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), | ||
new InternalAggregations(aggs), bucket.getKeyed(), bucket.getFormatter()); | ||
newBuckets.add(newBucket); | ||
} | ||
return factory.create(newBuckets, histo); | ||
} | ||
|
||
@Override | ||
public void doReadFrom(StreamInput in) throws IOException { | ||
formatter = ValueFormatterStreams.readOptional(in); | ||
} | ||
|
||
@Override | ||
public void doWriteTo(StreamOutput out) throws IOException { | ||
ValueFormatterStreams.writeOptional(formatter, out); | ||
} | ||
|
||
public static class Factory extends PipelineAggregatorFactory { | ||
|
||
private final ValueFormatter formatter; | ||
|
||
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) { | ||
super(name, TYPE.name(), bucketsPaths); | ||
this.formatter = formatter; | ||
} | ||
|
||
@Override | ||
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException { | ||
return new CumulativeSumPipelineAggregator(name, bucketsPaths, formatter, metaData); | ||
} | ||
|
||
@Override | ||
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<PipelineAggregatorFactory> pipelineAggregatorFactories) { | ||
if (bucketsPaths.length != 1) { | ||
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() | ||
+ " must contain a single entry for aggregation [" + name + "]"); | ||
} | ||
if (!(parent instanceof HistogramAggregator.Factory)) { | ||
throw new IllegalStateException("cumulative sum aggregation [" + name | ||
+ "] must have a histogram or date_histogram as parent"); | ||
} else { | ||
HistogramAggregator.Factory histoParent = (HistogramAggregator.Factory) parent; | ||
if (histoParent.minDocCount() != 0) { | ||
throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name | ||
+ "] must have min_doc_count of 0"); | ||
} | ||
} | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.