Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table indexing config validation #6017

Merged
merged 4 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.IngestionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
Expand All @@ -55,8 +61,9 @@ private TableConfigUtils() {
* 1. Validation config
* 2. IngestionConfig
* 3. TierConfigs
* 4. Indexing config
*
* TODO: Add more validations for each section (e.g. verify column names used in the indexing, validate conditions are met for aggregateMetrics etc)
* TODO: Add more validations for each section (e.g. validate conditions are met for aggregateMetrics)
*/
public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
if (tableConfig.getTableType() == TableType.REALTIME) {
Expand All @@ -65,6 +72,8 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig.getIngestionConfig(), schema);
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
icefury71 marked this conversation as resolved.
Show resolved Hide resolved
validateFieldConfigList(tableConfig.getFieldConfigList(), schema);
}

/**
Expand All @@ -75,8 +84,8 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema) {
*/
public static void validateTableName(TableConfig tableConfig) {
String tableName = tableConfig.getTableName();
if (tableName.contains(".")) {
throw new IllegalStateException("Table name: '" + tableName + "' containing '.' is not allowed");
if (tableName.contains(".") || tableName.contains(" ")) {
throw new IllegalStateException("Table name: '" + tableName + "' containing '.' or space is not allowed");
}
}

Expand Down Expand Up @@ -225,4 +234,103 @@ private static void validateTierConfigList(@Nullable List<TierConfig> tierConfig
}
}
}

/**
* Validates the Indexing Config
* Ensures that every referred column name exists in the corresponding schema
*/
private static void validateIndexingConfig(@Nullable IndexingConfig indexingConfig, @Nullable Schema schema) {
if (indexingConfig == null || schema == null) {
icefury71 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
Map<String, String> columnNameToConfigMap = new HashMap<>();

if (indexingConfig.getBloomFilterColumns() != null) {
for (String columnName : indexingConfig.getBloomFilterColumns()) {
columnNameToConfigMap.put(columnName, "Bloom Filter Config");
}
}
if (indexingConfig.getInvertedIndexColumns() != null) {
for (String columnName : indexingConfig.getInvertedIndexColumns()) {
columnNameToConfigMap.put(columnName, "Inverted Index Config");
}
}
if (indexingConfig.getNoDictionaryColumns() != null) {
for (String columnName : indexingConfig.getNoDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "No Dictionary Column Config");
}
}
if (indexingConfig.getOnHeapDictionaryColumns() != null) {
for (String columnName : indexingConfig.getOnHeapDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "On Heap Dictionary Column Config");
}
}
if (indexingConfig.getRangeIndexColumns() != null) {
for (String columnName : indexingConfig.getRangeIndexColumns()) {
columnNameToConfigMap.put(columnName, "Range Column Config");
}
}
if (indexingConfig.getSortedColumn() != null) {
for (String columnName : indexingConfig.getSortedColumn()) {
columnNameToConfigMap.put(columnName, "Sorted Column Config");
}
}
if (indexingConfig.getVarLengthDictionaryColumns() != null) {
for (String columnName : indexingConfig.getVarLengthDictionaryColumns()) {
columnNameToConfigMap.put(columnName, "Var Length Column Config");
}
}
List<StarTreeIndexConfig> starTreeIndexConfigList = indexingConfig.getStarTreeIndexConfigs();
if (starTreeIndexConfigList != null) {
for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigList) {
// Dimension split order cannot be null
for (String columnName : starTreeIndexConfig.getDimensionsSplitOrder()) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
// Function column pairs cannot be null
for (String functionColumnPair : starTreeIndexConfig.getFunctionColumnPairs()) {
AggregationFunctionColumnPair columnPair;
try {
columnPair = AggregationFunctionColumnPair.fromColumnName(functionColumnPair);
} catch (Exception e) {
throw new IllegalStateException("Invalid StarTreeIndex config: " + functionColumnPair + ". Must be"
+ "in the form <Aggregation function>__<Column name>");
}
String columnName = columnPair.getColumn();
if (!columnName.equals(AggregationFunctionColumnPair.STAR)) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
}
List<String> skipDimensionList = starTreeIndexConfig.getSkipStarNodeCreationForDimensions();
if (skipDimensionList != null) {
for (String columnName : skipDimensionList) {
columnNameToConfigMap.put(columnName, "StarTreeIndex Config");
}
}
}
}

