Skip to content

Commit

Permalink
Adaptive loadbalance (#10745)
Browse files Browse the repository at this point in the history
  • Loading branch information
ningboliu committed Dec 13, 2022
1 parent 029b15f commit d171605
Show file tree
Hide file tree
Showing 10 changed files with 604 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.cluster;

import org.apache.dubbo.common.beans.factory.ScopeBeanFactory;
import org.apache.dubbo.rpc.AdaptiveMetrics;
import org.apache.dubbo.rpc.cluster.merger.MergerFactory;
import org.apache.dubbo.rpc.cluster.router.RouterSnapshotSwitcher;
import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshRuleManager;
Expand All @@ -38,6 +39,7 @@ public void initializeApplicationModel(ApplicationModel applicationModel) {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
beanFactory.registerBean(MergerFactory.class);
beanFactory.registerBean(ClusterUtils.class);
beanFactory.registerBean(AdaptiveMetrics.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.LoadbalanceRules;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AdaptiveMetrics;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;

/**
* AdaptiveLoadBalance
* </p>
*/
public class AdaptiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "adaptive";

//default key
private String attachmentKey = "mem,load";

private AdaptiveMetrics adaptiveMetrics;

public AdaptiveLoadBalance(ApplicationModel scopeModel){
adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
}

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
Invoker<T> invoker = selectByP2C(invokers,invocation);
invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey);
long startTime = System.currentTimeMillis();
invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime);
invocation.getAttributes().put(LOADBALANCE_KEY,LoadbalanceRules.ADAPTIVE);
adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation));
adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime);

return invoker;
}

private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, Invocation invocation){
int length = invokers.size();
if(length == 1) {
return invokers.get(0);
}

if(length == 2) {
return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation);
}

int pos1 = ThreadLocalRandom.current().nextInt(length);
int pos2 = ThreadLocalRandom.current().nextInt(length - 1);
if (pos2 >= pos1) {
pos2 = pos2 + 1;
}

return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation);
}

private String getServiceKey(Invoker<?> invoker,Invocation invocation){

String key = (String) invocation.getAttributes().get(invoker);
if (StringUtils.isNotEmpty(key)){
return key;
}

key = buildServiceKey(invoker,invocation);
invocation.getAttributes().put(invoker,key);
return key;
}

private String buildServiceKey(Invoker<?> invoker,Invocation invocation){
URL url = invoker.getUrl();
StringBuilder sb = new StringBuilder(128);
sb.append(url.getAddress()).append(":").append(invocation.getProtocolServiceKey());
return sb.toString();
}

private int getTimeout(Invoker<?> invoker, Invocation invocation) {
URL url = invoker.getUrl();
String methodName = RpcUtils.getMethodName(invocation);
return (int) RpcUtils.getTimeout(url,methodName, RpcContext.getClientAttachment(),invocation, DEFAULT_TIMEOUT);
}

private <T> Invoker<T> chooseLowLoadInvoker(Invoker<T> invoker1,Invoker<T> invoker2,Invocation invocation){
int weight1 = getWeight(invoker1, invocation);
int weight2 = getWeight(invoker2, invocation);
int timeout1 = getTimeout(invoker2, invocation);
int timeout2 = getTimeout(invoker2, invocation);
long load1 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker1,invocation),weight1,timeout1 ));
long load2 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker2,invocation),weight2,timeout2 ));

if (load1 == load2) {
// The sum of weights
int totalWeight = weight1 + weight2;
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
if (offset < weight1) {
return invoker1;
}
return invoker2;
}
return ThreadLocalRandom.current().nextInt(2) == 0 ? invoker1 : invoker2;
}
return load1 > load2 ? invoker2 : invoker1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
adaptive=org.apache.dubbo.rpc.cluster.loadbalance.AdaptiveLoadBalance
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AdaptiveMetrics;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.model.ApplicationModel;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class AdaptiveLoadBalanceTest extends LoadBalanceBaseTest {

private ApplicationModel scopeModel;

private AdaptiveMetrics adaptiveMetrics;

@Test
@Order(0)
void testSelectByWeight() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int sumInvoker3 = 0;
int loop = 10000;

ApplicationModel scopeModel = ApplicationModel.defaultModel();

AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
}

