Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void onBecomeOnlineFromOffline(Message message, NotificationContext conte
try {
if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) {
_routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable);
_queryQuotaManager.initOrUpdateLogicalTableQueryQuota(physicalOrLogicalTable);
} else {
_routingManager.buildRouting(physicalOrLogicalTable);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private class RefreshLogicalTableConfigMessageHandler extends MessageHandler {
@Override
public HelixTaskResult handleMessage() {
_routingManager.buildRoutingForLogicalTable(_logicalTableName);
_queryQuotaManager.initOrUpdateLogicalTableQueryQuota(_logicalTableName);
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -175,6 +176,20 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
}
}

public void initOrUpdateLogicalTableQueryQuota(String logicalTableName) {
LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting - is this called in the query path as well ? I understand that the same call is used in initOrUpdateTableQueryQuota

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This initOrUpdateLogicalTableQueryQuota is not part of query path. Even initOrUpdateTableQueryQuota is also in the query path.

if (logicalTableConfig == null) {
LOGGER.info("No query quota to update since logical table config is null");
return;
}

LOGGER.info("Initializing rate limiter for logical table {}", logicalTableName);

ExternalView brokerResourceEV = getBrokerResource();
Stat stat = _propertyStore.getStat(constructLogicalTableConfigPath(logicalTableName), AccessOption.PERSISTENT);
createOrUpdateRateLimiter(logicalTableName, brokerResourceEV, logicalTableConfig.getQuotaConfig(), stat);
}

public void initOrUpdateTableQueryQuota(String tableNameWithType) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
ExternalView brokerResourceEV = getBrokerResource();
Expand All @@ -195,24 +210,25 @@ public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView br
LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);

// Create rate limiter if query quota config is specified.
createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig());
Stat stat = _propertyStore.getStat(constructTableConfigPath(tableNameWithType), AccessOption.PERSISTENT);
createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig(), stat);
}

/**
* Drop table query quota.
* @param tableNameWithType table name with type.
* @param physicalOrLogicalTable physical or logical table name.
*/
public void dropTableQueryQuota(String tableNameWithType) {
LOGGER.info("Dropping rate limiter for table {}", tableNameWithType);
removeRateLimiter(tableNameWithType);
public void dropTableQueryQuota(String physicalOrLogicalTable) {
LOGGER.info("Dropping rate limiter for table {}", physicalOrLogicalTable);
removeRateLimiter(physicalOrLogicalTable);
}

/** Remove or update rate limiter if another table with the same raw table name but different type is still using
* the quota config.
* @param tableNameWithType table name with type
* @param physicalOrLogicalTable physical or logical table name.
*/
private void removeRateLimiter(String tableNameWithType) {
_rateLimiterMap.remove(tableNameWithType);
private void removeRateLimiter(String physicalOrLogicalTable) {
_rateLimiterMap.remove(physicalOrLogicalTable);
}

/**
Expand All @@ -230,26 +246,27 @@ private QuotaConfig getQuotaConfigFromPropertyStore(String tableNameWithType) {

/**
* Create or update a rate limiter for a table.
* @param tableNameWithType table name with table type.
* @param physicalOrLogicalTableName physical or logical table name.
* @param brokerResource broker resource which stores all the broker states of each table.
* @param quotaConfig quota config of the table.
* @param tableStat stat of the table config.
*/
private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView brokerResource,
QuotaConfig quotaConfig) {
private void createOrUpdateRateLimiter(String physicalOrLogicalTableName, ExternalView brokerResource,
QuotaConfig quotaConfig, Stat tableStat) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No qps config specified for table: {}", tableNameWithType);
buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
LOGGER.info("No qps config specified for table: {}", physicalOrLogicalTableName);
buildEmptyOrResetRateLimiterInQueryQuotaEntity(physicalOrLogicalTableName);
return;
}

if (brokerResource == null) {
LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType);
LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", physicalOrLogicalTableName);
// It could be possible that brokerResourceEV is null due to ZK connection issue.
// In this case, the rate limiter should not be reset. Simply exit the method would be sufficient.
return;
}

