Skip to content

Commit

Permalink
Add support for range index rule recommendation(#7034) (#7063)
Browse files Browse the repository at this point in the history
  • Loading branch information
GSharayu committed Jun 21, 2021
1 parent ce419d2 commit 9e26c24
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.controller.recommender.rules.io.params.InvertedSortedIndexJointRuleParams;
import org.apache.pinot.controller.recommender.rules.io.params.NoDictionaryOnHeapDictionaryJointRuleParams;
import org.apache.pinot.controller.recommender.rules.io.params.PartitionRuleParams;
import org.apache.pinot.controller.recommender.rules.io.params.RangeIndexRuleParams;
import org.apache.pinot.controller.recommender.rules.io.params.RealtimeProvisioningRuleParams;
import org.apache.pinot.controller.recommender.rules.io.params.SegmentSizeRuleParams;
import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class InputManager {
public InvertedSortedIndexJointRuleParams _invertedSortedIndexJointRuleParams =
new InvertedSortedIndexJointRuleParams();
public BloomFilterRuleParams _bloomFilterRuleParams = new BloomFilterRuleParams();
public RangeIndexRuleParams _rangeIndexRuleParams = new RangeIndexRuleParams();
public NoDictionaryOnHeapDictionaryJointRuleParams _noDictionaryOnHeapDictionaryJointRuleParams =
new NoDictionaryOnHeapDictionaryJointRuleParams();
public FlagQueryRuleParams _flagQueryRuleParams = new FlagQueryRuleParams();
Expand Down Expand Up @@ -454,6 +456,10 @@ public BloomFilterRuleParams getBloomFilterRuleParams() {
return _bloomFilterRuleParams;
}

public RangeIndexRuleParams getRangeIndexRuleParams() {
return _rangeIndexRuleParams;
}

public RealtimeProvisioningRuleParams getRealtimeProvisioningRuleParams() {
return _realtimeProvisioningRuleParams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.controller.recommender.rules.impl.KafkaPartitionRule;
import org.apache.pinot.controller.recommender.rules.impl.NoDictionaryOnHeapDictionaryJointRule;
import org.apache.pinot.controller.recommender.rules.impl.PinotTablePartitionRule;
import org.apache.pinot.controller.recommender.rules.impl.RangeIndexRule;
import org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule;
import org.apache.pinot.controller.recommender.rules.impl.SegmentSizeRule;
import org.apache.pinot.controller.recommender.rules.impl.VariedLengthDictionaryRule;
Expand Down Expand Up @@ -58,6 +59,8 @@ public static AbstractRule getRule(Rule rule, InputManager inputManager, ConfigM
return new PinotTablePartitionRule(inputManager, outputManager);
case BloomFilterRule:
return new BloomFilterRule(inputManager, outputManager);
case RangeIndexRule:
return new RangeIndexRule(inputManager, outputManager);
case NoDictionaryOnHeapDictionaryJointRule:
return new NoDictionaryOnHeapDictionaryJointRule(inputManager, outputManager);
case VariedLengthDictionaryRule:
Expand All @@ -77,6 +80,7 @@ public static AbstractRule getRule(Rule rule, InputManager inputManager, ConfigM
boolean _recommendPinotTablePartition = DEFAULT_RECOMMEND_PINOT_TABLE_PARTITION;
boolean _recommendInvertedSortedIndexJoint = DEFAULT_RECOMMEND_INVERTED_SORTED_INDEX_JOINT;
boolean _recommendBloomFilter = DEFAULT_RECOMMEND_BLOOM_FILTER;
boolean _recommendRangeIndex = DEFAULT_RECOMMEND_RANGE_INDEX;
boolean _recommendNoDictionaryOnHeapDictionaryJoint = DEFAULT_RECOMMEND_NO_DICTIONARY_ONHEAP_DICTIONARY_JOINT;
boolean _recommendVariedLengthDictionary = DEFAULT_RECOMMEND_VARIED_LENGTH_DICTIONARY;
boolean _recommendFlagQuery = DEFAULT_RECOMMEND_FLAG_QUERY;
Expand Down Expand Up @@ -118,6 +122,11 @@ public void setRecommendBloomFilter(boolean recommendBloomFilter) {
_recommendBloomFilter = recommendBloomFilter;
}

@JsonSetter(nulls = Nulls.SKIP)
public void setRecommendRangeIndex(boolean recommendRangeIndex) {
_recommendRangeIndex = recommendRangeIndex;
}

@JsonSetter(nulls = Nulls.SKIP)
public void setRecommendAggregateMetrics(boolean aggregateMetrics) {
_recommendAggregateMetrics = aggregateMetrics;
Expand Down Expand Up @@ -161,6 +170,10 @@ public boolean isRecommendBloomFilter() {
return _recommendBloomFilter;
}

public boolean isRecommendRangeIndex() {
return _recommendRangeIndex;
}

public boolean isRecommendAggregateMetrics() {
return _recommendAggregateMetrics;
}
Expand All @@ -184,6 +197,7 @@ public enum Rule {
VariedLengthDictionaryRule, // VariedLengthDictionaryRule must go after NoDictionaryOnHeapDictionaryJointRule since we do not recommend dictionary on NoDictionary cols
PinotTablePartitionRule, // PinotTablePartitionRule must go after KafkaPartitionRule to recommend realtime partitions, after NoDictionaryOnHeapDictionaryJointRule to correctly calculate record size
BloomFilterRule,
RangeIndexRule,
AggregateMetricsRule,
RealtimeProvisioningRule // this rule must be the last one because it needs the output of other rules as its input
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void run() {
// Exclude cols already having index
noDictCols.removeAll(_output.getIndexConfig().getInvertedIndexColumns());
noDictCols.remove(_output.getIndexConfig().getSortedColumn());
noDictCols.removeAll(_output.getIndexConfig()
.getRangeIndexColumns()); // TODO: Remove this after range index is implemented for no-dictionary
noDictCols.removeAll(_output.getIndexConfig().getRangeIndexColumns());
LOGGER.debug("noDictCols {}", noDictCols);

// Exclude MV cols TODO: currently no index column is only applicable for SV columns, change this after it's supported for MV
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.recommender.rules.impl;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.controller.recommender.io.ConfigManager;
import org.apache.pinot.controller.recommender.io.InputManager;
import org.apache.pinot.controller.recommender.rules.AbstractRule;
import org.apache.pinot.controller.recommender.rules.io.params.RangeIndexRuleParams;
import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Create range-index for columns used in <, > and between predicate
* Skip the inverted and sorted index columns if recommended
*/
public class RangeIndexRule extends AbstractRule {
private final Logger LOGGER = LoggerFactory.getLogger(RangeIndexRule.class);
private final RangeIndexRuleParams _params;

public RangeIndexRule(InputManager input, ConfigManager output) {
super(input, output);
_params = input.getRangeIndexRuleParams();
}

@Override
public void run() {
int numCols = _input.getNumCols();
double[] weights = new double[numCols];
AtomicDouble totalWeight = new AtomicDouble(0);

// For each query, find out the range columns
// and accumulate the (weighted) frequencies
_input.getParsedQueries().forEach(query -> {
Double weight = _input.getQueryWeight(query);
totalWeight.addAndGet(weight);
FixedLenBitset fixedLenBitset = parseQuery(_input.getQueryContext(query));
LOGGER.debug("fixedLenBitset {}", fixedLenBitset);
for (Integer i : fixedLenBitset.getOffsets()) {
weights[i] += weight;
}
});
LOGGER.info("Weight: {}, Total {}", weights, totalWeight);

for (int i = 0; i < numCols; i++) {
String colName = _input.intToColName(i);
// This checks if column is not already recommended for inverted index or sorted index
// As currently, only numeric columns are selected in range index creation, we will skip non numeric columns
if (((weights[i] / totalWeight.get()) > _params.THRESHOLD_MIN_PERCENT_RANGE_INDEX) &&
!_output.getIndexConfig().getSortedColumn().equals(colName) &&
!_output.getIndexConfig().getInvertedIndexColumns().contains(colName) &&
_input.getFieldType(colName).isNumeric()) {
_output.getIndexConfig().getRangeIndexColumns().add(colName);
}
}
}

public FixedLenBitset parseQuery(QueryContext queryContext) {
if (queryContext.getFilter() == null) {
return FixedLenBitset.IMMUTABLE_EMPTY_SET;
}

return parsePredicateList(queryContext.getFilter());
}

/**
* As cardinality does not matter, AND and OR columns will be considered
* @param filterContext filterContext
* @return FixedLenBitset for range predicates in this query
*/
private FixedLenBitset parsePredicateList(FilterContext filterContext) {
FilterContext.Type type = filterContext.getType();
FixedLenBitset ret = MUTABLE_EMPTY_SET();
if (type == FilterContext.Type.AND || type == FilterContext.Type.OR) {
for (int i = 0; i < filterContext.getChildren().size(); i++) {
FixedLenBitset childResult = parsePredicateList(filterContext.getChildren().get(i));
ret.union(childResult);
}
} else {
ExpressionContext lhs = filterContext.getPredicate().getLhs();
String colName = lhs.toString();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
LOGGER.trace("Skipping the function {}", colName);
} else if (filterContext.getPredicate().getType() == Predicate.Type.RANGE) {
ret.add(_input.colNameToInt(colName));
}
}
return ret;
}

private FixedLenBitset MUTABLE_EMPTY_SET() {
return new FixedLenBitset(_input.getNumCols());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.recommender.rules.io.params;

import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;

/**
* Thresholds and parameters used in @RangeIndexRule
*/
public class RangeIndexRuleParams {
public Double THRESHOLD_MIN_PERCENT_RANGE_INDEX = RecommenderConstants.RangeIndexRule.DEFAULT_THRESHOLD_MIN_PERCENT_RANGE_INDEX;

public Double getTHRESHOLD_MIN_PERCENT_RANGE_INDEX() {
return THRESHOLD_MIN_PERCENT_RANGE_INDEX;
}

@JsonSetter(value = "THRESHOLD_MIN_PERCENT_RANGE_INDEX", nulls = Nulls.SKIP)
public void setTHRESHOLD_MIN_PERCENT_RANGE_INDEX(Double THRESHOLD_MIN_PERCENT_RANGE_INDEX) {
this.THRESHOLD_MIN_PERCENT_RANGE_INDEX = THRESHOLD_MIN_PERCENT_RANGE_INDEX;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static class RulesToExecute {
public static final boolean DEFAULT_RECOMMEND_PINOT_TABLE_PARTITION = true;
public static final boolean DEFAULT_RECOMMEND_INVERTED_SORTED_INDEX_JOINT = true;
public static final boolean DEFAULT_RECOMMEND_BLOOM_FILTER = true;
public static final boolean DEFAULT_RECOMMEND_RANGE_INDEX = true;
public static final boolean DEFAULT_RECOMMEND_NO_DICTIONARY_ONHEAP_DICTIONARY_JOINT = true;
public static final boolean DEFAULT_RECOMMEND_AGGREGATE_METRICS = true;
public static final boolean DEFAULT_RECOMMEND_REALTIME_PROVISIONING = true;
Expand All @@ -65,6 +66,10 @@ public static class BloomFilterRule {
public static final double DEFAULT_THRESHOLD_MIN_PERCENT_EQ_BLOOMFILTER = 0.5d;
}

public static class RangeIndexRule {
public static final double DEFAULT_THRESHOLD_MIN_PERCENT_RANGE_INDEX = 0.4;
}

public static class NoDictionaryOnHeapDictionaryJointRule {
public static final double DEFAULT_THRESHOLD_MIN_FILTER_FREQ_DICTIONARY = 0d;
public static final double DEFAULT_THRESHOLD_MIN_SELECTION_FREQ_NO_DICTIONARY = 0.3d;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ void testBloomFilterRuleWithTimeSpecColumn()
assertEquals(output.getIndexConfig().getBloomFilterColumns().toString(), "[b, t, x]");
}

@Test
void testRangeIndexRule()
throws InvalidInputException, IOException {
loadInput("recommenderInput/RangeIndexInput.json");
ConfigManager output = new ConfigManager();
AbstractRule abstractRule =
RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.RangeIndexRule, _input, output);
abstractRule.run();
// Although column i has highest weight, it being string column, range index recommender will skip it and select next winner
assertNotEquals(output.getIndexConfig().getRangeIndexColumns().toString(), "[i]");
// index can be supported on dimension, date-time and metric columns
assertEquals(output.getIndexConfig().getRangeIndexColumns().toString(), "[t, j]");
}

@Test
void testNoDictionaryOnHeapDictionaryJointRule()
throws InvalidInputException, IOException {
Expand Down

0 comments on commit 9e26c24

Please sign in to comment.