Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
* < 50 OR firstName = Jason')
* Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND
* time > 100')
* Sorted time intervals without overlapping if time condition is valid
*
* TODO: 1. Merge adjacent intervals
* 2. Set interval boundary using time granularity instead of millis
*/
@Nullable
private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
Expand Down Expand Up @@ -233,6 +237,14 @@ private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
}
}
return getUnionSortedIntervals(orIntervals);
case NOT:
assert operands.size() == 1;
List<Interval> childIntervals = getFilterTimeIntervals(operands.get(0));
if (childIntervals == null) {
return null;
} else {
return getComplementSortedIntervals(childIntervals);
}
case EQUALS: {
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null && identifier.getName().equals(_timeColumn)) {
Expand Down Expand Up @@ -478,6 +490,25 @@ private List<Interval> getUnionTwoSortedIntervals(List<Interval> intervals1, Lis
return res;
}

/**
 * Returns the complement (non-overlapping sorted intervals) of the given non-overlapping sorted intervals.
 *
 * <p>The complement is taken over the range [MIN_START_TIME, MAX_END_TIME]. Interval bounds are treated as
 * inclusive, so every gap runs from one past the previous interval max to one before the next interval min.
 * An empty input yields the single interval [MIN_START_TIME, MAX_END_TIME]; the result is empty when the input
 * already covers the whole range.
 *
 * @param intervals intervals to complement; the single forward pass relies on them being sorted and
 *                  non-overlapping
 * @return the gaps not covered by {@code intervals}, in ascending order
 */
private List<Interval> getComplementSortedIntervals(List<Interval> intervals) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) might want to add a unit test just for this function alone ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have test for individual interval set operations currently. We may add them as a separate PR

List<Interval> res = new ArrayList<>();
// Inclusive lower bound of the next candidate gap
long startTime = MIN_START_TIME;
for (Interval interval : intervals) {
// Emit the gap between the previous interval (or MIN_START_TIME) and this one, if there is any
if (interval._min > startTime) {
res.add(new Interval(startTime, interval._min - 1));
}
// Nothing can follow an interval that reaches MAX_END_TIME, so the complement is finished.
// NOTE(review): returning here also skips the _max + 1 below, which presumably would overflow
// if MAX_END_TIME is the largest long value - confirm against the constant definition.
if (interval._max == MAX_END_TIME) {
return res;
}
startTime = interval._max + 1;
}
// Trailing gap after the last interval (the whole range when the input is empty)
res.add(new Interval(startTime, MAX_END_TIME));
return res;
}

/**
* Parse interval to millisecond as [min, max] with both sides included.
* E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0";
private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)";

private static final String QUERY_5 = "SELECT * FROM testTable where timeColumn = 40";
private static final String QUERY_6 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
private static final String QUERY_7 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
private static final String QUERY_8 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
private static final String QUERY_9 =
private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40";
private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
private static final String TIME_QUERY_5 =
"SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)";
private static final String QUERY_10 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";

private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131";
private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331";
Expand All @@ -97,6 +97,9 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_QUERY_5 =
"SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";

private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30";
private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30";

// This duplicates KinesisConfig.STREAM_TYPE. We hardcode the value here instead of referencing
// KinesisConfig.STREAM_TYPE directly to avoid pulling in the entire pinot-kinesis module as a dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
Expand Down Expand Up @@ -234,8 +237,8 @@ public void testEnablingEmptySegmentPruner() {
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);

// When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
when(indexingConfig.getStreamConfigs()).thenReturn(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
Expand Down Expand Up @@ -332,12 +335,12 @@ public void testPartitionAwareSegmentPruner(QueryCompiler compiler) {
@Test(dataProvider = "compilerProvider")
public void testTimeSegmentPruner(QueryCompiler compiler) {
BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5);
BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6);
BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7);
BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8);
BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9);
BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10);
BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(TIME_QUERY_1);
BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(TIME_QUERY_2);
BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(TIME_QUERY_3);
BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(TIME_QUERY_4);
BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(TIME_QUERY_5);
BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(TIME_QUERY_6);
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
Expand Down Expand Up @@ -516,6 +519,35 @@ public void testTimeSegmentPrunerSimpleDateFormat(QueryCompiler compiler) {
assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
}

/**
 * Checks time-based pruning for SQL-only filters that negate a time predicate
 * (NOT BETWEEN and NOT with a range comparison) on a table whose time column is in DAYS.
 */
@Test
public void testTimeSegmentPrunerSql() {
  CalciteSqlCompiler sqlCompiler = new CalciteSqlCompiler();
  BrokerRequest notBetweenRequest = sqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
  BrokerRequest notGreaterThanRequest = sqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_2);

  // Schema has to be in place before the pruner is constructed
  TableConfig realtimeTableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
  setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
  TimeSegmentPruner timePruner = new TimeSegmentPruner(realtimeTableConfig, _propertyStore);

  // Three segments with time ranges [10, 60], [20, 30] and [50, 65] days
  String wideSegment = "segment0";
  String innerSegment = "segment1";
  String lateSegment = "segment2";
  Set<String> allSegments = new HashSet<>(Arrays.asList(wideSegment, innerSegment, lateSegment));
  setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, wideSegment, 10, 60, TimeUnit.DAYS);
  setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, innerSegment, 20, 30, TimeUnit.DAYS);
  setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, lateSegment, 50, 65, TimeUnit.DAYS);

  // NOTE: Ideal state and external view are not used in the current implementation
  IdealState mockedIdealState = Mockito.mock(IdealState.class);
  ExternalView mockedExternalView = Mockito.mock(ExternalView.class);
  timePruner.init(mockedIdealState, mockedExternalView, allSegments);

  // NOT BETWEEN 20 AND 30 drops only the segment that lies entirely within [20, 30]
  Set<String> expectedForNotBetween = new HashSet<>(Arrays.asList(wideSegment, lateSegment));
  assertEquals(timePruner.prune(notBetweenRequest, allSegments), expectedForNotBetween);

  // NOT timeColumn > 30 drops only the segment that starts after 30
  Set<String> expectedForNotGreaterThan = new HashSet<>(Arrays.asList(wideSegment, innerSegment));
  assertEquals(timePruner.prune(notGreaterThanRequest, allSegments), expectedForNotGreaterThan);
}

@Test(dataProvider = "compilerProvider")
public void testEmptySegmentPruner(QueryCompiler compiler) {
BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
Expand Down