Map<String, String> stateMap = brokerResource.getStateMap(tableNameWithType);
Map<String, String> stateMap = brokerResource.getStateMap(physicalOrLogicalTableName);
int otherOnlineBrokerCount = 0;

// If stateMap is null, that means this broker is the first broker for this table.
Expand All @@ -263,26 +280,22 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br
}

int onlineCount = otherOnlineBrokerCount + 1;
LOGGER.info("The number of online brokers for table {} is {}", tableNameWithType, onlineCount);
LOGGER.info("The number of online brokers for table {} is {}", physicalOrLogicalTableName, onlineCount);

// Get the dynamic rate
double overallRate = quotaConfig.getMaxQPS();

// Get stat from property store
String tableConfigPath = constructTableConfigPath(tableNameWithType);
Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT);
double perBrokerRate = overallRate / onlineCount;

QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName);
if (queryQuotaEntity == null) {
queryQuotaEntity =
new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion());
_rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, tableStat.getVersion());
_rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
LOGGER.info(
"Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online "
+ "broker instances: {}. Table config stat version: {}", tableNameWithType, overallRate, perBrokerRate,
onlineCount, stat.getVersion());
+ "broker instances: {}. Table config stat version: {}", physicalOrLogicalTableName, overallRate,
perBrokerRate, onlineCount, tableStat.getVersion());
} else {
RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
double previousRate = -1;
Expand All @@ -297,14 +310,14 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br
}
queryQuotaEntity.setNumOnlineBrokers(onlineCount);
queryQuotaEntity.setOverallRate(overallRate);
queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
queryQuotaEntity.setTableConfigStatVersion(tableStat.getVersion());
LOGGER.info(
"Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New "
+ "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, stat.getVersion());
physicalOrLogicalTableName, overallRate, previousRate, perBrokerRate, onlineCount, tableStat.getVersion());
}
addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity);
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity);
addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity);
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity);
if (isQueryRateLimitDisabled()) {
LOGGER.info("Query rate limiting is currently disabled for this broker. So it won't take effect immediately.");
}
Expand Down Expand Up @@ -536,19 +549,19 @@ private void buildEmptyOrResetApplicationRateLimiter(String applicationName) {
* Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query
* quota entity.
*/
private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) {
QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String physicalOrLogicalTableName) {
QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName);
if (queryQuotaEntity == null) {
// Create an QueryQuotaEntity object without setting a rate limiter.
queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
_rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
_rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity);
} else {
// Set rate limiter to null for an existing QueryQuotaEntity object.
queryQuotaEntity.setRateLimiter(null);
}
addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity);
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity);
addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity);
addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity);
}

