Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregations: add serial differencing pipeline aggregation #11196

Merged
merged 1 commit into from Jul 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffParser;

import java.util.List;

Expand Down Expand Up @@ -120,6 +121,7 @@ public AggregationModule() {
pipelineAggParsers.add(CumulativeSumParser.class);
pipelineAggParsers.add(BucketScriptParser.class);
pipelineAggParsers.add(BucketSelectorParser.class);
pipelineAggParsers.add(SerialDiffParser.class);
}

/**
Expand Down
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;

/**
* A module that registers all the transport streams for the addAggregation
Expand Down Expand Up @@ -133,6 +134,7 @@ protected void configure() {
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();
SerialDiffPipelineAggregator.registerStreams();
}

@Override
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffBuilder;

public final class PipelineAggregatorBuilders {

Expand Down Expand Up @@ -69,4 +70,8 @@ public static final BucketSelectorBuilder having(String name) {
/**
 * Creates a new {@link CumulativeSumBuilder} with the given name.
 *
 * @param name the name passed to the builder's constructor
 * @return a freshly constructed builder
 */
public static final CumulativeSumBuilder cumulativeSum(String name) {
    CumulativeSumBuilder cumulativeSumBuilder = new CumulativeSumBuilder(name);
    return cumulativeSumBuilder;
}

/**
 * Creates a new {@link SerialDiffBuilder} with the given name.
 *
 * @param name the name passed to the builder's constructor
 * @return a freshly constructed builder
 */
public static final SerialDiffBuilder diff(String name) {
    SerialDiffBuilder serialDiffBuilder = new SerialDiffBuilder(name);
    return serialDiffBuilder;
}
}
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.pipeline.serialdiff;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;

import java.io.IOException;

/**
 * Fluent builder for a serial differencing pipeline aggregation request.
 *
 * Every option is optional: an option that was never set (or was set to
 * {@code null}) is left out of the generated content entirely.
 */
public class SerialDiffBuilder extends PipelineAggregatorBuilder<SerialDiffBuilder> {

    // null means "not configured"; such options are not serialized
    private String formatPattern;
    private GapPolicy policy;
    private Integer lagPeriods;

    /**
     * @param name the name handed to the parent builder together with the
     *             serial diff aggregator's type name
     */
    public SerialDiffBuilder(String name) {
        super(name, SerialDiffPipelineAggregator.TYPE.name());
    }

    /**
     * Sets the value written under the parser's {@code FORMAT} field.
     *
     * @return this builder, for chaining
     */
    public SerialDiffBuilder format(String format) {
        formatPattern = format;
        return this;
    }

    /**
     * Sets the gap policy; its name is written under the parser's {@code GAP_POLICY} field.
     *
     * @return this builder, for chaining
     */
    public SerialDiffBuilder gapPolicy(GapPolicy gapPolicy) {
        policy = gapPolicy;
        return this;
    }

    /**
     * Sets the value written under the parser's {@code LAG} field.
     *
     * @return this builder, for chaining
     */
    public SerialDiffBuilder lag(Integer lag) {
        lagPeriods = lag;
        return this;
    }

    /**
     * Writes the configured options to {@code builder}, in the order
     * format, gap policy, lag, skipping any option that is {@code null}.
     *
     * @return the same {@code builder} that was passed in
     */
    @Override
    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        if (formatPattern != null) {
            final String formatField = SerialDiffParser.FORMAT.getPreferredName();
            builder.field(formatField, formatPattern);
        }
        if (policy != null) {
            final String gapPolicyField = SerialDiffParser.GAP_POLICY.getPreferredName();
            builder.field(gapPolicyField, policy.getName());
        }
        if (lagPeriods != null) {
            final String lagField = SerialDiffParser.LAG.getPreferredName();
            builder.field(lagField, lagPeriods);
        }
        return builder;
    }

}
@@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.pipeline.serialdiff;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

/**
 * Parses the definition of a {@code serial_diff} pipeline aggregation into a
 * {@link SerialDiffPipelineAggregator.Factory}.
 *
 * Recognised fields are {@link #FORMAT}, {@link #GAP_POLICY}, {@link #LAG} and the
 * inherited {@code BUCKETS_PATH} (given either as a single string or as an array).
 */
public class SerialDiffParser implements PipelineAggregator.Parser {

    public static final ParseField FORMAT = new ParseField("format");
    public static final ParseField GAP_POLICY = new ParseField("gap_policy");
    public static final ParseField LAG = new ParseField("lag");

    /**
     * @return the type name of {@link SerialDiffPipelineAggregator}
     */
    @Override
    public String type() {
        return SerialDiffPipelineAggregator.TYPE.name();
    }

