Skip to content

Commit

Permalink
Move table config into pinot-spi
Browse files Browse the repository at this point in the history
Motivation:
- Table config should be moved to pinot-spi so that user interface can access it (e.g. segment generation spec)

Changes:
- Make all configs POJO like, and Json serializable for backward-compatibility
- De-couple the Helix properties and utils from the configs
- Add the TableConfigSerDeTest to check all the serialization/de-serialization

Side changes:
- Refactor DataSizeUtils (from DataSize), integrate StorageQuotaChecker and HelixExternalViewBasedQueryQuotaManager with the POJO like QuotaConfig
- TextIndexConfigValidator is integrated into `TableConfigUtils.validate(TableConfig tableConfig)`

BACKWARD-INCOMPATIBILITY:
- TableConfig no longer support de-serialization from json string of nested json string (i.e. no `\"` inside the json)
  • Loading branch information
Jackie-Jiang committed Mar 29, 2020
1 parent 1f1baf8 commit 0e3bdaf
Show file tree
Hide file tree
Showing 238 changed files with 2,727 additions and 2,723 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;


@Api(tags = "Debug")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.spi.config.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
Expand All @@ -59,6 +58,7 @@
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -33,14 +32,15 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.QuotaConfig;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,33 +99,11 @@ public void initOrUpdateTableQueryQuota(String tableNameWithType) {
*/
public void initTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);

// Check whether qps quotas from both tables are the same.
QuotaConfig offlineQuotaConfig;
QuotaConfig realtimeQuotaConfig;
CommonConstants.Helix.TableType tableType = tableConfig.getTableType();
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
offlineQuotaConfig = tableConfig.getQuotaConfig();
realtimeQuotaConfig = getQuotaConfigFromPropertyStore(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
} else {
realtimeQuotaConfig = tableConfig.getQuotaConfig();
offlineQuotaConfig = getQuotaConfigFromPropertyStore(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName));
}
// Log a warning if MaxQueriesPerSecond are set different.
if ((offlineQuotaConfig != null && !Strings.isNullOrEmpty(offlineQuotaConfig.getMaxQueriesPerSecond())) && (
realtimeQuotaConfig != null && !Strings.isNullOrEmpty(realtimeQuotaConfig.getMaxQueriesPerSecond()))) {
if (!offlineQuotaConfig.getMaxQueriesPerSecond().equals(realtimeQuotaConfig.getMaxQueriesPerSecond())) {
LOGGER.warn(
"Attention! The values of MaxQueriesPerSecond for table {} are set different! Offline table qps quota: {}, Real-time table qps quota: {}",
rawTableName, offlineQuotaConfig.getMaxQueriesPerSecond(), realtimeQuotaConfig.getMaxQueriesPerSecond());
}
}

