Skip to content

Commit

Permalink
Table indexing config validation (#6017)
Browse files Browse the repository at this point in the history
* Adding validation for table indexing config to check for valid column names

* * Addressing review comments: adding validation for FieldConfigList. Adding javadocs
* Bug fix in integration test regarding removal of existing column from schema

* Adding validation for StarTreeIndexingConfig

* Addressing review comments
  • Loading branch information
icefury71 committed Sep 18, 2020
1 parent d9aec17 commit 5548e79
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.IngestionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
Expand All @@ -55,8 +61,9 @@ private TableConfigUtils() {
* 1. Validation config
* 2. IngestionConfig
* 3. TierConfigs
* 4. Indexing config
*
* TODO: Add more validations for each section (e.g. verify column names used in the indexing, validate conditions are met for aggregateMetrics etc)
* TODO: Add more validations for each section (e.g. validate conditions are met for aggregateMetrics)
*/
public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
if (tableConfig.getTableType() == TableType.REALTIME) {
Expand All @@ -65,6 +72,8 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig.getIngestionConfig(), schema);
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(), schema);
}

/**
Expand All @@ -75,8 +84,8 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
*/
public static void validateTableName(TableConfig tableConfig) {
String tableName = tableConfig.getTableName();
if (tableName.contains(".")) {
throw new IllegalStateException("Table name: '" + tableName + "' containing '.' is not allowed");
if (tableName.contains(".") || tableName.contains(" ")) {
throw new IllegalStateException("Table name: '" + tableName + "' containing '.' or space is not allowed");
}
}

Expand Down Expand Up @@ -225,4 +234,103 @@ private static void validateTierConfigList(@Nullable List<TierConfig> tierConfig
}
}
}

/**
* Validates the Indexing Config
* Ensures that every referred column name exists in the corresponding schema
*/
private static void validateIndexingConfig(@Nullable IndexingConfig indexingConfig, @Nullable Schema schema) {
if (indexingConfig == null || schema == null) {
return;
}
Map<String, String> columnNameToConfigMap = new HashMap<>();

if (indexingConfig.getBloomFilterColumns() != null) {
for (String columnName : indexingConfig.getBloomFilterColumns()) {
columnNameToConfigMap.put(columnName, "Bloom Filter Config");
}
}
if (indexingConfig.getInvertedIndexColumns() != null) {
for (String columnName : indexingConfig.getInvertedIndexColumns()) {
columnNameToConfigMap.put(columnName, "Inverted Index Config");
}
}
if (indexingConfig.getNoDictionaryColumns() != null) {
for (String columnName : indexingConfig.getNoDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "No Dictionary Column Config");
}
}
if (indexingConfig.getOnHeapDictionaryColumns() != null) {
for (String columnName : indexingConfig.getOnHeapDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "On Heap Dictionary Column Config");
}
}
if (indexingConfig.getRangeIndexColumns() != null) {
for (String columnName : indexingConfig.getRangeIndexColumns()) {
columnNameToConfigMap.put(columnName, "Range Column Config");
}
}
if (indexingConfig.getSortedColumn() != null) {
for (String columnName : indexingConfig.getSortedColumn()) {
columnNameToConfigMap.put(columnName, "Sorted Column Config");
}
}
if (indexingConfig.getVarLengthDictionaryColumns() != null) {
for (String columnName : indexingConfig.getVarLengthDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "Var Length Column Config");
}
}
List<StarTreeIndexConfig> starTreeIndexConfigList = indexingConfig.getStarTreeIndexConfigs();
if (starTreeIndexConfigList != null) {
for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigList) {
// Dimension split order cannot be null
for (String columnName : starTreeIndexConfig.getDimensionsSplitOrder()) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
// Function column pairs cannot be null
for (String functionColumnPair : starTreeIndexConfig.getFunctionColumnPairs()) {
AggregationFunctionColumnPair columnPair;
try {
columnPair = AggregationFunctionColumnPair.fromColumnName(functionColumnPair);
} catch (Exception e) {
throw new IllegalStateException("Invalid StarTreeIndex config: " + functionColumnPair + ". Must be"
+ "in the form <Aggregation function>__<Column name>");
}
String columnName = columnPair.getColumn();
if (!columnName.equals(AggregationFunctionColumnPair.STAR)) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
}
List<String> skipDimensionList = starTreeIndexConfig.getSkipStarNodeCreationForDimensions();
if (skipDimensionList != null) {
for (String columnName : skipDimensionList) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
}
}
}

