From 01d1ecb647e7febe01fbc4dc43d137e932ea6cdf Mon Sep 17 00:00:00 2001 From: liubao Date: Thu, 29 Sep 2022 17:22:48 +0800 Subject: [PATCH] [SCB-2689]fix rate limiting configuration changed cause NPE problem --- .../qps/ConsumerQpsFlowControlHandler.java | 10 ++++- .../qps/ProviderQpsFlowControlHandler.java | 18 ++++++--- .../servicecomb/qps/QpsControllerManager.java | 40 ++++++++++--------- .../qps/strategy/FixedWindowStrategy.java | 7 ++-- .../qps/strategy/LeakyBucketStrategy.java | 4 +- 5 files changed, 47 insertions(+), 32 deletions(-) diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java index f04ace87ca3..8885af4dad2 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.qps; +import com.google.common.annotations.VisibleForTesting; import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.swagger.invocation.AsyncResponse; @@ -28,6 +29,12 @@ * Support 3 levels of microservice/schema/operation. */ public class ConsumerQpsFlowControlHandler implements Handler { + + @VisibleForTesting + public QpsControllerManager getQpsControllerMgr() { + return qpsControllerMgr; + } + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false); @Override @@ -40,7 +47,8 @@ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Except QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation); if (qpsStrategy.isLimitNewRequest()) { // return http status 429 - CommonExceptionData errorData = new CommonExceptionData("consumer request rejected by qps flowcontrol"); + CommonExceptionData errorData = new CommonExceptionData( + "consumer request rejected by qps flowcontrol"); asyncResp.consumerFail( new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData)); return; diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java index 009f81cf9a1..21016dbb608 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java @@ -24,7 +24,10 @@ import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; +import com.google.common.annotations.VisibleForTesting; + public class ProviderQpsFlowControlHandler implements Handler { + private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true); @Override @@ -44,16 +47,19 @@ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Except String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE); QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation); - isLimitNewRequest(qpsStrategy, asyncResp); + checkRequestRateLimited(qpsStrategy, asyncResp); } - private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) { + private void checkRequestRateLimited(QpsStrategy qpsStrategy, AsyncResponse asyncResp) { if (qpsStrategy.isLimitNewRequest()) { - CommonExceptionData errorData = new CommonExceptionData("provider request rejected by qps flowcontrol"); + CommonExceptionData errorData = new CommonExceptionData( + "provider request rejected by qps flowcontrol"); asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData)); - return true; - } else { - return false; } } + + @VisibleForTesting + public QpsControllerManager getQpsControllerMgr() { + return qpsControllerMgr; + } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java index 1e954a65ddb..436d6db1344 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.netflix.config.DynamicProperty; public class QpsControllerManager { @@ -75,6 +76,11 @@ public QpsControllerManager(boolean isProvider) { initGlobalQpsController(); } + @VisibleForTesting + public Map getQualifiedNameControllerMap() { + return qualifiedNameControllerMap; + } + public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) { final String name = validatedName(microserviceName); return qualifiedNameControllerMap @@ -95,7 +101,8 @@ private String validatedName(String microserviceName) { * Create relevant qpsLimit dynamicProperty and watch the configuration change. * Search and return a valid qpsController. */ - private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, + @VisibleForTesting + AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, Invocation invocation) { createForService(qualifiedNameKey, microserviceName, invocation); String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length()); @@ -155,11 +162,6 @@ private AbstractQpsStrategy searchQpsController(String qualifiedNameKey) { return null; } - private boolean keyMatch(String configKey, Entry controllerEntry) { - return controllerEntry.getKey().equals(configKey) - || controllerEntry.getKey().startsWith(configKey + SEPARATOR); - } - private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) { return null != qpsStrategy && null != qpsStrategy.getQpsLimit(); } @@ -182,37 +184,37 @@ private void createQpsControllerIfNotExist(String configKey) { configQpsControllerMap.put(configKey, innerQpsStrategy); LOGGER.info("Global flow control strategy update, value = [{}]", strategyProperty.getString()); - updateObjMap(configKey); + updateObjMap(); }); limitProperty.addCallback(() -> { qpsStrategy.setQpsLimit(limitProperty.getLong()); LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, limitProperty.getString()); - updateObjMap(configKey); + updateObjMap(); }); bucketProperty.addCallback(() -> { qpsStrategy.setBucketLimit(bucketProperty.getLong()); LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey, bucketProperty.getString()); - updateObjMap(configKey); + updateObjMap(); }); configQpsControllerMap.put(configKey, qpsStrategy); } - protected void updateObjMap(String configKey) { + protected void updateObjMap() { Iterator> it = qualifiedNameControllerMap.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); - if (keyMatch(configKey, entry)) { - AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey()); - if (qpsStrategy != null) { - entry.setValue(qpsStrategy); - LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]", - entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit()); - } else { - it.remove(); - } + AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey()); + if (qpsStrategy == null) { + it.remove(); + continue; + } + if (qpsStrategy != entry.getValue()) { + entry.setValue(qpsStrategy); + LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]", + entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit()); } } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java index 4fd884e975c..65204c3a304 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java @@ -29,7 +29,7 @@ public class FixedWindowStrategy extends AbstractQpsStrategy { private volatile long msCycleBegin; // Request count between Interval begin and now in one interval - private AtomicLong requestCount = new AtomicLong(); + private final AtomicLong requestCount = new AtomicLong(); // request count before an interval private volatile long lastRequestCount = 1; @@ -55,9 +55,8 @@ public boolean isLimitNewRequest() { // Configuration update and use is at the situation of multi-threaded concurrency // It is possible that operation level updated to null,but schema level or microservice level does not updated boolean isLimitRequest = newCount - lastRequestCount >= this.getQpsLimit(); - if (isLimitRequest) { - LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(), - newCount - lastRequestCount + 1); + if (isLimitRequest){ + LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(), newCount - lastRequestCount + 1); } return isLimitRequest; } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java index 180be4d747b..b8bf65c5e7e 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java @@ -33,7 +33,7 @@ public class LeakyBucketStrategy extends AbstractQpsStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(LeakyBucketStrategy.class); // Request count between Interval begin and now in one interval - private volatile AtomicLong requestCount = new AtomicLong(); + private final AtomicLong requestCount = new AtomicLong(); private volatile long lastTime; @@ -47,7 +47,7 @@ public boolean isLimitNewRequest() { throw new IllegalStateException("should not happen"); } if (this.getBucketLimit() == null) { - this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE)); + this.setBucketLimit(Math.min(2 * this.getQpsLimit(), Integer.MAX_VALUE)); } long nowTime = System.currentTimeMillis(); //get the num of te period time