Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.servicecomb.qps;

import com.google.common.annotations.VisibleForTesting;
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
Expand All @@ -28,6 +29,12 @@
* Support 3 levels of microservice/schema/operation.
*/
public class ConsumerQpsFlowControlHandler implements Handler {

@VisibleForTesting
public QpsControllerManager getQpsControllerMgr() {
return qpsControllerMgr;
}

private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false);

@Override
Expand All @@ -40,7 +47,8 @@ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Except
QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation);
if (qpsStrategy.isLimitNewRequest()) {
// return http status 429
CommonExceptionData errorData = new CommonExceptionData("consumer request rejected by qps flowcontrol");
CommonExceptionData errorData = new CommonExceptionData(
"consumer request rejected by qps flowcontrol");
asyncResp.consumerFail(
new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;

import com.google.common.annotations.VisibleForTesting;

public class ProviderQpsFlowControlHandler implements Handler {

private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true);

@Override
Expand All @@ -44,16 +47,19 @@ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Except

String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation);
isLimitNewRequest(qpsStrategy, asyncResp);
checkRequestRateLimited(qpsStrategy, asyncResp);
}

private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
private void checkRequestRateLimited(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
if (qpsStrategy.isLimitNewRequest()) {
CommonExceptionData errorData = new CommonExceptionData("provider request rejected by qps flowcontrol");
CommonExceptionData errorData = new CommonExceptionData(
"provider request rejected by qps flowcontrol");
asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
return true;
} else {
return false;
}
}

@VisibleForTesting
public QpsControllerManager getQpsControllerMgr() {
return qpsControllerMgr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.config.DynamicProperty;

public class QpsControllerManager {
Expand Down Expand Up @@ -75,6 +76,11 @@ public QpsControllerManager(boolean isProvider) {
initGlobalQpsController();
}

@VisibleForTesting
public Map<String, AbstractQpsStrategy> getQualifiedNameControllerMap() {
return qualifiedNameControllerMap;
}

public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) {
final String name = validatedName(microserviceName);
return qualifiedNameControllerMap
Expand All @@ -95,7 +101,8 @@ private String validatedName(String microserviceName) {
* Create relevant qpsLimit dynamicProperty and watch the configuration change.
* Search and return a valid qpsController.
*/
private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
@VisibleForTesting
AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
Invocation invocation) {
createForService(qualifiedNameKey, microserviceName, invocation);
String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length());
Expand Down Expand Up @@ -155,11 +162,6 @@ private AbstractQpsStrategy searchQpsController(String qualifiedNameKey) {
return null;
}

private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) {
return controllerEntry.getKey().equals(configKey)
|| controllerEntry.getKey().startsWith(configKey + SEPARATOR);
}

private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) {
return null != qpsStrategy && null != qpsStrategy.getQpsLimit();
}
Expand All @@ -182,37 +184,37 @@ private void createQpsControllerIfNotExist(String configKey) {
configQpsControllerMap.put(configKey, innerQpsStrategy);
LOGGER.info("Global flow control strategy update, value = [{}]",
strategyProperty.getString());
updateObjMap(configKey);
updateObjMap();
});
limitProperty.addCallback(() -> {
qpsStrategy.setQpsLimit(limitProperty.getLong());
LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey,
limitProperty.getString());
updateObjMap(configKey);
updateObjMap();
});
bucketProperty.addCallback(() -> {
qpsStrategy.setBucketLimit(bucketProperty.getLong());
LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey,
bucketProperty.getString());
updateObjMap(configKey);
updateObjMap();
});

configQpsControllerMap.put(configKey, qpsStrategy);
}

protected void updateObjMap(String configKey) {
protected void updateObjMap() {
Iterator<Entry<String, AbstractQpsStrategy>> it = qualifiedNameControllerMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, AbstractQpsStrategy> entry = it.next();
if (keyMatch(configKey, entry)) {
AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
if (qpsStrategy != null) {
entry.setValue(qpsStrategy);
LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
} else {
it.remove();
}
AbstractQpsStrategy qpsStrategy = searchQpsController(entry.getKey());
if (qpsStrategy == null) {
it.remove();
continue;
}
if (qpsStrategy != entry.getValue()) {
entry.setValue(qpsStrategy);
LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
entry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class FixedWindowStrategy extends AbstractQpsStrategy {
private volatile long msCycleBegin;

// Request count between Interval begin and now in one interval
private AtomicLong requestCount = new AtomicLong();
private final AtomicLong requestCount = new AtomicLong();

// request count before an interval
private volatile long lastRequestCount = 1;
Expand All @@ -55,9 +55,8 @@ public boolean isLimitNewRequest() {
// Configuration update and use is at the situation of multi-threaded concurrency
// It is possible that operation level updated to null,but schema level or microservice level does not updated
boolean isLimitRequest = newCount - lastRequestCount >= this.getQpsLimit();
if (isLimitRequest) {
LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(),
newCount - lastRequestCount + 1);
if (isLimitRequest){
LOGGER.warn("qps flowcontrol open, qpsLimit is {} and tps is {}", this.getQpsLimit(), newCount - lastRequestCount + 1);
}
return isLimitRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class LeakyBucketStrategy extends AbstractQpsStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(LeakyBucketStrategy.class);

// Request count between Interval begin and now in one interval
private volatile AtomicLong requestCount = new AtomicLong();
private final AtomicLong requestCount = new AtomicLong();

private volatile long lastTime;

Expand All @@ -47,7 +47,7 @@ public boolean isLimitNewRequest() {
throw new IllegalStateException("should not happen");
}
if (this.getBucketLimit() == null) {
this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE));
this.setBucketLimit(Math.min(2 * this.getQpsLimit(), Integer.MAX_VALUE));
}
long nowTime = System.currentTimeMillis();
//get the num of te period time
Expand Down