for (Map.Entry<String, String> entry : columnNameToConfigMap.entrySet()) {
String columnName = entry.getKey();
String configName = entry.getValue();
Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
"Column Name " + columnName + " defined in " + configName + " must be a valid column defined in the schema");
}
}

/**
* Validates the Field Config List in the given TableConfig
* Ensures that every referred column name exists in the corresponding schema
*/
private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList, @Nullable Schema schema) {
if (fieldConfigList == null || schema == null) {
return;
}

for (FieldConfig fieldConfig : fieldConfigList) {
String columnName = fieldConfig.getName();
Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
"Column Name " + columnName + " defined in field config list must be a valid column defined in the schema");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.pinot.core.util;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collections;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IngestionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
Expand Down Expand Up @@ -456,4 +459,155 @@ public void validateTierConfigs() {
// expected
}
}

@Test
public void testTableName() {
String[] malformedTableName = {"test.table", "test table"};
for (int i = 0; i < 2; i++) {
String tableName = malformedTableName[i];
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build();
try {
TableConfigUtils.validateTableName(tableConfig);
Assert.fail("Should fail for malformed table name : " + tableName);
} catch (IllegalStateException e) {
// expected
}
}
}

@Test
public void testValidateIndexingConfig() {
Schema schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setBloomFilterColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Bloom filter column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInvertedIndexColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Inverted Index column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setNoDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid No Dictionary column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setOnHeapDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid On Heap Dictionary column name");
} catch (Exception e) {
// expected
}

tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(Arrays.asList("myCol2"))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Range Index column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setSortedColumn("myCol2").build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Sorted column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setVarLengthDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Var Length Dictionary column name");
} catch (Exception e) {
// expected
}

// Although this config makes no sense, it should pass the validation phase
StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
Assert.fail("Should fail for valid StarTreeIndex config column name");
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol2"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in dimension split order");
} catch (Exception e) {
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol2"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in skip star node for dimension");
} catch (Exception e) {
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol2"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in function column pair");
} catch (Exception e) {
// expected
}

FieldConfig fieldConfig = new FieldConfig("myCol2", null, null, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setFieldConfigList(Arrays.asList(fieldConfig)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid column name in Field Config List");
} catch (Exception e) {
// expected
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
],
"metricFieldSpecs": [
{
"name": "ArrDel15",
"name": "ActualElapsedTime",
"dataType": "INT"
},
{
Expand Down Expand Up @@ -316,4 +316,4 @@
"timeType": "DAYS"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class TableConfigBuilder {
private Map<String, String> _streamConfigs;
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _nullHandlingEnabled;
private List<String> _varLengthDictionaryColumns;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;

private TableCustomConfig _customConfig;
private QuotaConfig _quotaConfig;
Expand Down Expand Up @@ -247,6 +250,16 @@ public TableConfigBuilder setRangeIndexColumns(List<String> rangeIndexColumns) {
return this;
}

public TableConfigBuilder setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
_varLengthDictionaryColumns = varLengthDictionaryColumns;
return this;
}

public TableConfigBuilder setStarTreeIndexConfigs(List<StarTreeIndexConfig> starTreeIndexConfigs) {
_starTreeIndexConfigs = starTreeIndexConfigs;
return this;
}

public TableConfigBuilder setStreamConfigs(Map<String, String> streamConfigs) {
Preconditions.checkState(_tableType == TableType.REALTIME);
_streamConfigs = streamConfigs;
Expand Down Expand Up @@ -358,6 +371,8 @@ public TableConfig build() {
indexingConfig.setStreamConfigs(_streamConfigs);
indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);

if (_customConfig == null) {
_customConfig = new TableCustomConfig(null);
Expand Down

0 comments on commit 5548e79

Please sign in to comment.