Skip to content

Commit

Permalink
Support counter type in ESQL
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Apr 24, 2024
1 parent 0ac10c9 commit 87dcd03
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1709,10 +1709,6 @@ public Function<byte[], Number> pointReaderIfPossible() {

@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (indexMode == IndexMode.TIME_SERIES && metricType == TimeSeriesParams.MetricType.COUNTER) {
// Counters are not supported by ESQL so we load them in null
return BlockLoader.CONSTANT_NULLS;
}
if (hasDocValues()) {
return type.blockLoaderFromDocValues(name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -1655,6 +1657,53 @@ public void testMaxTruncationSizeSetting() {
}
}

public void testMetricsCounterField() {
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("sensor")).build();
client().admin()
.indices()
.prepareCreate("sensors")
.setSettings(settings)
.setMapping(
"@timestamp",
"type=date",
"sensor",
"type=keyword,time_series_dimension=true",
"my_counter",
"type=long,time_series_metric=counter"
)
.get();
long timestamp = between(1000, 5000);
long counter = between(0, 10);
int numDocs = between(1, 10);
List<Long> expectedCounters = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
counter += between(1, 10);
if (rarely()) {
counter = 0;
}
expectedCounters.add(counter);
timestamp += between(1, 100);
client().prepareIndex("sensors")
.setSource("@timestamp", timestamp, "sensor", randomFrom("s1", "s2"), "my_counter", counter)
.get();
}
client().admin().indices().prepareRefresh("sensors").get();
try (EsqlQueryResponse resp = run("FROM sensors | SORT @timestamp | LIMIT 1000 | KEEP my_counter")) {
int index = 0;
for (Page page : resp.pages()) {
LongBlock block = page.getBlock(0);
for (int i = 0; i < block.getPositionCount(); i++) {
assertThat(block.getLong(i), equalTo(expectedCounters.get(index++)));
}
}
}
for (String aggregate : List.of("max", "min", "sum", "avg")) {
String q = "FROM sensors | STATS " + aggregate + "(my_counter)";
var failure = expectThrows(Exception.class, () -> run(q).close());
assertThat(failure.getMessage(), containsString("illegal agg type: counter#long"));
}
}

private void clearPersistentSettings(Setting<?>... settings) {
Settings.Builder clearedSettings = Settings.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
Expand Down Expand Up @@ -338,7 +339,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
List<Layout.ChannelSet> inverse = source.layout.inverse();
for (int channel = 0; channel < inverse.size(); channel++) {
elementTypes[channel] = PlannerUtils.toElementType(inverse.get(channel).type());
encoders[channel] = switch (inverse.get(channel).type().typeName()) {
encoders[channel] = switch (EsqlDataTypes.unwrapType(inverse.get(channel).type()).typeName()) {
case "ip" -> TopNEncoder.IP;
case "text", "keyword" -> TopNEncoder.UTF8;
case "version" -> TopNEncoder.VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field
// TODO: support forStats for shape aggregations, like st_centroid
return ElementType.BYTES_REF;
}
DataType unwrapped = EsqlDataTypes.unwrapType(dataType);
if (unwrapped != dataType) {
return toElementType(unwrapped, fieldExtractPreference);
}
throw EsqlIllegalArgumentException.illegalDataType(dataType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DataTypeRegistry;
import org.elasticsearch.xpack.ql.type.DataTypes;

import java.util.Collection;

Expand All @@ -37,10 +36,10 @@ public Collection<DataType> dataTypes() {
@Override
public DataType fromEs(String typeName, TimeSeriesParams.MetricType metricType) {
if (metricType == TimeSeriesParams.MetricType.COUNTER) {
// Counter fields will be a counter type, for now they are unsupported
return DataTypes.UNSUPPORTED;
return EsqlDataTypes.getCounterType(typeName);
} else {
return EsqlDataTypes.fromName(typeName);
}
return EsqlDataTypes.fromName(typeName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public final class EsqlDataTypes {
public static final DataType GEO_SHAPE = new DataType("geo_shape", Integer.MAX_VALUE, false, false, true);
public static final DataType CARTESIAN_SHAPE = new DataType("cartesian_shape", Integer.MAX_VALUE, false, false, true);

private static final String COUNTER_PREFIX = "counter#";
public static final DataType COUNTER_LONG = defineCounterType(LONG);
public static final DataType COUNTER_INTEGER = defineCounterType(INTEGER);
public static final DataType COUNTER_DOUBLE = defineCounterType(DOUBLE);

private static final Collection<DataType> TYPES = Stream.of(
BOOLEAN,
UNSUPPORTED,
Expand All @@ -77,7 +82,10 @@ public final class EsqlDataTypes {
GEO_POINT,
CARTESIAN_POINT,
CARTESIAN_SHAPE,
GEO_SHAPE
GEO_SHAPE,
COUNTER_LONG,
COUNTER_INTEGER,
COUNTER_DOUBLE
).sorted(Comparator.comparing(DataType::typeName)).toList();

private static final Map<String, DataType> NAME_TO_TYPE = TYPES.stream().collect(toUnmodifiableMap(DataType::typeName, t -> t));
Expand Down Expand Up @@ -232,4 +240,26 @@ public static DataType widenSmallNumericTypes(DataType type) {
}
return type;
}

public static DataType getCounterType(String typeName) {
return fromTypeName(COUNTER_PREFIX + typeName);
}

/**
* Returns the enclosing type if the given type is a counter type; otherwise just returns the input.
*/
public static DataType unwrapType(DataType dataType) {
String typeName = dataType.typeName();
if (typeName.startsWith(COUNTER_PREFIX)) {
final DataType unwrapped = fromTypeName(typeName.replace(COUNTER_PREFIX, ""));
if (unwrapped != null) {
return unwrapped;
}
}
return dataType;
}

private static DataType defineCounterType(DataType root) {
return new DataType(COUNTER_PREFIX + root.typeName(), null, root.size(), root.isInteger(), root.isRational(), root.hasDocValues());
}
}

0 comments on commit 87dcd03

Please sign in to comment.