From 580597c2fc295fd3bc335d97dcfda1a5ad2efa24 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Mon, 21 Mar 2022 17:53:12 -0700 Subject: [PATCH] Support NOT in TimeSegmentPruner --- .../segmentpruner/TimeSegmentPruner.java | 31 ++++++++++ .../segmentpruner/SegmentPrunerTest.java | 60 ++++++++++++++----- 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java index e84dabcad98b..0b1984ed7921 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java @@ -200,6 +200,10 @@ public Set prune(BrokerRequest brokerRequest, Set segments) { * < 50 OR firstName = Jason') * Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND * time > 100') + * Sorted time intervals without overlapping if time condition is valid + * + * TODO: 1. Merge adjacent intervals + * 2. Set interval boundary using time granularity instead of millis */ @Nullable private List getFilterTimeIntervals(Expression filterExpression) { @@ -233,6 +237,14 @@ private List getFilterTimeIntervals(Expression filterExpression) { } } return getUnionSortedIntervals(orIntervals); + case NOT: + assert operands.size() == 1; + List childIntervals = getFilterTimeIntervals(operands.get(0)); + if (childIntervals == null) { + return null; + } else { + return getComplementSortedIntervals(childIntervals); + } case EQUALS: { Identifier identifier = operands.get(0).getIdentifier(); if (identifier != null && identifier.getName().equals(_timeColumn)) { @@ -478,6 +490,25 @@ private List getUnionTwoSortedIntervals(List intervals1, Lis return res; } + /** + * Returns the complement (non-overlapping sorted intervals) of the given non-overlapping sorted intervals. + */ + private List getComplementSortedIntervals(List intervals) { + List res = new ArrayList<>(); + long startTime = MIN_START_TIME; + for (Interval interval : intervals) { + if (interval._min > startTime) { + res.add(new Interval(startTime, interval._min - 1)); + } + if (interval._max == MAX_END_TIME) { + return res; + } + startTime = interval._max + 1; + } + res.add(new Interval(startTime, MAX_END_TIME)); + return res; + } + /** * Parse interval to millisecond as [min, max] with both sides included. * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310] diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java index 9f330e0a47e4..ad0f99537623 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java @@ -80,13 +80,13 @@ public class SegmentPrunerTest extends ControllerTest { private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0"; private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)"; - private static final String QUERY_5 = "SELECT * FROM testTable where timeColumn = 40"; - private static final String QUERY_6 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30"; - private static final String QUERY_7 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50"; - private static final String QUERY_8 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45"; - private static final String QUERY_9 = + private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40"; + private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30"; + private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50"; + private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45"; + private static final String TIME_QUERY_5 = "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)"; - private static final String QUERY_10 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0"; + private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0"; private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131"; private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331"; @@ -97,6 +97,9 @@ public class SegmentPrunerTest extends ControllerTest { private static final String SDF_QUERY_5 = "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530"; + private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30"; + private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30"; + // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. private static final String KINESIS_STREAM_TYPE = "kinesis"; @@ -234,8 +237,8 @@ public void testEnablingEmptySegmentPruner() { assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner); // When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned. - when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList( - Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE))); + when(streamIngestionConfig.getStreamConfigMaps()).thenReturn( + Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE))); when(indexingConfig.getStreamConfigs()).thenReturn( Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)); segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore); @@ -332,12 +335,12 @@ public void testPartitionAwareSegmentPruner(QueryCompiler compiler) { @Test(dataProvider = "compilerProvider") public void testTimeSegmentPruner(QueryCompiler compiler) { BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1); - BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5); - BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6); - BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7); - BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8); - BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9); - BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10); + BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(TIME_QUERY_1); + BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(TIME_QUERY_2); + BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(TIME_QUERY_3); + BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(TIME_QUERY_4); + BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(TIME_QUERY_5); + BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(TIME_QUERY_6); // NOTE: Ideal state and external view are not used in the current implementation IdealState idealState = Mockito.mock(IdealState.class); ExternalView externalView = Mockito.mock(ExternalView.class); @@ -516,6 +519,35 @@ public void testTimeSegmentPrunerSimpleDateFormat(QueryCompiler compiler) { assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet()); } + @Test + public void testTimeSegmentPrunerSql() { + CalciteSqlCompiler compiler = new CalciteSqlCompiler(); + BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_1); + BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_2); + // NOTE: Ideal state and external view are not used in the current implementation + IdealState idealState = Mockito.mock(IdealState.class); + ExternalView externalView = Mockito.mock(ExternalView.class); + + TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME); + setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS); + + TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore); + Set onlineSegments = new HashSet<>(); + String segment0 = "segment0"; + onlineSegments.add(segment0); + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS); + String segment1 = "segment1"; + onlineSegments.add(segment1); + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS); + String segment2 = "segment2"; + onlineSegments.add(segment2); + setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); + segmentPruner.init(idealState, externalView, onlineSegments); + + assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2))); + assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1))); + } + @Test(dataProvider = "compilerProvider") public void testEmptySegmentPruner(QueryCompiler compiler) { BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);