Skip to content

Commit

Permalink
Add parsing for InternalScriptedMetric aggregation (#24738)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbuescher committed May 17, 2017
1 parent ce7326e commit 9fc9db2
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 5 deletions.
Expand Up @@ -412,7 +412,8 @@ public enum ValueType {
OBJECT_OR_STRING(START_OBJECT, VALUE_STRING),
OBJECT_ARRAY_BOOLEAN_OR_STRING(START_OBJECT, START_ARRAY, VALUE_BOOLEAN, VALUE_STRING),
OBJECT_ARRAY_OR_STRING(START_OBJECT, START_ARRAY, VALUE_STRING),
VALUE(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING);
VALUE(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING),
VALUE_OBJECT_ARRAY(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING, START_OBJECT, START_ARRAY);

private final EnumSet<XContentParser.Token> tokens;

Expand Down
Expand Up @@ -124,7 +124,7 @@ public Object getProperty(List<String> path) {

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.field("value", aggregation());
return builder.field(CommonFields.VALUE.getPreferredName(), aggregation());
}

@Override
Expand Down
@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.metrics.scripted;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.search.aggregations.ParsedAggregation;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class ParsedScriptedMetric extends ParsedAggregation implements ScriptedMetric {
private List<Object> aggregation;

@Override
public String getType() {
return ScriptedMetricAggregationBuilder.NAME;
}

@Override
public Object aggregation() {
assert aggregation.size() == 1; // see InternalScriptedMetric#aggregations() for why we can assume this
return aggregation.get(0);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.field(CommonFields.VALUE.getPreferredName(), aggregation());
}

private static final ObjectParser<ParsedScriptedMetric, Void> PARSER = new ObjectParser<>(ParsedScriptedMetric.class.getSimpleName(), true,
ParsedScriptedMetric::new);

static {
declareAggregationFields(PARSER);
PARSER.declareField((agg, value) -> agg.aggregation = Collections.singletonList(value),
ParsedScriptedMetric::parseValue, CommonFields.VALUE, ValueType.VALUE_OBJECT_ARRAY);
}

private static Object parseValue(XContentParser parser) throws IOException {
Token token = parser.currentToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
value = null;
} else if (token.isValue()) {
if (token == XContentParser.Token.VALUE_STRING) {
//binary values will be parsed back and returned as base64 strings when reading from json and yaml
value = parser.text();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
value = parser.numberValue();
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
value = parser.booleanValue();
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
//binary values will be parsed back and returned as BytesArray when reading from cbor and smile
value = new BytesArray(parser.binaryValue());
}
} else if (token == XContentParser.Token.START_OBJECT) {
value = parser.map();
} else if (token == XContentParser.Token.START_ARRAY) {
value = parser.list();
}
return value;
}

public static ParsedScriptedMetric fromXContent(XContentParser parser, final String name) {
ParsedScriptedMetric aggregation = PARSER.apply(parser, null);
aggregation.setName(name);
return aggregation;
}
}
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.InternalHDRPercentilesTests;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesRanksTests;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesTests;
import org.elasticsearch.search.aggregations.metrics.scripted.InternalScriptedMetricTests;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSumTests;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCountTests;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests;
Expand Down Expand Up @@ -130,6 +131,7 @@ private static List<InternalAggregationTestCase> getAggsTests() {
aggsTests.add(new InternalAdjacencyMatrixTests());
aggsTests.add(new SignificantLongTermsTests());
aggsTests.add(new SignificantStringTermsTests());
aggsTests.add(new InternalScriptedMetricTests());
return Collections.unmodifiableList(aggsTests);
}

Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.metrics.scripted;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
Expand All @@ -30,20 +31,46 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptSettings;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalAggregationTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class InternalScriptedMetricTests extends InternalAggregationTestCase<InternalScriptedMetric> {

private static final String REDUCE_SCRIPT_NAME = "reduceScript";
// randomized only once so that any random test instance has the same value
private boolean hasReduceScript = randomBoolean();
private boolean hasReduceScript;
private Supplier<Object>[] valueTypes;
private final Supplier<Object>[] leafValueSuppliers = new Supplier[] { () -> randomInt(), () -> randomLong(), () -> randomDouble(),
() -> randomFloat(), () -> randomBoolean(), () -> randomAlphaOfLength(5), () -> new GeoPoint(randomDouble(), randomDouble()),
() -> null };
private final Supplier<Object>[] nestedValueSuppliers = new Supplier[] { () -> new HashMap<String, Object>(),
() -> new ArrayList<>() };

@Override
public void setUp() throws Exception {
super.setUp();
hasReduceScript = randomBoolean();
// we want the same value types (also for nested lists, maps) for all random aggregations
int levels = randomIntBetween(1, 3);
valueTypes = new Supplier[levels];
for (int i = 0; i < levels; i++) {
if (i < levels - 1) {
valueTypes[i] = randomFrom(nestedValueSuppliers);
} else {
// the last one needs to be a leaf value, not map or list
valueTypes[i] = randomFrom(leafValueSuppliers);
}
}
}

@Override
protected InternalScriptedMetric createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Expand All @@ -56,7 +83,27 @@ protected InternalScriptedMetric createTestInstance(String name, List<PipelineAg
if (hasReduceScript) {
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, params);
}
return new InternalScriptedMetric(name, randomAlphaOfLength(5), reduceScript, pipelineAggregators, metaData);
Object randomValue = randomValue(valueTypes, 0);
return new InternalScriptedMetric(name, randomValue, reduceScript, pipelineAggregators, metaData);
}

@SuppressWarnings("unchecked")
private static Object randomValue(Supplier<Object>[] valueTypes, int level) {
Object value = valueTypes[level].get();
if (value instanceof Map) {
int elements = randomIntBetween(1, 5);
Map<String, Object> map = (Map<String, Object>) value;
for (int i = 0; i < elements; i++) {
map.put(randomAlphaOfLength(5), randomValue(valueTypes, level + 1));
}
} else if (value instanceof List) {
int elements = randomIntBetween(1,5);
List<Object> list = (List<Object>) value;
for (int i = 0; i < elements; i++) {
list.add(randomValue(valueTypes, level + 1));
}
}
return value;
}

/**
Expand Down Expand Up @@ -105,4 +152,52 @@ protected Reader<InternalScriptedMetric> instanceReader() {
return InternalScriptedMetric::new;
}

@Override
protected void assertFromXContent(InternalScriptedMetric aggregation, ParsedAggregation parsedAggregation) {
assertTrue(parsedAggregation instanceof ParsedScriptedMetric);
ParsedScriptedMetric parsed = (ParsedScriptedMetric) parsedAggregation;

assertValues(aggregation.aggregation(), parsed.aggregation());
}

private static void assertValues(Object expected, Object actual) {
if (expected instanceof Long) {
// longs that fit into the integer range are parsed back as integer
if (actual instanceof Integer) {
assertEquals(((Long) expected).intValue(), actual);
} else {
assertEquals(expected, actual);
}
} else if (expected instanceof Float) {
// based on the xContent type, floats are sometimes parsed back as doubles
if (actual instanceof Double) {
assertEquals(expected, ((Double) actual).floatValue());
} else {
assertEquals(expected, actual);
}
} else if (expected instanceof GeoPoint) {
assertTrue(actual instanceof Map);
GeoPoint point = (GeoPoint) expected;
Map<String, Object> pointMap = (Map<String, Object>) actual;
assertEquals(point.getLat(), pointMap.get("lat"));
assertEquals(point.getLon(), pointMap.get("lon"));
} else if (expected instanceof Map) {
Map<String, Object> expectedMap = (Map<String, Object>) expected;
Map<String, Object> actualMap = (Map<String, Object>) actual;
assertEquals(expectedMap.size(), actualMap.size());
for (String key : expectedMap.keySet()) {
assertValues(expectedMap.get(key), actualMap.get(key));
}
} else if (expected instanceof List) {
List<Object> expectedList = (List<Object>) expected;
List<Object> actualList = (List<Object>) actual;
assertEquals(expectedList.size(), actualList.size());
Iterator<Object> actualIterator = actualList.iterator();
for (Object element : expectedList) {
assertValues(element, actualIterator.next());
}
} else {
assertEquals(expected, actual);
}
}
}
Expand Up @@ -96,6 +96,8 @@
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.scripted.ParsedScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsAggregationBuilder;
Expand Down Expand Up @@ -182,6 +184,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
map.put(AdjacencyMatrixAggregationBuilder.NAME, (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c));
map.put(SignificantLongTerms.NAME, (p, c) -> ParsedSignificantLongTerms.fromXContent(p, (String) c));
map.put(SignificantStringTerms.NAME, (p, c) -> ParsedSignificantStringTerms.fromXContent(p, (String) c));
map.put(ScriptedMetricAggregationBuilder.NAME, (p, c) -> ParsedScriptedMetric.fromXContent(p, (String) c));

namedXContents = map.entrySet().stream()
.map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
Expand Down

0 comments on commit 9fc9db2

Please sign in to comment.