Skip to content

Commit

Permalink
SQL: Fix metric aggs on date/time to not return double (#40377)
Browse files Browse the repository at this point in the history
Previously metric aggregations on date fields would return a double
which caused errors when trying to apply scalar functions on top, e.g.:
```
SELECT YEAR(MAX(date)) FROM test
```

Fixes: #40376
  • Loading branch information
matriv committed Mar 23, 2019
1 parent d229624 commit 41d0a03
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 21 deletions.
4 changes: 4 additions & 0 deletions x-pack/plugin/sql/qa/src/main/resources/agg.sql-spec
Expand Up @@ -276,6 +276,8 @@ aggMinWithAlias
SELECT gender g, MIN(emp_no) m FROM "test_emp" GROUP BY g ORDER BY gender;
aggMinOnDateTime
SELECT gender, MIN(birth_date) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggMinOnDateTimeCastAsDate
SELECT gender, YEAR(CAST(MIN(birth_date) AS DATE)) m FROM "test_emp" GROUP BY gender ORDER BY gender;

// Conditional MIN
aggMinWithHaving
Expand Down Expand Up @@ -332,6 +334,8 @@ aggMaxWithAlias
SELECT gender g, MAX(emp_no) m FROM "test_emp" GROUP BY g ORDER BY gender;
aggMaxOnDateTime
SELECT gender, MAX(birth_date) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggMaxOnDateTimeCastAsDate
SELECT gender, YEAR(CAST(MAX(birth_date) AS DATE)) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggAvgAndMaxWithLikeFilter
SELECT CAST(AVG(salary) AS LONG) AS avg, CAST(SUM(salary) AS LONG) AS s FROM "test_emp" WHERE first_name LIKE 'G%';

Expand Down
Expand Up @@ -432,7 +432,7 @@ private BucketExtractor createExtractor(FieldExtraction ref, BucketExtractor tot

if (ref instanceof MetricAggRef) {
MetricAggRef r = (MetricAggRef) ref;
return new MetricAggExtractor(r.name(), r.property(), r.innerKey());
return new MetricAggExtractor(r.name(), r.property(), r.innerKey(), cfg.zoneId(), r.isDateTimeBased());
}

if (ref instanceof TopHitsAggRef) {
Expand Down
Expand Up @@ -18,8 +18,10 @@
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.util.DateUtils;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;

Expand All @@ -30,24 +32,32 @@ public class MetricAggExtractor implements BucketExtractor {
private final String name;
private final String property;
private final String innerKey;
private final boolean isDateTimeBased;
private final ZoneId zoneId;

public MetricAggExtractor(String name, String property, String innerKey) {
public MetricAggExtractor(String name, String property, String innerKey, ZoneId zoneId, boolean isDateTimeBased) {
this.name = name;
this.property = property;
this.innerKey = innerKey;
this. isDateTimeBased =isDateTimeBased;
this.zoneId = zoneId;
}

MetricAggExtractor(StreamInput in) throws IOException {
name = in.readString();
property = in.readString();
innerKey = in.readOptionalString();
isDateTimeBased = in.readBoolean();
zoneId = ZoneId.of(in.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(property);
out.writeOptionalString(innerKey);
out.writeBoolean(isDateTimeBased);
out.writeString(zoneId.getId());
}

String name() {
Expand All @@ -62,6 +72,10 @@ String innerKey() {
return innerKey;
}

ZoneId zoneId() {
return zoneId;
}

@Override
public String getWriteableName() {
return NAME;
Expand All @@ -83,20 +97,33 @@ public Object extract(Bucket bucket) {
//if (innerKey == null) {
// throw new SqlIllegalArgumentException("Invalid innerKey {} specified for aggregation {}", innerKey, name);
//}
return ((InternalNumericMetricsAggregation.MultiValue) agg).value(property);
return handleDateTime(((InternalNumericMetricsAggregation.MultiValue) agg).value(property));
} else if (agg instanceof InternalFilter) {
// COUNT(expr) and COUNT(ALL expr) uses this type of aggregation to account for non-null values only
return ((InternalFilter) agg).getDocCount();
}

Object v = agg.getProperty(property);
return innerKey != null && v instanceof Map ? ((Map<?, ?>) v).get(innerKey) : v;
return handleDateTime(innerKey != null && v instanceof Map ? ((Map<?, ?>) v).get(innerKey) : v);
}

private Object handleDateTime(Object object) {
if (isDateTimeBased) {
if (object == null) {
return object;
} else if (object instanceof Number) {
return DateUtils.asDateTime(((Number) object).longValue(), zoneId);
} else {
throw new SqlIllegalArgumentException("Invalid date key returned: {}", object);
}
}
return object;
}

/**
* Check if the given aggregate has been executed and has computed values
* or not (the bucket is null).
*
*
* Waiting on https://github.com/elastic/elasticsearch/issues/34903
*/
private static boolean containsValues(InternalAggregation agg) {
Expand Down Expand Up @@ -130,11 +157,11 @@ public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

MetricAggExtractor other = (MetricAggExtractor) obj;
return Objects.equals(name, other.name)
&& Objects.equals(property, other.property)
Expand All @@ -146,4 +173,4 @@ public String toString() {
String i = innerKey != null ? "[" + innerKey + "]" : "";
return Aggs.ROOT_GROUP_NAME + ">" + name + "." + property + i;
}
}
}
Expand Up @@ -404,7 +404,7 @@ private Tuple<QueryContainer, AggPathInput> addAggFunction(GroupByKey groupingAg
// COUNT(<field_name>)
} else if (!c.distinct()) {
LeafAgg leafAgg = toAgg(functionId, f);
AggPathInput a = new AggPathInput(f, new MetricAggRef(leafAgg.id(), "doc_count", "_count"));
AggPathInput a = new AggPathInput(f, new MetricAggRef(leafAgg.id(), "doc_count", "_count", false));
queryC = queryC.with(queryC.aggs().addAgg(leafAgg));
return new Tuple<>(queryC, a);
}
Expand All @@ -430,14 +430,16 @@ private Tuple<QueryContainer, AggPathInput> addAggFunction(GroupByKey groupingAg
// FIXME: concern leak - hack around MatrixAgg which is not
// generalized (afaik)
aggInput = new AggPathInput(f,
new MetricAggRef(cAggPath, ia.innerName(), ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null));
new MetricAggRef(cAggPath, ia.innerName(),
ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null,
ia.dataType().isDateBased()));
}
else {
LeafAgg leafAgg = toAgg(functionId, f);
if (f instanceof TopHits) {
aggInput = new AggPathInput(f, new TopHitsAggRef(leafAgg.id(), f.dataType()));
} else {
aggInput = new AggPathInput(f, new MetricAggRef(leafAgg.id()));
aggInput = new AggPathInput(f, new MetricAggRef(leafAgg.id(), f.dataType().isDateBased()));
}
queryC = queryC.with(queryC.aggs().addAgg(leafAgg));
}
Expand Down
Expand Up @@ -17,19 +17,21 @@ public class MetricAggRef extends AggRef {
private final String name;
private final String property;
private final String innerKey;
private final boolean isDateTimeBased;

public MetricAggRef(String name) {
this(name, "value");
public MetricAggRef(String name, boolean isDateTimeBased) {
this(name, "value", isDateTimeBased);
}

public MetricAggRef(String name, String property) {
this(name, property, null);
public MetricAggRef(String name, String property, boolean isDateTimeBased) {
this(name, property, null, isDateTimeBased);
}

public MetricAggRef(String name, String property, String innerKey) {
public MetricAggRef(String name, String property, String innerKey, boolean isDateTimeBased) {
this.name = name;
this.property = property;
this.innerKey = innerKey;
this.isDateTimeBased = isDateTimeBased;
}

public String name() {
Expand All @@ -44,6 +46,10 @@ public String innerKey() {
return innerKey;
}

public boolean isDateTimeBased() {
return isDateTimeBased;
}

@Override
public String toString() {
String i = innerKey != null ? "[" + innerKey + "]" : "";
Expand Down
Expand Up @@ -10,9 +10,12 @@
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.util.DateUtils;

import java.io.IOException;
import java.time.ZoneId;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand All @@ -22,7 +25,8 @@
public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<MetricAggExtractor> {

public static MetricAggExtractor randomMetricAggExtractor() {
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16));
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16),
randomZone(), randomBoolean());
}

@Override
Expand All @@ -37,7 +41,12 @@ protected Reader<MetricAggExtractor> instanceReader() {

@Override
protected MetricAggExtractor mutateInstance(MetricAggExtractor instance) throws IOException {
return new MetricAggExtractor(instance.name() + "mutated", instance.property(), instance.innerKey());
return new MetricAggExtractor(
instance.name() + "mutated",
instance.property() + "mutated",
instance.innerKey() + "mutated",
randomValueOtherThan(instance.zoneId(),
ESTestCase::randomZone), randomBoolean());
}

public void testNoAggs() {
Expand All @@ -48,29 +57,60 @@ public void testNoAggs() {
}

public void testSingleValueProperty() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);

double value = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()), value);
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(value, extractor.extract(bucket));
}

public void testSingleValuePropertyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("my_date_field", "property", "innerKey", zoneId, true);

double value = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()), value);
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
}

public void testSingleValueInnerKey() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);
double innerValue = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()),
singletonMap(extractor.innerKey(), innerValue));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(innerValue, extractor.extract(bucket));
}

public void testSingleValueInnerKeyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", zoneId, true);

double innerValue = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()),
singletonMap(extractor.innerKey(), innerValue));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) innerValue , zoneId), extractor.extract(bucket));
}

public void testMultiValueProperty() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);

double value = randomDouble();
Aggregation agg = new TestMultiValueAggregation(extractor.name(), singletonMap(extractor.property(), value));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(value, extractor.extract(bucket));
}

public void testMultiValuePropertyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", zoneId, true);

double value = randomDouble();
Aggregation agg = new TestMultiValueAggregation(extractor.name(), singletonMap(extractor.property(), value));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
}
}

0 comments on commit 41d0a03

Please sign in to comment.