/**
Expand Down Expand Up @@ -680,6 +693,16 @@ public boolean acquire(String tableName) {
return offlineQuotaOk && realtimeQuotaOk;
}

@Override
public boolean acquireLogicalTable(String logicalTableName) {
QueryQuotaEntity logicalTableQueryQuotaEntity = _rateLimiterMap.get(logicalTableName);
if (logicalTableQueryQuotaEntity != null) {
LOGGER.debug("Trying to acquire token for logical table: {}", logicalTableName);
return tryAcquireToken(logicalTableName, logicalTableQueryQuotaEntity);
}
return true;
}

/**
* Try to acquire token from rate limiter. Emit the utilization of the qps quota if broker metric isn't null.
* @param resourceName resource name to acquire.
Expand Down Expand Up @@ -749,17 +772,17 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
int numRebuilt = 0;
for (Iterator<Map.Entry<String, QueryQuotaEntity>> it = _rateLimiterMap.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, QueryQuotaEntity> entry = it.next();
String tableNameWithType = entry.getKey();
String physicalOrLogicalTableName = entry.getKey();
QueryQuotaEntity queryQuotaEntity = entry.getValue();
if (queryQuotaEntity.getRateLimiter() == null) {
// No rate limiter set, skip this table.
continue;
}

// Get number of online brokers.
Map<String, String> stateMap = currentBrokerResourceEV.getStateMap(tableNameWithType);
Map<String, String> stateMap = currentBrokerResourceEV.getStateMap(physicalOrLogicalTableName);
if (stateMap == null) {
LOGGER.info("No broker resource for Table {}. Removing its rate limit.", tableNameWithType);
LOGGER.info("No broker resource for Table {}. Removing its rate limit.", physicalOrLogicalTableName);
it.remove();
continue;
}
Expand All @@ -773,10 +796,14 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
int onlineBrokerCount = otherOnlineBrokerCount + 1;

// Get stat from property store
String tableConfigPath = constructTableConfigPath(tableNameWithType);
Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT);
String physicalOrLogicalTableConfigPath =
ZKMetadataProvider.isTableConfigExists(_propertyStore, physicalOrLogicalTableName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this in the query path ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This is called here.

  public void processClusterChange(HelixConstants.ChangeType changeType) {
    .....
    if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
      ExternalView brokerResourceEV = getBrokerResource();
      processQueryRateLimitingExternalViewChange(brokerResourceEV);
    }
    ....

? constructTableConfigPath(physicalOrLogicalTableName)
: constructLogicalTableConfigPath(physicalOrLogicalTableName);
Stat stat = _propertyStore.getStat(physicalOrLogicalTableConfigPath, AccessOption.PERSISTENT);
if (stat == null) {
LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", tableNameWithType);
LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.",
physicalOrLogicalTableName);
it.remove();
continue;
}
Expand All @@ -790,10 +817,10 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
double overallRate;
// Get latest quota config only if stat don't match.
if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) {
QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(tableNameWithType);
QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(physicalOrLogicalTableName);
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No query quota config or the config is invalid for Table {}. Removing its rate limit.",
tableNameWithType);
physicalOrLogicalTableName);
it.remove();
continue;
}
Expand All @@ -810,7 +837,7 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
LOGGER.info("Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New "
+ "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}.",
tableNameWithType, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion());
physicalOrLogicalTableName, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion());
numRebuilt++;
}
}
Expand Down Expand Up @@ -931,4 +958,8 @@ public boolean isQueryRateLimitDisabled() {
private String constructTableConfigPath(String tableNameWithType) {
return "/CONFIGS/TABLE/" + tableNameWithType;
}

private String constructLogicalTableConfigPath(String tableName) {
return "/LOGICAL/TABLE/" + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public interface QueryQuotaManager {
*/
boolean acquire(String tableName);

/**
* Try to acquire a quota for the given logical table.
* @param logicalTableName Logical table name
* @return {@code true} if the table quota has not been reached, {@code false} otherwise
*/
boolean acquireLogicalTable(String logicalTableName);

/**
* Try to acquire a quota for the given database.
* @param databaseName database name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ protected void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPha
* @return true if the query was successfully cancelled, false otherwise.
*/
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception;
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception;

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
// Compile the request into PinotQuery
long compilationStartTimeNs = System.nanoTime();
CompileResult compileResult =
compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders,
accessControl);
compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders,
accessControl);

if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
/*
Expand Down Expand Up @@ -406,8 +406,17 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn
}

// Validate QPS
if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) {
String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query);
if (!_queryQuotaManager.acquireDatabase(database)) {
String errorMessage =
String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database);
LOGGER.info(errorMessage);
requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
}
if (!_queryQuotaManager.acquireLogicalTable(tableName)) {
String errorMessage =
String.format("Request %d: %s exceeds query quota for table: %s.", requestId, query, tableName);
requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
}

Expand Down Expand Up @@ -815,9 +824,9 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt
if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) {
return new CompileResult(new BrokerResponseNative(QueryErrorCode.SQL_PARSING,
"It seems that the query is only supported by the multi-stage query engine, please retry the query "
+ "using "
+ "the multi-stage query engine "
+ "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
+ "using "
+ "the multi-stage query engine "
+ "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"));
} else {
return new CompileResult(
new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,8 @@ public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord> proper
return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT);
}

public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) {
return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), AccessOption.PERSISTENT);
public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) {
return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableNameWithType),
AccessOption.PERSISTENT);
}
}
Loading
Loading