Skip to content

Commit

Permalink
Creating a pluggable interface for Table config tuner (#6255)
Browse files Browse the repository at this point in the history
* Creating a pluggable interface for indexing config resolution

* Adding a simple auto tune indexing config resolver

* Adding ability to deregister indexing config resolver

* Enabling dynamically registered TableConfigTuner methods

* - Adding a TableConfigTuner interface
 - Adding a Tuner annotation type for auto-discovering such tuners

* Bug fix in TunerConfig. Adding serde test for TunerConfig
  • Loading branch information
icefury71 committed Dec 11, 2020
1 parent 75f9fd3 commit f2c37d5
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 3 deletions.
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.config.tuner;

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.tuner.TableConfigTuner;
import org.apache.pinot.spi.config.table.tuner.Tuner;
import org.apache.pinot.spi.data.Schema;


@Tuner(name = "noopConfigTuner")
public class NoOpTableTableConfigTuner implements TableConfigTuner {
@Override
public void init(TunerConfig props, Schema schema) {
}

@Override
public TableConfig apply(TableConfig initialConfig) {
return initialConfig;
}
}
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.config.tuner;

import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.tuner.TableConfigTuner;
import org.apache.pinot.spi.config.table.tuner.Tuner;
import org.apache.pinot.spi.data.Schema;


/**
* Used to auto-tune the table indexing config. It takes the original table
* config, table schema and adds the following to indexing config:
* - Inverted indices for all dimensions
* - No dictionary index for all metrics
*/
@Tuner(name = "realtimeAutoIndexTuner")
public class RealTimeAutoIndexTuner implements TableConfigTuner {
private Schema _schema;

@Override
public void init(TunerConfig props, Schema schema) {
_schema = schema;
}

@Override
public TableConfig apply(TableConfig initialConfig) {
IndexingConfig initialIndexingConfig = initialConfig.getIndexingConfig();
initialIndexingConfig.setInvertedIndexColumns(_schema.getDimensionNames());
initialIndexingConfig.setNoDictionaryColumns(_schema.getMetricNames());
return initialConfig;
}
}
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.config.tuner;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.config.table.tuner.TableConfigTuner;
import org.apache.pinot.spi.config.table.tuner.Tuner;
import org.reflections.Reflections;
import org.reflections.scanners.ResourcesScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.reflections.util.FilterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Helper class to dynamically register all annotated {@link Tuner} methods
*/
public class TableConfigTunerRegistry {
private TableConfigTunerRegistry() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigTunerRegistry.class);
private static final Map<String, TableConfigTuner> _configTunerMap = new HashMap<>();

static {
Reflections reflections = new Reflections(
new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
.filterInputsBy(new FilterBuilder.Include(".*\\.tuner\\..*"))
.setScanners(new ResourcesScanner(), new TypeAnnotationsScanner(), new SubTypesScanner()));
Set<Class<?>> classes = reflections.getTypesAnnotatedWith(Tuner.class);
classes.forEach(tunerClass -> {
Tuner tunerAnnotation = tunerClass.getAnnotation(Tuner.class);
if (tunerAnnotation.enabled()) {
if (tunerAnnotation.name().isEmpty()) {
LOGGER.error("Cannot register an unnamed config tuner for annotation {} ", tunerAnnotation.toString());
} else {
String tunerName = tunerAnnotation.name();
TableConfigTuner tuner;
try {
tuner = (TableConfigTuner) tunerClass.newInstance();
_configTunerMap.putIfAbsent(tunerName, tuner);
} catch (Exception e) {
LOGGER.error(String.format("Unable to register tuner %s . Cannot instantiate.", tunerName), e);
}
}
}
});
LOGGER.info("Initialized TableConfigTunerRegistry with {} tuners: {}", _configTunerMap.size(),
_configTunerMap.keySet());
}

public static TableConfigTuner getTuner(String name) {
return _configTunerMap.get(name);
}
}
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
Expand Down Expand Up @@ -139,9 +140,15 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
});
}

TunerConfig tunerConfig = null;
String tunerConfigString = simpleFields.get(TableConfig.TUNER_CONFIG);
if (tunerConfigString != null) {
tunerConfig = JsonUtils.stringToObject(tunerConfigString, TunerConfig.class);
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
ingestionConfig, tierConfigList, isDimTable);
ingestionConfig, tierConfigList, isDimTable, tunerConfig);
}