if (selected.getUrl().getProtocol().equals("test3")) {
sumInvoker3++;
}
}

// 1 : 9 : 6
System.out.println(sumInvoker1);
System.out.println(sumInvoker2);
System.out.println(sumInvoker3);
Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker3, loop, "select failed!");
}

private String buildServiceKey(Invoker invoker){
URL url = invoker.getUrl();
return url.getAddress() + ":" + invocation.getProtocolServiceKey();
}

private AdaptiveMetrics getAdaptiveMetricsInstance(){
if (adaptiveMetrics == null) {
adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class);
}
return adaptiveMetrics;
}

@Test
@Order(1)
void testSelectByAdaptive() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int sumInvoker5 = 0;
int loop = 10000;

scopeModel = ApplicationModel.defaultModel();
AdaptiveLoadBalance lb = new AdaptiveLoadBalance(scopeModel);

lb.select(weightInvokersSR, null, weightTestInvocation);

for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);

Map<String, String> metricsMap = new HashMap<>();
String idKey = buildServiceKey(selected);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
metricsMap.put("rt", "10");
metricsMap.put("load", "10");
metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-10));
getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
metricsMap.put("rt", "100");
metricsMap.put("load", "40");
metricsMap.put("curTime", String.valueOf(System.currentTimeMillis()-100));
getAdaptiveMetricsInstance().addConsumerSuccess(idKey);
}

if (selected.getUrl().getProtocol().equals("test5")) {
metricsMap.put("rt", "5000");
metricsMap.put("load", "400");//400%
metricsMap.put("curTime", String.valueOf(System.currentTimeMillis() - 5000));

getAdaptiveMetricsInstance().addErrorReq(idKey);
sumInvoker5++;
}
getAdaptiveMetricsInstance().setProviderMetrics(idKey,metricsMap);

}
Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = weightInvokersSR.stream()
.collect(Collectors.toMap(Function.identity(), e -> Integer.valueOf(e.getUrl().getParameter("weight"))));
Integer totalWeight = weightMap.values().stream().reduce(0, Integer::sum);
// max deviation = expectWeightValue * 2
int expectWeightValue = loop / totalWeight;
int maxDeviation = expectWeightValue * 2;
double beta = 0.5;
//this EMA is an approximate value
double ewma1 = beta * 50 + (1 - beta) * 10;
double ewma2 = beta * 50 + (1 - beta) * 100;
double ewma5 = beta * 50 + (1 - beta) * 5000;

AtomicInteger weight1 = new AtomicInteger();
AtomicInteger weight2 = new AtomicInteger();
AtomicInteger weight5 = new AtomicInteger();
weightMap.forEach((k, v) ->{
if (k.getUrl().getProtocol().equals("test1")){
weight1.set(v);
}
else if (k.getUrl().getProtocol().equals("test2")){
weight2.set(v);
}
else if (k.getUrl().getProtocol().equals("test5")){
weight5.set(v);
}
});

Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker5, loop, "select failed!");
Assertions.assertTrue(Math.abs(sumInvoker1 / (weightMap.get(weightInvoker1) * ewma1) - expectWeightValue) < maxDeviation, "select failed!");
Assertions.assertTrue(Math.abs(sumInvoker2 / (weightMap.get(weightInvoker2) * ewma2) - expectWeightValue) < maxDeviation, "select failed!");
Assertions.assertTrue(Math.abs(sumInvoker5 / (weightMap.get(weightInvoker5) * ewma5) - expectWeightValue) < maxDeviation, "select failed!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public interface LoadbalanceRules {
**/
String SHORTEST_RESPONSE = "shortestresponse";

/**
* adaptive load balance.
**/
String ADAPTIVE = "adaptive";

String EMPTY = "";

}
Loading

0 comments on commit d171605

Please sign in to comment.