    /**
     * Reads tokens until the end of the current object and builds the aggregator factory.
     *
     * Defaults when a field is absent: gap policy {@code GapPolicy.SKIP}, lag {@code 1},
     * and the {@code ValueFormatter.RAW} formatter when no format is supplied.
     *
     * @param reducerName name of the aggregation being parsed; used for the factory and in error messages
     * @param parser      parser positioned inside the aggregation's definition object
     * @param context     search context used for field matching and error reporting
     * @return a factory configured with the parsed buckets paths, formatter, gap policy and lag
     * @throws SearchParseException if an unknown key or unexpected token is encountered, if the lag is
     *                              not strictly positive, or if no buckets path was supplied
     * @throws IOException          if the underlying parser fails
     */
    @Override
    public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
        XContentParser.Token token;
        String currentFieldName = null;
        String[] bucketsPaths = null;
        String format = null;
        GapPolicy gapPolicy = GapPolicy.SKIP;
        int lag = 1;

        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
            } else if (token == XContentParser.Token.VALUE_STRING) {
                if (context.parseFieldMatcher().match(currentFieldName, FORMAT)) {
                    format = parser.text();
                } else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
                    bucketsPaths = new String[] { parser.text() };
                } else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
                } else {
                    throw unknownKey(context, parser, token, reducerName, currentFieldName);
                }
            } else if (token == XContentParser.Token.VALUE_NUMBER) {
                if (context.parseFieldMatcher().match(currentFieldName, LAG)) {
                    lag = parser.intValue(true);
                    if (lag <= 0) {
                        // Note the trailing space after "was": without it the number is glued to the word
                        throw new SearchParseException(context, "Lag must be a positive, non-zero integer. Value supplied was "
                                + lag + " in [" + reducerName + "]: [" + currentFieldName + "].", parser.getTokenLocation());
                    }
                } else {
                    throw unknownKey(context, parser, token, reducerName, currentFieldName);
                }
            } else if (token == XContentParser.Token.START_ARRAY) {
                if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
                    List<String> paths = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        String path = parser.text();
                        paths.add(path);
                    }
                    bucketsPaths = paths.toArray(new String[paths.size()]);
                } else {
                    throw unknownKey(context, parser, token, reducerName, currentFieldName);
                }
            } else {
                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
                        parser.getTokenLocation());
            }
        }

        if (bucketsPaths == null) {
            // This parser handles serial_diff, not derivative; name the right aggregation in the error
            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
                    + "] for serial_diff aggregation [" + reducerName + "]", parser.getTokenLocation());
        }

        ValueFormatter formatter;
        if (format != null) {
            formatter = ValueFormat.Patternable.Number.format(format).formatter();
        } else {
            formatter = ValueFormatter.RAW;
        }

        return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag);
    }

    /**
     * Builds the exception reported when a field name is not recognised for the given token type.
     * The location is taken from the parser's current token.
     */
    private static SearchParseException unknownKey(SearchContext context, XContentParser parser, XContentParser.Token token,
            String reducerName, String fieldName) {
        return new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
                + fieldName + "].", parser.getTokenLocation());
    }

}
@@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.pipeline.serialdiff;

import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.*;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

public class SerialDiffPipelineAggregator extends PipelineAggregator {

public final static Type TYPE = new Type("serial_diff");

public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public SerialDiffPipelineAggregator readResult(StreamInput in) throws IOException {
SerialDiffPipelineAggregator result = new SerialDiffPipelineAggregator();
result.readFrom(in);
return result;
}
};

public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}

private ValueFormatter formatter;
private GapPolicy gapPolicy;
private int lag;

public SerialDiffPipelineAggregator() {
}

public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
int lag, Map<String, Object> metadata) {
super(name, bucketsPaths, metadata);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.lag = lag;
}

@Override
public Type type() {
return TYPE;
}

@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalHistogram histo = (InternalHistogram) aggregation;
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();

List newBuckets = new ArrayList<>();
EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
int counter = 0;

for (InternalHistogram.Bucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
InternalHistogram.Bucket newBucket = bucket;

counter += 1;

// Still under the initial lag period, add nothing and move on
Double lagValue;
if (counter <= lag) {
lagValue = Double.NaN;
} else {
lagValue = lagWindow.peek(); // Peek here, because we rely on add'ing to always move the window
}

// Normalize null's to NaN
if (thisBucketValue == null) {
thisBucketValue = Double.NaN;
}

// Both have values, calculate diff and replace the "empty" bucket
if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) {
double diff = thisBucketValue - lagValue;

List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData()));
newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
aggs), bucket.getKeyed(), bucket.getFormatter());
}


newBuckets.add(newBucket);
lagWindow.add(thisBucketValue);

}
return factory.create(newBuckets, histo);
}

@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt();
}

@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
out.writeVInt(lag);
}

public static class Factory extends PipelineAggregatorFactory {

private final ValueFormatter formatter;
private GapPolicy gapPolicy;
private int lag;

public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) {
super(name, TYPE.name(), bucketsPaths);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.lag = lag;
}

@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData);
}

}
}