From 61de01bd529fd37b26a33ad4dc113e1d3ce0aa76 Mon Sep 17 00:00:00 2001 From: abhishekbafna Date: Tue, 20 May 2025 12:01:57 +0530 Subject: [PATCH 1/4] Logical table query quota enforcement. --- ...esourceOnlineOfflineStateModelFactory.java | 21 ++-- ...rokerUserDefinedMessageHandlerFactory.java | 1 + ...lixExternalViewBasedQueryQuotaManager.java | 102 ++++++++++++------ .../BaseBrokerRequestHandler.java | 3 +- .../BaseSingleStageBrokerRequestHandler.java | 2 +- .../common/metadata/ZKMetadataProvider.java | 5 +- .../helix/ControllerRequestClient.java | 23 ++++ .../helix/core/PinotHelixResourceManager.java | 9 +- .../controller/helix/ControllerTest.java | 12 ++- .../QueryQuotaClusterIntegrationTest.java | 94 +++++++++++++++- .../BaseLogicalTableIntegrationTest.java | 35 +++--- ...fflineOneRealtimeTableIntegrationTest.java | 2 +- 12 files changed, 226 insertions(+), 83 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index dcf8a667e15f..fd7a812a7f71 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -80,6 +80,7 @@ public void onBecomeOnlineFromOffline(Message message, NotificationContext conte try { if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable); + _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(physicalOrLogicalTable); } else { _routingManager.buildRouting(physicalOrLogicalTable); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable); @@ -97,14 +98,14 @@ public void onBecomeOnlineFromOffline(Message message, NotificationContext conte @Transition(from = "ONLINE", to = "OFFLINE") public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - String tableNameWithType = message.getPartitionName(); - LOGGER.info("Processing transition from ONLINE to OFFLINE for table: {}", tableNameWithType); + String physicalOrLogicalTable = message.getPartitionName(); + LOGGER.info("Processing transition from ONLINE to OFFLINE for table: {}", physicalOrLogicalTable); try { - _routingManager.removeRouting(tableNameWithType); - _queryQuotaManager.dropTableQueryQuota(tableNameWithType); + _routingManager.removeRouting(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); } catch (Exception e) { LOGGER.error("Caught exception while processing transition from ONLINE to OFFLINE for table: {}", - tableNameWithType, e); + physicalOrLogicalTable, e); throw e; } } @@ -116,14 +117,14 @@ public void onBecomeDroppedFromOffline(Message message, NotificationContext cont @Transition(from = "ONLINE", to = "DROPPED") public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { - String tableNameWithType = message.getPartitionName(); - LOGGER.info("Processing transition from ONLINE to DROPPED for table: {}", tableNameWithType); + String physicalOrLogicalTable = message.getPartitionName(); + LOGGER.info("Processing transition from ONLINE to DROPPED for table: {}", physicalOrLogicalTable); try { - _routingManager.removeRouting(tableNameWithType); - _queryQuotaManager.dropTableQueryQuota(tableNameWithType); + _routingManager.removeRouting(physicalOrLogicalTable); + _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable); } catch (Exception e) { LOGGER.error("Caught exception while processing transition from ONLINE to DROPPED for table: {}", - tableNameWithType, e); + physicalOrLogicalTable, e); throw e; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index 033a126ea640..81ea3d0d4f1b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -154,6 +154,7 @@ private class RefreshLogicalTableConfigMessageHandler extends MessageHandler { @Override public HelixTaskResult handleMessage() { _routingManager.buildRoutingForLogicalTable(_logicalTableName); + _queryQuotaManager.initOrUpdateLogicalTableQueryQuota(_logicalTableName); HelixTaskResult result = new HelixTaskResult(); result.setSuccess(true); return result; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 925fbc4860eb..db6f68790f61 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; @@ -175,6 +176,20 @@ public void processClusterChange(HelixConstants.ChangeType changeType) { } } + public void initOrUpdateLogicalTableQueryQuota(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + if (logicalTableConfig == null) { + LOGGER.info("No query quota to update since logical table config is null"); + return; + } + + LOGGER.info("Initializing rate limiter for logical table {}", logicalTableName); + + ExternalView brokerResourceEV = getBrokerResource(); + Stat stat = _propertyStore.getStat(constructLogicalTableConfigPath(logicalTableName), AccessOption.PERSISTENT); + createOrUpdateRateLimiter(logicalTableName, brokerResourceEV, logicalTableConfig.getQuotaConfig(), stat); + } + public void initOrUpdateTableQueryQuota(String tableNameWithType) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); ExternalView brokerResourceEV = getBrokerResource(); @@ -195,24 +210,25 @@ public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView br LOGGER.info("Initializing rate limiter for table {}", tableNameWithType); // Create rate limiter if query quota config is specified. - createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig()); + Stat stat = _propertyStore.getStat(constructTableConfigPath(tableNameWithType), AccessOption.PERSISTENT); + createOrUpdateRateLimiter(tableNameWithType, brokerResourceEV, tableConfig.getQuotaConfig(), stat); } /** * Drop table query quota. - * @param tableNameWithType table name with type. + * @param physicalOrLogicalTable physical or logical table name. */ - public void dropTableQueryQuota(String tableNameWithType) { - LOGGER.info("Dropping rate limiter for table {}", tableNameWithType); - removeRateLimiter(tableNameWithType); + public void dropTableQueryQuota(String physicalOrLogicalTable) { + LOGGER.info("Dropping rate limiter for table {}", physicalOrLogicalTable); + removeRateLimiter(physicalOrLogicalTable); } /** Remove or update rate limiter if another table with the same raw table name but different type is still using * the quota config. - * @param tableNameWithType table name with type + * @param physicalOrLogicalTable physical or logical table name. */ - private void removeRateLimiter(String tableNameWithType) { - _rateLimiterMap.remove(tableNameWithType); + private void removeRateLimiter(String physicalOrLogicalTable) { + _rateLimiterMap.remove(physicalOrLogicalTable); } /** @@ -233,9 +249,10 @@ private QuotaConfig getQuotaConfigFromPropertyStore(String tableNameWithType) { * @param tableNameWithType table name with table type. * @param brokerResource broker resource which stores all the broker states of each table. * @param quotaConfig quota config of the table. + * @param tableStat stat of the table config. */ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView brokerResource, - QuotaConfig quotaConfig) { + QuotaConfig quotaConfig, Stat tableStat) { if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { LOGGER.info("No qps config specified for table: {}", tableNameWithType); buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType); @@ -267,22 +284,18 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br // Get the dynamic rate double overallRate = quotaConfig.getMaxQPS(); - - // Get stat from property store - String tableConfigPath = constructTableConfigPath(tableNameWithType); - Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); double perBrokerRate = overallRate / onlineCount; QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); if (queryQuotaEntity == null) { queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), - new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion()); + new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, tableStat.getVersion()); _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); LOGGER.info( "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online " + "broker instances: {}. Table config stat version: {}", tableNameWithType, overallRate, perBrokerRate, - onlineCount, stat.getVersion()); + onlineCount, tableStat.getVersion()); } else { RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter(); double previousRate = -1; @@ -297,11 +310,11 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } queryQuotaEntity.setNumOnlineBrokers(onlineCount); queryQuotaEntity.setOverallRate(overallRate); - queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); + queryQuotaEntity.setTableConfigStatVersion(tableStat.getVersion()); LOGGER.info( "Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New " + "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", - tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, stat.getVersion()); + tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, tableStat.getVersion()); } addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); @@ -536,19 +549,19 @@ private void buildEmptyOrResetApplicationRateLimiter(String applicationName) { * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query * quota entity. */ - private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) { - QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String physicalOrLogicalTableName) { + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName); if (queryQuotaEntity == null) { // Create an QueryQuotaEntity object without setting a rate limiter. queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0); - _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity); } else { // Set rate limiter to null for an existing QueryQuotaEntity object. queryQuotaEntity.setRateLimiter(null); } - addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); - addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); } /** @@ -647,6 +660,15 @@ public boolean acquire(String tableName) { if (isQueryRateLimitDisabled()) { return true; } + + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { + return acquireForLogicalTable(tableName); + } else { + return acquireForPhysicalTable(tableName); + } + } + + private boolean acquireForPhysicalTable(String tableName) { String offlineTableName = null; String realtimeTableName = null; QueryQuotaEntity offlineTableQueryQuotaEntity = null; @@ -676,10 +698,18 @@ public boolean acquire(String tableName) { LOGGER.debug("Trying to acquire token for table: {}", realtimeTableName); realtimeQuotaOk = tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity); } - return offlineQuotaOk && realtimeQuotaOk; } + private boolean acquireForLogicalTable(String tableName) { + QueryQuotaEntity logicalTableQueryQuotaEntity = _rateLimiterMap.get(tableName); + if (logicalTableQueryQuotaEntity != null) { + LOGGER.debug("Trying to acquire token for logical table: {}", tableName); + return tryAcquireToken(tableName, logicalTableQueryQuotaEntity); + } + return true; + } + /** * Try to acquire token from rate limiter. Emit the utilization of the qps quota if broker metric isn't null. * @param resourceName resource name to acquire. @@ -749,7 +779,7 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke int numRebuilt = 0; for (Iterator> it = _rateLimiterMap.entrySet().iterator(); it.hasNext(); ) { Map.Entry entry = it.next(); - String tableNameWithType = entry.getKey(); + String physicalOrLogicalTableName = entry.getKey(); QueryQuotaEntity queryQuotaEntity = entry.getValue(); if (queryQuotaEntity.getRateLimiter() == null) { // No rate limiter set, skip this table. @@ -757,9 +787,9 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke } // Get number of online brokers. - Map stateMap = currentBrokerResourceEV.getStateMap(tableNameWithType); + Map stateMap = currentBrokerResourceEV.getStateMap(physicalOrLogicalTableName); if (stateMap == null) { - LOGGER.info("No broker resource for Table {}. Removing its rate limit.", tableNameWithType); + LOGGER.info("No broker resource for Table {}. Removing its rate limit.", physicalOrLogicalTableName); it.remove(); continue; } @@ -773,10 +803,14 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke int onlineBrokerCount = otherOnlineBrokerCount + 1; // Get stat from property store - String tableConfigPath = constructTableConfigPath(tableNameWithType); - Stat stat = _propertyStore.getStat(tableConfigPath, AccessOption.PERSISTENT); + String physicalOrLogicalTableConfigPath = + ZKMetadataProvider.isTableConfigExists(_propertyStore, physicalOrLogicalTableName) + ? constructTableConfigPath(physicalOrLogicalTableName) + : constructLogicalTableConfigPath(physicalOrLogicalTableName); + Stat stat = _propertyStore.getStat(physicalOrLogicalTableConfigPath, AccessOption.PERSISTENT); if (stat == null) { - LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", tableNameWithType); + LOGGER.info("Table {} has been deleted from property store. Removing its rate limit.", + physicalOrLogicalTableName); it.remove(); continue; } @@ -790,10 +824,10 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke double overallRate; // Get latest quota config only if stat don't match. if (stat.getVersion() != queryQuotaEntity.getTableConfigStatVersion()) { - QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(tableNameWithType); + QuotaConfig quotaConfig = getQuotaConfigFromPropertyStore(physicalOrLogicalTableName); if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { LOGGER.info("No query quota config or the config is invalid for Table {}. Removing its rate limit.", - tableNameWithType); + physicalOrLogicalTableName); it.remove(); continue; } @@ -810,7 +844,7 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke queryQuotaEntity.setTableConfigStatVersion(stat.getVersion()); LOGGER.info("Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New " + "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}.", - tableNameWithType, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion()); + physicalOrLogicalTableName, overallRate, previousRate, latestRate, onlineBrokerCount, stat.getVersion()); numRebuilt++; } } @@ -931,4 +965,8 @@ public boolean isQueryRateLimitDisabled() { private String constructTableConfigPath(String tableNameWithType) { return "/CONFIGS/TABLE/" + tableNameWithType; } + + private String constructLogicalTableConfigPath(String tableName) { + return "/LOGICAL/TABLE/" + tableName; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 69b27fe65a6a..f9bfe311c1fc 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -285,7 +285,8 @@ protected void updatePhaseTimingForTables(Set tableNames, BrokerQueryPha * @return true if the query was successfully cancelled, false otherwise. */ protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, - HttpClientConnectionManager connMgr, Map serverResponses) throws Exception; + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception; protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setNumRowsResultSet(response.getNumRowsResultSet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index fd880bdfc602..d7776f987e04 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -406,7 +406,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn } // Validate QPS - if (hasExceededQPSQuota(database, physicalTableNames, requestContext)) { + if (hasExceededQPSQuota(database, Set.of(logicalTableConfig.getTableName()), requestContext)) { String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 029f5c3491eb..eff90fcf491c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -884,7 +884,8 @@ public static boolean isLogicalTableExists(ZkHelixPropertyStore proper return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT); } - public static boolean isTableConfigExists(ZkHelixPropertyStore propertyStore, String tableName) { - return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), AccessOption.PERSISTENT); + public static boolean isTableConfigExists(ZkHelixPropertyStore propertyStore, String tableNameWithType) { + return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableNameWithType), + AccessOption.PERSISTENT); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index 7aa8f41de60c..333a22b5b49e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; @@ -137,6 +138,17 @@ public void addTableConfig(TableConfig tableConfig) } } + public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendJsonPostRequest(new URI(_controllerRequestURLBuilder.forLogicalTableCreate()), + logicalTableConfig.toJsonString(), _headers)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void updateTableConfig(TableConfig tableConfig) throws IOException { try { @@ -148,6 +160,17 @@ public void updateTableConfig(TableConfig tableConfig) } } + public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPutRequest( + new URI(_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableConfig.getTableName())), + logicalTableConfig.toJsonString(), _headers)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void toggleTableState(String tableName, TableType type, boolean enable) throws IOException { try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 892e3fbc117e..09bd904aee54 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -180,7 +180,6 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; @@ -2152,13 +2151,7 @@ public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); } - TimeBoundaryConfig oldTimeBoundaryConfig = oldLogicalTableConfig.getTimeBoundaryConfig(); - TimeBoundaryConfig newTimeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); - // compare the old and new time boundary config and send message if they are different - if ((oldTimeBoundaryConfig != null && !oldTimeBoundaryConfig.equals(newTimeBoundaryConfig)) - || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) { - sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName()); - } + sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName()); LOGGER.info("Updated logical table {}: Successfully updated table", tableName); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 38c29dfeb74f..25b35f6aca37 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -407,7 +407,7 @@ public static LogicalTableConfig getDummyLogicalTableConfig(String tableName, Li .setBrokerTenant(brokerTenant) .setRefOfflineTableName(offlineTableName) .setRefRealtimeTableName(realtimeTableName) - .setQuotaConfig(new QuotaConfig(null, "999")) + .setQuotaConfig(new QuotaConfig(null, "99999")) .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L)) .setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames))) .setPhysicalTableConfigMap(physicalTableConfigMap); @@ -752,11 +752,21 @@ public void addTableConfig(TableConfig tableConfig) getControllerRequestClient().addTableConfig(tableConfig); } + public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + getControllerRequestClient().addLogicalTableConfig(logicalTableConfig); + } + public void updateTableConfig(TableConfig tableConfig) throws IOException { getControllerRequestClient().updateTableConfig(tableConfig); } + public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws IOException { + getControllerRequestClient().updateLogicalTableConfig(logicalTableConfig); + } + public void toggleTableState(String tableName, TableType type, boolean enable) throws IOException { getControllerRequestClient().toggleTableState(tableName, type, enable); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index f3945bcf1f4d..f91a4a9148cc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.net.URI; import java.util.Iterator; +import java.util.List; import java.util.Properties; import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManagerTest; @@ -35,10 +36,12 @@ import org.apache.pinot.common.utils.http.HttpClient; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -76,6 +79,12 @@ public void setUp() TableConfig tableConfig = createOfflineTableConfig(); addTableConfig(tableConfig); + // Create and upload schema and logical table + schema.setSchemaName(getLogicalTableName()); + addSchema(schema); + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(); + addLogicalTableConfig(logicalTableConfig); + Properties properties = new Properties(); properties.put(FAIL_ON_EXCEPTIONS, "FALSE"); _pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory() @@ -96,9 +105,11 @@ void resetQuotas() setQueryQuotaForApplication(null); addQueryQuotaToDatabaseConfig(null); addQueryQuotaToTableConfig(null); + addQueryQuotaToLogicalTableConfig(null); _brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0); verifyQuotaUpdate(0); + verifyQuotaUpdateWithTableName(0, getLogicalTableName()); } @Test @@ -258,6 +269,48 @@ public void testApplicationAndDatabaseQueryQuotaWithTableQueryQuotaWithExtraBrok } } + @Test + public void testLogicalTableQueryQuota() + throws Exception { + int maxQps = 10; + addQueryQuotaToLogicalTableConfig(maxQps); + verifyQuotaUpdateWithTableName(maxQps, getLogicalTableName()); + runQueries(maxQps, false, "default", getLogicalTableName()); + //increase the qps and some of the queries should be throttled. + runQueries(maxQps * 2, true, "default", getLogicalTableName()); + + // queries on broker + runQueriesOnBroker(maxQps, false, getLogicalTableName()); + //increase the qps and some of the queries should be throttled. + runQueriesOnBroker(maxQps * 2, true, getLogicalTableName()); + } + + @Test + public void testLogicalTableWithDatabaseQueryQuota() + throws Exception { + int databaseMaxQps = 25; + int logicalTableMaxQps = 10; + addQueryQuotaToDatabaseConfig(databaseMaxQps); + addQueryQuotaToLogicalTableConfig(logicalTableMaxQps); + // table quota within database quota. Queries should fail upon table quota (10 qps) breach + verifyQuotaUpdateWithTableName(logicalTableMaxQps, getLogicalTableName()); + runQueries(logicalTableMaxQps, false, "default", getLogicalTableName()); + // queries on broker + runQueriesOnBroker(logicalTableMaxQps, false, getLogicalTableName()); + + //increase the logical table qps. + logicalTableMaxQps = 50; + addQueryQuotaToLogicalTableConfig(logicalTableMaxQps); + verifyQuotaUpdateWithTableName(databaseMaxQps, getLogicalTableName()); + runQueries(databaseMaxQps, false, "default", getLogicalTableName()); + // broker queries + runQueriesOnBroker(databaseMaxQps, false, getLogicalTableName()); + + //increase the qps and some of the queries should be throttled. + runQueries(databaseMaxQps * 2, true, "default", getLogicalTableName()); + runQueriesOnBroker(databaseMaxQps * 2, true, getLogicalTableName()); + } + /** * Runs the query load with the max rate that the quota can allow and ensures queries are not failing. * Then runs the query load with double the max rate and expects queries to fail due to quota breach. @@ -295,15 +348,18 @@ private static void sleep(long deadline, double iterationsLeft) { private void runQueries(int qps, boolean shouldFail) { runQueries(qps, shouldFail, "default"); } + private void runQueries(int qps, boolean shouldFail, String applicationName) { + runQueries(qps, shouldFail, applicationName, getTableName()); + } // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps - private void runQueries(int qps, boolean shouldFail, String applicationName) { + private void runQueries(int qps, boolean shouldFail, String applicationName, String tableName) { int failCount = 0; boolean isLastFail = false; long deadline = System.currentTimeMillis() + 1000; - String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + getTableName(); + String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + tableName; for (int i = 0; i < qps; i++) { sleep(deadline, qps - i); @@ -333,11 +389,15 @@ private void runQueries(int qps, boolean shouldFail, String applicationName) { private static volatile String _quotaSource; private void verifyQuotaUpdate(double quotaQps) { + verifyQuotaUpdateWithTableName(quotaQps, getTableName() + "_OFFLINE"); + } + + private void verifyQuotaUpdateWithTableName(double quotaQps, String tableName) { try { TestUtils.waitForCondition(aVoid -> { try { double tableQuota = Double.parseDouble(sendGetRequest( - "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE")); + "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + tableName)); double dbQuota = Double.parseDouble( sendGetRequest("http://" + _brokerHostPort + "/debug/databases/queryQuota/default")); double appQuota = Double.parseDouble( @@ -363,7 +423,7 @@ private void verifyQuotaUpdate(double quotaQps) { } catch (IOException e) { throw new RuntimeException(e); } - }, 10000, "Failed to reflect query quota on rate limiter in 5s."); + }, 10000, "Failed to reflect query quota on rate limiter in 10s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae); @@ -375,13 +435,17 @@ private BrokerResponse executeQueryOnBroker(String query) { } private void runQueriesOnBroker(float qps, boolean shouldFail) { + runQueriesOnBroker(qps, shouldFail, getTableName()); + } + + private void runQueriesOnBroker(float qps, boolean shouldFail, String tableName) { int failCount = 0; long deadline = System.currentTimeMillis() + 1000; for (int i = 0; i < qps; i++) { sleep(deadline, qps - i); BrokerResponse resultSetGroup = - executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); + executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + tableName); for (Iterator it = resultSetGroup.getExceptions().elements(); it.hasNext(); ) { JsonNode exception = it.next(); if (exception.get("errorCode").asInt() == QueryErrorCode.TOO_MANY_REQUESTS.getId()) { @@ -406,6 +470,14 @@ public void addQueryQuotaToTableConfig(Integer maxQps) // to allow change propagation to QueryQuotaManager } + public void addQueryQuotaToLogicalTableConfig(Integer maxQps) + throws Exception { + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(); + logicalTableConfig.setQuotaConfig(new QuotaConfig(null, maxQps == null ? null : maxQps.toString())); + updateLogicalTableConfig(logicalTableConfig); + // to allow change propagation to QueryQuotaManager + } + public void addQueryQuotaToDatabaseConfig(Integer maxQps) throws Exception { String url = _controllerRequestURLBuilder.getBaseUrl() + "/databases/default/quotas"; @@ -453,4 +525,16 @@ public void addAppQueryQuotaToClusterConfig(Integer maxQps) } // to allow change propagation to QueryQuotaManager } + + private static String getLogicalTableName() { + return "logical_table"; + } + + private LogicalTableConfig getLogicalTableConfig() { + List physicalTableNames = List.of(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(getLogicalTableName(), physicalTableNames, "DefaultTenant"); + logicalTableConfig.setQuotaConfig(new QuotaConfig(null, null)); + return logicalTableConfig; + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 3f2bffd888e6..7fb4802cc1f9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -110,6 +110,7 @@ public void setUp() if (_sharedClusterTestSuite != this) { _controllerRequestURLBuilder = _sharedClusterTestSuite._controllerRequestURLBuilder; _helixResourceManager = _sharedClusterTestSuite._helixResourceManager; + _kafkaStarters = _sharedClusterTestSuite._kafkaStarters; } _avroFiles = getAllAvroFiles(); @@ -323,16 +324,6 @@ protected LogicalTableConfig getLogicalTableConfig(String logicalTableName) return LogicalTableConfig.fromString(resp); } - protected void updateLogicalTableConfig(String logicalTableName, LogicalTableConfig logicalTableConfig) - throws IOException { - String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName); - String resp = - ControllerTest.sendPutRequest(updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - - assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + getLogicalTableName() - + " logical table successfully updated.\"}"); - } - protected void deleteLogicalTable() throws IOException { String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName()); @@ -447,7 +438,7 @@ public void testDisableGroovyQueryTableConfigOverride() QueryConfig queryConfig = new QueryConfig(null, false, null, null, null, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); String groovyQuery = "SELECT GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', " + "'arg0 + arg1', FlightNum, Origin) FROM mytable"; @@ -459,7 +450,7 @@ public void testDisableGroovyQueryTableConfigOverride() queryConfig = new QueryConfig(null, true, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); // grpc and http throw different exceptions. So only check error message. Exception athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery)); @@ -467,7 +458,7 @@ public void testDisableGroovyQueryTableConfigOverride() // Remove query config logicalTableConfig.setQueryConfig(null); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); athrows = expectThrows(Exception.class, () -> postQuery(groovyQuery)); assertTrue(athrows.getMessage().contains("Groovy transform functions are disabled for queries")); @@ -481,7 +472,7 @@ public void testMaxQueryResponseSizeTableConfig() QueryConfig queryConfig = new QueryConfig(null, null, null, null, 100L, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); @@ -491,7 +482,7 @@ public void testMaxQueryResponseSizeTableConfig() // Query Succeeds with a high limit. queryConfig = new QueryConfig(null, null, null, null, 1000000L, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -499,7 +490,7 @@ public void testMaxQueryResponseSizeTableConfig() //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -513,7 +504,7 @@ public void testMaxServerResponseSizeTableConfig() QueryConfig queryConfig = new QueryConfig(null, null, null, null, null, 1000L); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); assertTrue(!exceptions.isEmpty() @@ -522,7 +513,7 @@ public void testMaxServerResponseSizeTableConfig() // Query Succeeds with a high limit. queryConfig = new QueryConfig(null, null, null, null, null, 1000000L); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -530,7 +521,7 @@ public void testMaxServerResponseSizeTableConfig() //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -543,7 +534,7 @@ public void testQueryTimeOut() QueryConfig queryConfig = new QueryConfig(1L, null, null, null, null, null); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); JsonNode response = postQuery(starQuery); JsonNode exceptions = response.get("exceptions"); assertTrue( @@ -554,7 +545,7 @@ public void testQueryTimeOut() // Query Succeeds with a high limit. queryConfig = new QueryConfig(1000000L, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); @@ -562,7 +553,7 @@ public void testQueryTimeOut() //Reset to null. queryConfig = new QueryConfig(null, null, null, null, null, null); logicalTableConfig.setQueryConfig(queryConfig); - updateLogicalTableConfig(getLogicalTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); response = postQuery(starQuery); exceptions = response.get("exceptions"); assertTrue(exceptions.isEmpty(), "Query should not throw exception"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java index d406b2ad5607..55d80735abdc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java @@ -72,6 +72,6 @@ private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig logicalTab Map parameters = Map.of("includedTables", List.of(newTimeBoundaryTableName)); logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters); - updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig); + updateLogicalTableConfig(logicalTableConfig); } } From 3970102f58531d4935d5151cd0d9808a08a66a41 Mon Sep 17 00:00:00 2001 From: abhishekbafna Date: Tue, 20 May 2025 18:17:36 +0530 Subject: [PATCH 2/4] Add query quota manager acquire method for logical tables. --- ...lixExternalViewBasedQueryQuotaManager.java | 47 ++++++++----------- .../broker/queryquota/QueryQuotaManager.java | 7 +++ .../BaseBrokerRequestHandler.java | 15 ++++-- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index db6f68790f61..7bb8641271b7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -246,27 +246,27 @@ private QuotaConfig getQuotaConfigFromPropertyStore(String tableNameWithType) { /** * Create or update a rate limiter for a table. - * @param tableNameWithType table name with table type. + * @param physicalOrLogicalTableName physical or logical table name. * @param brokerResource broker resource which stores all the broker states of each table. * @param quotaConfig quota config of the table. * @param tableStat stat of the table config. */ - private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView brokerResource, + private void createOrUpdateRateLimiter(String physicalOrLogicalTableName, ExternalView brokerResource, QuotaConfig quotaConfig, Stat tableStat) { if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) { - LOGGER.info("No qps config specified for table: {}", tableNameWithType); - buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType); + LOGGER.info("No qps config specified for table: {}", physicalOrLogicalTableName); + buildEmptyOrResetRateLimiterInQueryQuotaEntity(physicalOrLogicalTableName); return; } if (brokerResource == null) { - LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType); + LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", physicalOrLogicalTableName); // It could be possible that brokerResourceEV is null due to ZK connection issue. // In this case, the rate limiter should not be reset. Simply exit the method would be sufficient. return; } - Map stateMap = brokerResource.getStateMap(tableNameWithType); + Map stateMap = brokerResource.getStateMap(physicalOrLogicalTableName); int otherOnlineBrokerCount = 0; // If stateMap is null, that means this broker is the first broker for this table. @@ -280,22 +280,22 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br } int onlineCount = otherOnlineBrokerCount + 1; - LOGGER.info("The number of online brokers for table {} is {}", tableNameWithType, onlineCount); + LOGGER.info("The number of online brokers for table {} is {}", physicalOrLogicalTableName, onlineCount); // Get the dynamic rate double overallRate = quotaConfig.getMaxQPS(); double perBrokerRate = overallRate / onlineCount; - QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType); + QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(physicalOrLogicalTableName); if (queryQuotaEntity == null) { queryQuotaEntity = new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, tableStat.getVersion()); - _rateLimiterMap.put(tableNameWithType, queryQuotaEntity); + _rateLimiterMap.put(physicalOrLogicalTableName, queryQuotaEntity); LOGGER.info( "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online " - + "broker instances: {}. Table config stat version: {}", tableNameWithType, overallRate, perBrokerRate, - onlineCount, tableStat.getVersion()); + + "broker instances: {}. Table config stat version: {}", physicalOrLogicalTableName, overallRate, + perBrokerRate, onlineCount, tableStat.getVersion()); } else { RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter(); double previousRate = -1; @@ -314,10 +314,10 @@ private void createOrUpdateRateLimiter(String tableNameWithType, ExternalView br LOGGER.info( "Rate limiter for table: {} has been updated. Overall rate: {}. Previous per-broker rate: {}. New " + "per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}", - tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, tableStat.getVersion()); + physicalOrLogicalTableName, overallRate, previousRate, perBrokerRate, onlineCount, tableStat.getVersion()); } - addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); - addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addMaxBurstQPSCallbackTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(physicalOrLogicalTableName, queryQuotaEntity); if (isQueryRateLimitDisabled()) { LOGGER.info("Query rate limiting is currently disabled for this broker. So it won't take effect immediately."); } @@ -660,15 +660,6 @@ public boolean acquire(String tableName) { if (isQueryRateLimitDisabled()) { return true; } - - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { - return acquireForLogicalTable(tableName); - } else { - return acquireForPhysicalTable(tableName); - } - } - - private boolean acquireForPhysicalTable(String tableName) { String offlineTableName = null; String realtimeTableName = null; QueryQuotaEntity offlineTableQueryQuotaEntity = null; @@ -698,14 +689,16 @@ private boolean acquireForPhysicalTable(String tableName) { LOGGER.debug("Trying to acquire token for table: {}", realtimeTableName); realtimeQuotaOk = tryAcquireToken(realtimeTableName, realtimeTableQueryQuotaEntity); } + return offlineQuotaOk && realtimeQuotaOk; } - private boolean acquireForLogicalTable(String tableName) { - QueryQuotaEntity logicalTableQueryQuotaEntity = _rateLimiterMap.get(tableName); + @Override + public boolean acquireLogicalTable(String logicalTableName) { + QueryQuotaEntity logicalTableQueryQuotaEntity = _rateLimiterMap.get(logicalTableName); if (logicalTableQueryQuotaEntity != null) { - LOGGER.debug("Trying to acquire token for logical table: {}", tableName); - return tryAcquireToken(tableName, logicalTableQueryQuotaEntity); + LOGGER.debug("Trying to acquire token for logical table: {}", logicalTableName); + return tryAcquireToken(logicalTableName, logicalTableQueryQuotaEntity); } return true; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java index 70c3ef7588eb..eb3de472d951 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java @@ -27,6 +27,13 @@ public interface QueryQuotaManager { */ boolean acquire(String tableName); + /** + * Try to acquire a quota for the given logical table. + * @param logicalTableName Logical table name + * @return {@code true} if the table quota has not been reached, {@code false} otherwise + */ + boolean acquireLogicalTable(String logicalTableName); + /** * Try to acquire a quota for the given database. * @param databaseName database name diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index f9bfe311c1fc..3dc76d65885c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -261,11 +261,18 @@ protected boolean hasExceededQPSQuota(@Nullable String database, Set tab requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return true; } - for (String tableName : tableNames) { - if (!_queryQuotaManager.acquire(tableName)) { - LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); + for (String physicalOrLogicalTableName : tableNames) { + boolean acquired; + if (_tableCache.isLogicalTable(physicalOrLogicalTableName)) { + acquired = _queryQuotaManager.acquireLogicalTable(physicalOrLogicalTableName); + } else { + acquired = _queryQuotaManager.acquire(physicalOrLogicalTableName); + } + if (!acquired) { + LOGGER.warn("Request {}: query exceeds quota for table: {}", + requestContext.getRequestId(), physicalOrLogicalTableName); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String rawTableName = TableNameBuilder.extractRawTableName(physicalOrLogicalTableName); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return true; } From 5528aa7426e0e3d7cef5fefa0f8babb62eda80e8 Mon Sep 17 00:00:00 2001 From: abhishekbafna Date: Fri, 23 May 2025 16:25:30 +0530 Subject: [PATCH 3/4] Addressing minor review comment. --- .../pinot/broker/requesthandler/BaseBrokerRequestHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index e7c2172b1be5..c24ddf0ff1a5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -262,17 +262,19 @@ protected boolean hasExceededQPSQuota(@Nullable String database, Set tab return true; } for (String physicalOrLogicalTableName : tableNames) { + String rawTableName = physicalOrLogicalTableName; boolean acquired; if (_tableCache.isLogicalTable(physicalOrLogicalTableName)) { acquired = _queryQuotaManager.acquireLogicalTable(physicalOrLogicalTableName); } else { acquired = _queryQuotaManager.acquire(physicalOrLogicalTableName); + rawTableName = TableNameBuilder.extractRawTableName(physicalOrLogicalTableName); } if (!acquired) { LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), physicalOrLogicalTableName); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - String rawTableName = TableNameBuilder.extractRawTableName(physicalOrLogicalTableName); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return true; } From 6e96b878131f17d1ac2dcce660daf83109d32635 Mon Sep 17 00:00:00 2001 From: abhishekbafna Date: Tue, 27 May 2025 11:06:46 +0530 Subject: [PATCH 4/4] Revert the qps for mse and localize for sse. --- .../BaseBrokerRequestHandler.java | 17 ++++---------- .../BaseSingleStageBrokerRequestHandler.java | 23 +++++++++++++------ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index c24ddf0ff1a5..2a344876c358 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -261,20 +261,11 @@ protected boolean hasExceededQPSQuota(@Nullable String database, Set tab requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return true; } - for (String physicalOrLogicalTableName : tableNames) { - String rawTableName = physicalOrLogicalTableName; - boolean acquired; - if (_tableCache.isLogicalTable(physicalOrLogicalTableName)) { - acquired = _queryQuotaManager.acquireLogicalTable(physicalOrLogicalTableName); - } else { - acquired = _queryQuotaManager.acquire(physicalOrLogicalTableName); - rawTableName = TableNameBuilder.extractRawTableName(physicalOrLogicalTableName); - } - if (!acquired) { - LOGGER.warn("Request {}: query exceeds quota for table: {}", - requestContext.getRequestId(), physicalOrLogicalTableName); + for (String tableName : tableNames) { + if (!_queryQuotaManager.acquire(tableName)) { + LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - + String rawTableName = TableNameBuilder.extractRawTableName(tableName); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return true; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 9eb0b53f6cff..ab013085cb28 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -366,8 +366,8 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn // Compile the request into PinotQuery long compilationStartTimeNs = System.nanoTime(); CompileResult compileResult = - compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, - accessControl); + compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, + accessControl); if (compileResult._errorOrLiteralOnlyBrokerResponse != null) { /* @@ -406,8 +406,17 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn } // Validate QPS - if (hasExceededQPSQuota(database, Set.of(logicalTableConfig.getTableName()), requestContext)) { - String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); + if (!_queryQuotaManager.acquireDatabase(database)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + LOGGER.info(errorMessage); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); + } + if (!_queryQuotaManager.acquireLogicalTable(tableName)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for table: %s.", requestId, query, tableName); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } @@ -815,9 +824,9 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { return new CompileResult(new BrokerResponseNative(QueryErrorCode.SQL_PARSING, "It seems that the query is only supported by the multi-stage query engine, please retry the query " - + "using " - + "the multi-stage query engine " - + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")); + + "using " + + "the multi-stage query engine " + + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")); } else { return new CompileResult( new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage()));