Skip to content
Permalink
Browse files
Convert simple min/max SQL queries on __time to timeBoundary queries (#…
…12472)

* Support array based results in timeBoundary query

* Fix bug with query interval in timeBoundary

* Convert min(__time) and max(__time) SQL queries to timeBoundary

* Add tests for timeBoundary backed SQL queries

* Fix query plans for existing tests

* fixup! Convert min(__time) and max(__time) SQL queries to timeBoundary

* fixup! Add tests for timeBoundary backed SQL queries

* fixup! Fix bug with query interval in timeBoundary
  • Loading branch information
rohangarg committed Apr 25, 2022
1 parent b47316b commit 95694b5afa505aea906f05db41c8901559b8bd2b
Showing 9 changed files with 441 additions and 62 deletions.
@@ -48,6 +48,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
private static final QuerySegmentSpec DEFAULT_SEGMENT_SPEC = new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";
public static final String MAX_TIME_ARRAY_OUTPUT_NAME = "maxTimeArrayOutputName";
public static final String MIN_TIME_ARRAY_OUTPUT_NAME = "minTimeArrayOutputName";

private static final byte CACHE_TYPE_ID = 0x0;

@@ -42,6 +42,8 @@
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;

import java.nio.ByteBuffer;
@@ -224,4 +226,40 @@ public Result<TimeBoundaryResultValue> apply(Object input)
}
};
}

@Override
public RowSignature resultArraySignature(TimeBoundaryQuery query)
{
if (query.isMinTime() || query.isMaxTime()) {
RowSignature.Builder builder = RowSignature.builder();
String outputName = query.isMinTime() ?
query.getContextValue(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME) :
query.getContextValue(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MAX_TIME);
return builder.add(outputName, ColumnType.LONG).build();
}
return super.resultArraySignature(query);
}

@Override
public Sequence<Object[]> resultsAsArrays(
TimeBoundaryQuery query,
Sequence<Result<TimeBoundaryResultValue>> resultSequence
)
{
if (query.isMaxTime()) {
return Sequences.map(
resultSequence,
result -> result == null || result.getValue() == null || result.getValue().getMaxTime() == null ? null :
new Object[]{result.getValue().getMaxTime().getMillis()}
);
} else if (query.isMinTime()) {
return Sequences.map(
resultSequence,
result -> result == null || result.getValue() == null || result.getValue().getMinTime() == null ? null :
new Object[]{result.getValue().getMinTime().getMillis()}
);
} else {
return super.resultsAsArrays(query, resultSequence);
}
}
}
@@ -22,6 +22,7 @@
import com.google.common.base.Function;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
@@ -45,6 +46,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Iterator;
import java.util.List;
@@ -155,7 +157,7 @@ public Iterator<Result<TimeBoundaryResultValue>> make()
final DateTime minTime;
final DateTime maxTime;

if (legacyQuery.getFilter() != null) {
if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;
@@ -183,6 +185,15 @@ public void cleanup(Iterator<Result<TimeBoundaryResultValue>> toClean)
{

}

private boolean queryIntervalContainsAdapterInterval()
{
List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
if (queryIntervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", queryIntervals);
}
return queryIntervals.get(0).contains(adapter.getInterval());
}
}
);
}
@@ -25,11 +25,14 @@
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -289,6 +292,43 @@ public void testFilteredFilterSegments()
Assert.assertEquals(7, segments.size());
}

@Test(expected = UOE.class)
public void testResultArraySignature()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.build();
new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
}

@Test
public void testResultArraySignatureWithMinTime()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MIN_TIME)
.context(ImmutableMap.of(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "foo"))
.build();
RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
}

@Test
public void testResultArraySignatureWithMaxTime()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MAX_TIME)
.context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "foo"))
.build();
RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
}

@Test
public void testCacheStrategy() throws Exception
{
@@ -19,11 +19,14 @@

package org.apache.druid.query.timeboundary;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.io.CharSource;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
@@ -35,6 +38,7 @@
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
@@ -47,6 +51,7 @@
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -183,6 +188,32 @@ public void testFilteredTimeBoundaryQuery() throws IOException
Assert.assertEquals(DateTimes.of("2011-01-16T00:00:00.000Z"), maxTime);
}

@Test
@SuppressWarnings("unchecked")
public void testTimeFilteredTimeBoundaryQuery() throws IOException
{
QueryRunner customRunner = getCustomRunner();
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
)
)
.build();
List<Result<TimeBoundaryResultValue>> results =
customRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();

Assert.assertTrue(Iterables.size(results) > 0);

TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();

Assert.assertEquals(DateTimes.of("2011-01-15T00:00:00.000Z"), minTime);
Assert.assertEquals(DateTimes.of("2011-01-15T01:00:00.000Z"), maxTime);
}

@Test
@SuppressWarnings("unchecked")
public void testFilteredTimeBoundaryQueryNoMatches() throws IOException
@@ -216,6 +247,22 @@ public void testTimeBoundary()
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}

@Test(expected = UOE.class)
@SuppressWarnings("unchecked")
public void testTimeBoundaryArrayResults()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(null)
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
timeBoundaryQuery,
runner.run(QueryPlus.wrap(timeBoundaryQuery), context)
).toList();
}

@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMax()
@@ -235,6 +282,26 @@ public void testTimeBoundaryMax()
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}

@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMaxArraysResults()
{
TimeBoundaryQuery maxTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MAX_TIME)
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
List<Object[]> maxTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
maxTimeBoundaryQuery,
runner.run(QueryPlus.wrap(maxTimeBoundaryQuery), context)
).toList();

Long maxTimeMillis = (Long) maxTime.get(0)[0];
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), new DateTime(maxTimeMillis, DateTimeZone.UTC));
Assert.assertEquals(1, maxTime.size());
}

@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMin()
@@ -254,6 +321,26 @@ public void testTimeBoundaryMin()
Assert.assertNull(maxTime);
}

@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMinArraysResults()
{
TimeBoundaryQuery minTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(TimeBoundaryQuery.MIN_TIME)
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
List<Object[]> minTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
minTimeBoundaryQuery,
runner.run(QueryPlus.wrap(minTimeBoundaryQuery), context)
).toList();

Long minTimeMillis = (Long) minTime.get(0)[0];
Assert.assertEquals(DateTimes.of("2011-01-12T00:00:00.000Z"), new DateTime(minTimeMillis, DateTimeZone.UTC));
Assert.assertEquals(1, minTime.size());
}

@Test
public void testMergeResults()
{
@@ -55,6 +55,8 @@
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -344,6 +346,7 @@ public int getMinTopNThreshold()
)
)
.put(GroupByQuery.class, groupByQueryRunnerFactory)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER))
.build()
);

0 comments on commit 95694b5

Please sign in to comment.