public static ZNRecord toZNRecord(TableConfig tableConfig)
Expand Down Expand Up @@ -196,6 +203,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
if (tierConfigList != null) {
simpleFields.put(TableConfig.TIER_CONFIGS_LIST_KEY, JsonUtils.objectToString(tierConfigList));
}
TunerConfig tunerConfig = tableConfig.getTunerConfig();
if (tunerConfig != null) {
simpleFields.put(TableConfig.TUNER_CONFIG, JsonUtils.objectToString(tunerConfig));
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
znRecord.setSimpleFields(simpleFields);
Expand Down
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.config.tuner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.tuner.TableConfigTuner;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class RealTimeAutoIndexTunerTest {

private static final String TABLE_NAME = "test_table";
private static final String TUNER_NAME = "realtimeAutoIndexTuner";
private TunerConfig _tunerConfig;
private Schema schema;
private String dimensionColumns[] = {"col1", "col2"};
private String metricColumns[] = {"count"};

@BeforeClass
public void setup() {
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addSingleValueDimension(dimensionColumns[0], FieldSpec.DataType.STRING)
.addSingleValueDimension(dimensionColumns[1], FieldSpec.DataType.STRING)
.addMetric(metricColumns[0], FieldSpec.DataType.INT).build();
Map<String, String> props = new HashMap<>();
_tunerConfig = new TunerConfig(TUNER_NAME, props);
}

@Test
public void testTuner() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTunerConfig(_tunerConfig).build();
TableConfigTuner tuner = TableConfigTunerRegistry.getTuner(TUNER_NAME);
tuner.init(_tunerConfig, schema);
TableConfig result = tuner.apply(tableConfig);

IndexingConfig newConfig = result.getIndexingConfig();
List<String> invertedIndexColumns = newConfig.getInvertedIndexColumns();
Assert.assertTrue(invertedIndexColumns.size() == 2);
for (int i = 0; i < dimensionColumns.length; i++) {
Assert.assertTrue(invertedIndexColumns.contains(dimensionColumns[i]));
}

List<String> noDictionaryColumns = newConfig.getNoDictionaryColumns();
Assert.assertTrue(noDictionaryColumns.size() == 1);
Assert.assertEquals(noDictionaryColumns.get(0), metricColumns[0]);
}
}
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.config.tuner;

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.tuner.TableConfigTuner;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TunerRegistryTest {

private static final String TUNER_NAME = "noopConfigTuner";
private static TunerConfig _tunerConfig;

@BeforeClass
public void setup() {
Map<String, String> props = new HashMap<>();
_tunerConfig = new TunerConfig(TUNER_NAME, props);
}

@Test
public void testNoOpTableConfigTuner() {
Schema schema = new Schema.SchemaBuilder().build();
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTunerConfig(_tunerConfig).build();
TableConfigTuner tuner = TableConfigTunerRegistry.getTuner(TUNER_NAME);
tuner.init(_tunerConfig, schema);
TableConfig result = tuner.apply(tableConfig);
Assert.assertEquals(result, tableConfig);
}
}
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
Expand Down Expand Up @@ -308,6 +309,27 @@ public void testSerDe()
assertEquals(tableConfigToCompare, tableConfig);
checkTierConfigList(tableConfigToCompare);
}
{
// With tuner config
String name = "testTuner";
Map<String, String> props = new HashMap<>();
props.put("key", "value");
TunerConfig tunerConfig = new TunerConfig(name, props);
TableConfig tableConfig = tableConfigBuilder.setTunerConfig(tunerConfig).build();

// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
TunerConfig tunerConfigToCompare = tableConfigToCompare.getTunerConfig();
assertEquals(tunerConfigToCompare.getName(), name);
assertEquals(tunerConfigToCompare.getTunerProperties(), props);

tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
tunerConfigToCompare = tableConfigToCompare.getTunerConfig();
assertEquals(tunerConfigToCompare.getName(), name);
assertEquals(tunerConfigToCompare.getTunerProperties(), props);
}
}

private void checkSegmentsValidationAndRetentionConfig(TableConfig tableConfig) {
Expand Down

0 comments on commit f2c37d5

Please sign in to comment.