// Create rate limiter if query quota config is specified.
QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getMaxQueriesPerSecond())) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No qps config specified for table: {}", tableNameWithType);
removeRateLimiter(tableNameWithType);
} else {
Expand Down Expand Up @@ -173,7 +151,7 @@ private QuotaConfig getQuotaConfigFromPropertyStore(String tableNameWithType) {
* @param quotaConfig quota config of the table.
*/
private void createRateLimiter(String tableNameWithType, ExternalView brokerResource, QuotaConfig quotaConfig) {
if (quotaConfig == null || Strings.isNullOrEmpty(quotaConfig.getMaxQueriesPerSecond())) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No qps config specified for table: {}", tableNameWithType);
return;
}
Expand All @@ -200,14 +178,7 @@ private void createRateLimiter(String tableNameWithType, ExternalView brokerReso
LOGGER.info("The number of online brokers for table {} is {}", tableNameWithType, onlineCount);

// Get the dynamic rate
double overallRate;
if (quotaConfig.isMaxQueriesPerSecondValid()) {
overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
} else {
LOGGER.error("Failed to init qps quota: error when parsing qps quota: {} for table: {}",
quotaConfig.getMaxQueriesPerSecond(), tableNameWithType);
return;
}
double overallRate = quotaConfig.getMaxQPS();

// Get stat from property store
String tableConfigPath = constructTableConfigPath(tableNameWithType);
Expand Down Expand Up @@ -237,11 +208,11 @@ public boolean acquire(String tableName) {
QueryQuotaEntity offlineTableQueryQuotaEntity = null;
QueryQuotaEntity realtimeTableQueryQuotaEntity = null;

CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == TableType.OFFLINE) {
offlineTableName = tableName;
offlineTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
} else if (tableType == CommonConstants.Helix.TableType.REALTIME) {
} else if (tableType == TableType.REALTIME) {
realtimeTableName = tableName;
realtimeTableQueryQuotaEntity = _rateLimiterMap.get(tableName);
} else {
Expand Down Expand Up @@ -358,14 +329,13 @@ public void processQueryQuotaChange(ExternalView currentBrokerResource) {
// Get latest quota config only if stat don't match.
if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(tableNameWithType);
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null || !quotaConfig
.isMaxQueriesPerSecondValid()) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No query quota config or the config is invalid for Table {}. Removing its rate limit.",
tableNameWithType);
it.remove();
continue;
}
overallRate = Double.parseDouble(quotaConfig.getMaxQueriesPerSecond());
overallRate = quotaConfig.getMaxQPS();
} else {
overallRate = queryQuotaEntity.getOverallRate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.metrics.BrokerMeter;
Expand All @@ -73,6 +72,8 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.QueryOptions;
import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
import org.apache.pinot.spi.config.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -207,13 +208,13 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
// Get the tables hit by the request
String offlineTableName = null;
String realtimeTableName = null;
CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == TableType.OFFLINE) {
// Offline table
if (_routingManager.routingExists(tableName)) {
offlineTableName = tableName;
}
} else if (tableType == CommonConstants.Helix.TableType.REALTIME) {
} else if (tableType == TableType.REALTIME) {
// Realtime table
if (_routingManager.routingExists(tableName)) {
realtimeTableName = tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
Expand All @@ -44,6 +43,7 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@
import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
import org.apache.pinot.common.config.QueryConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
Expand All @@ -57,6 +54,9 @@
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.QueryConfig;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.pinot.broker.routing.instanceselector;

import org.apache.pinot.common.config.RoutingConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.spi.config.RoutingConfig;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import javax.annotation.Nullable;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.RoutingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.spi.config.ColumnPartitionConfig;
import org.apache.pinot.spi.config.RoutingConfig;
import org.apache.pinot.spi.config.SegmentPartitionConfig;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
* under the License.
*/
package org.apache.pinot.broker.routing.segmentselector;

import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;


public class SegmentSelectorFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.TableConfig;
import org.apache.pinot.spi.config.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -83,11 +84,11 @@ public void setUp()
.addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, FieldSpec.DataType.INT).build();
_helixResourceManager.addSchema(schema, true);
TableConfig offlineTableConfig =
new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
.setTimeType(TimeUnit.DAYS.name()).build();
_helixResourceManager.addTable(offlineTableConfig);
TableConfig realtimeTimeConfig =
new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
.setTimeType(TimeUnit.DAYS.name()).
setStreamConfigs(getStreamConfigs()).build();
_helixResourceManager.addTable(realtimeTimeConfig);
Expand Down Expand Up @@ -154,7 +155,7 @@ public void testResourceAndTagAssignment()
String newRawTableName = "newTable";
String newOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName);
TableConfig newTableConfig =
new TableConfig.Builder(TableType.OFFLINE).setTableName(newRawTableName).setBrokerTenant("testBroker").build();
new TableConfigBuilder(TableType.OFFLINE).setTableName(newRawTableName).setBrokerTenant("testBroker").build();
_helixResourceManager.addTable(newTableConfig);

// Broker tenant should be overridden to DefaultTenant
Expand Down

0 comments on commit 0e3bdaf

Please sign in to comment.