icefury71 marked this conversation as resolved.
Show resolved Hide resolved
for (Map.Entry<String, String> entry : columnNameToConfigMap.entrySet()) {
String columnName = entry.getKey();
String configName = entry.getValue();
Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
"Column Name " + columnName + " defined in " + configName + " must be a valid column defined in the schema");
}
}

/**
* Validates the Field Config List in the given TableConfig
* Ensures that every referred column name exists in the corresponding schema
*/
private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList, @Nullable Schema schema) {
if (fieldConfigList == null || schema == null) {
return;
}

for (FieldConfig fieldConfig : fieldConfigList) {
String columnName = fieldConfig.getName();
Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
"Column Name " + columnName + " defined in field config list must be a valid column defined in the schema");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.pinot.core.util;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collections;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IngestionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
Expand Down Expand Up @@ -456,4 +459,155 @@ public void validateTierConfigs() {
// expected
}
}

@Test
public void testTableName() {
String[] malformedTableName = {"test.table", "test table"};
for (int i = 0; i < 2; i++) {
String tableName = malformedTableName[i];
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build();
try {
TableConfigUtils.validateTableName(tableConfig);
Assert.fail("Should fail for malformed table name : " + tableName);
} catch (IllegalStateException e) {
// expected
}
}
}

@Test
public void testValidateIndexingConfig() {
Schema schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setBloomFilterColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Bloom filter column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInvertedIndexColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Inverted Index column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setNoDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid No Dictionary column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setOnHeapDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid On Heap Dictionary column name");
} catch (Exception e) {
// expected
}

tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setRangeIndexColumns(Arrays.asList("myCol2"))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Range Index column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setSortedColumn("myCol2").build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Sorted column name");
} catch (Exception e) {
// expected
}

tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setVarLengthDictionaryColumns(Arrays.asList("myCol2")).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid Var Length Dictionary column name");
} catch (Exception e) {
// expected
}

// Although this config makes no sense, it should pass the validation phase
StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
Assert.fail("Should fail for valid StarTreeIndex config column name");
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol2"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in dimension split order");
} catch (Exception e) {
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol2"),
Arrays.asList("SUM__myCol"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in skip star node for dimension");
} catch (Exception e) {
// expected
}

starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"),
Arrays.asList("myCol"),
Arrays.asList("SUM__myCol2"),
1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid StarTreeIndex config column name in function column pair");
} catch (Exception e) {
// expected
}

FieldConfig fieldConfig = new FieldConfig("myCol2", null, null, null);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setFieldConfigList(Arrays.asList(fieldConfig)).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid column name in Field Config List");
} catch (Exception e) {
// expected
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
],
"metricFieldSpecs": [
{
"name": "ArrDel15",
"name": "ActualElapsedTime",
"dataType": "INT"
},
{
Expand Down Expand Up @@ -316,4 +316,4 @@
"timeType": "DAYS"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class TableConfigBuilder {
private Map<String, String> _streamConfigs;
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _nullHandlingEnabled;
private List<String> _varLengthDictionaryColumns;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;

private TableCustomConfig _customConfig;
private QuotaConfig _quotaConfig;
Expand Down Expand Up @@ -247,6 +250,16 @@ public TableConfigBuilder setRangeIndexColumns(List<String> rangeIndexColumns) {
return this;
}

public TableConfigBuilder setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
_varLengthDictionaryColumns = varLengthDictionaryColumns;
return this;
}

public TableConfigBuilder setStarTreeIndexConfigs(List<StarTreeIndexConfig> starTreeIndexConfigs) {
_starTreeIndexConfigs = starTreeIndexConfigs;
return this;
}

public TableConfigBuilder setStreamConfigs(Map<String, String> streamConfigs) {
Preconditions.checkState(_tableType == TableType.REALTIME);
_streamConfigs = streamConfigs;
Expand Down Expand Up @@ -358,6 +371,8 @@ public TableConfig build() {
indexingConfig.setStreamConfigs(_streamConfigs);
indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);

if (_customConfig == null) {
_customConfig = new TableCustomConfig(null);
Expand Down