Skip to content

Commit

Permalink
Smooth Round Robin selection (apache#2650)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonjoo2010 authored and CrazyHZM committed Dec 6, 2018
1 parent 8bc08f9 commit 4c107f9
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,139 @@
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.AtomicPositiveInteger;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Round robin load balance.
*
* @author jason
*/
public class RoundRobinLoadBalance extends AbstractLoadBalance {

public static final String NAME = "roundrobin";

private static int RECYCLE_PERIOD = 60000;

protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}

private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
private AtomicBoolean updateLock = new AtomicBoolean();

/**
* get invoker addr list cached for specified invocation
* <p>
* <b>for unit test only</b>
*
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final List<Invoker<T>> nonZeroWeightedInvokers = new ArrayList<>();
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
nonZeroWeightedInvokers.add(invokers.get(i));
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}

if (maxWeight > 0 && minWeight < maxWeight) {
AtomicPositiveInteger indexSeq = indexSeqs.get(key);
if (indexSeq == null) {
indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
indexSeq = indexSeqs.get(key);
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
int weight = getWeight(invoker, invocation);
if (weight < 0) {
weight = 0;
}
length = nonZeroWeightedInvokers.size();
while (true) {
int index = indexSeq.incrementAndGet() % length;
int currentWeight;
if (index == 0) {
currentWeight = sequence.incrementAndGet() % maxWeight;
} else {
currentWeight = sequence.get() % maxWeight;
}
if (getWeight(nonZeroWeightedInvokers.get(index), invocation) > currentWeight) {
return nonZeroWeightedInvokers.get(index);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
weightedRoundRobin = map.get(identifyString);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
// Round robin
return invokers.get(sequence.getAndIncrement() % length);
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public int testSticky(String methodName, boolean check) {

given(invoker1.invoke(invocation)).willReturn(result);
given(invoker1.isAvailable()).willReturn(true);
given(invoker1.getUrl()).willReturn(url);
given(invoker1.getUrl()).willReturn(url.setPort(1));
given(invoker1.getInterface()).willReturn(StickyTest.class);

given(invoker2.invoke(invocation)).willReturn(result);
given(invoker2.isAvailable()).willReturn(true);
given(invoker2.getUrl()).willReturn(url);
given(invoker2.getUrl()).willReturn(url.setPort(2));
given(invoker2.getInterface()).willReturn(StickyTest.class);

invocation.setMethodName(methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.junit.BeforeClass;
import org.junit.Test;

import com.alibaba.fastjson.JSON;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -114,16 +116,21 @@ public void setUp() throws Exception {

public Map<Invoker, AtomicLong> getInvokeCounter(int runs, String loadbalanceName) {
Map<Invoker, AtomicLong> counter = new ConcurrentHashMap<Invoker, AtomicLong>();
LoadBalance lb = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);
LoadBalance lb = getLoadBalance(loadbalanceName);
for (Invoker invoker : invokers) {
counter.put(invoker, new AtomicLong(0));
}
URL url = invokers.get(0).getUrl();
for (int i = 0; i < runs; i++) {
Invoker sinvoker = lb.select(invokers, invokers.get(0).getUrl(), invocation);
Invoker sinvoker = lb.select(invokers, url, invocation);
counter.get(sinvoker).incrementAndGet();
}
return counter;
}

protected AbstractLoadBalance getLoadBalance(String loadbalanceName) {
return (AbstractLoadBalance) ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);
}

@Test
public void testLoadBalanceWarmup() {
Expand Down Expand Up @@ -153,44 +160,83 @@ private static int calculateDefaultWarmupWeight(int uptime) {
}

/*------------------------------------test invokers for weight---------------------------------------*/

protected static class InvokeResult {
private AtomicLong count = new AtomicLong();
private int weight = 0;
private int totalWeight = 0;

public InvokeResult(int weight) {
this.weight = weight;
}

public AtomicLong getCount() {
return count;
}

public int getWeight() {
return weight;
}

public int getTotalWeight() {
return totalWeight;
}

public void setTotalWeight(int totalWeight) {
this.totalWeight = totalWeight;
}

public int getExpected(int runCount) {
return getWeight() * runCount / getTotalWeight();
}

public float getDeltaPercentage(int runCount) {
int expected = getExpected(runCount);
return Math.abs((expected - getCount().get()) * 100.0f / expected);
}

@Override
public String toString() {
return JSON.toJSONString(this);
}
}

protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
protected Invoker<LoadBalanceBaseTest> weightInvoker1;
protected Invoker<LoadBalanceBaseTest> weightInvoker2;
protected Invoker<LoadBalanceBaseTest> weightInvoker3;
protected Invoker<LoadBalanceBaseTest> weightInvokerTmp;

@Before
public void before() throws Exception {
weightInvoker1 = mock(Invoker.class);
weightInvoker2 = mock(Invoker.class);
weightInvoker3 = mock(Invoker.class);
weightInvokerTmp = mock(Invoker.class);

weightTestInvocation = new RpcInvocation();
weightTestInvocation.setMethodName("test");

URL url1 = URL.valueOf("test1://0:1/DemoService");
url1 = url1.addParameter(Constants.WEIGHT_KEY, 1);
url1 = url1.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 1);
url1 = url1.addParameter("active", 0);

URL url2 = URL.valueOf("test2://0:9/DemoService");
url2 = url2.addParameter(Constants.WEIGHT_KEY, 9);
url2 = url2.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 9);
url2 = url2.addParameter("active", 0);

URL url3 = URL.valueOf("test3://1:6/DemoService");
url3 = url3.addParameter(Constants.WEIGHT_KEY, 6);
url3 = url3.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 6);
url3 = url3.addParameter("active", 1);
URL url1 = URL.valueOf("test1://127.0.0.1:11/DemoService?weight=1&active=0");
URL url2 = URL.valueOf("test2://127.0.0.1:12/DemoService?weight=9&active=0");
URL url3 = URL.valueOf("test3://127.0.0.1:13/DemoService?weight=6&active=1");
URL urlTmp = URL.valueOf("test4://127.0.0.1:9999/DemoService?weight=11&active=0");

given(weightInvoker1.isAvailable()).willReturn(true);
given(weightInvoker1.getInterface()).willReturn(LoadBalanceBaseTest.class);
given(weightInvoker1.getUrl()).willReturn(url1);

given(weightInvoker2.isAvailable()).willReturn(true);
given(weightInvoker2.getInterface()).willReturn(LoadBalanceBaseTest.class);
given(weightInvoker2.getUrl()).willReturn(url2);

given(weightInvoker3.isAvailable()).willReturn(true);
given(weightInvoker3.getInterface()).willReturn(LoadBalanceBaseTest.class);
given(weightInvoker3.getUrl()).willReturn(url3);

given(weightInvokerTmp.isAvailable()).willReturn(true);
given(weightInvokerTmp.getInterface()).willReturn(LoadBalanceBaseTest.class);
given(weightInvokerTmp.getUrl()).willReturn(urlTmp);

weightInvokers.add(weightInvoker1);
weightInvokers.add(weightInvoker2);
Expand All @@ -203,4 +249,25 @@ public void before() throws Exception {
// weightTestRpcStatus3 active is 1
RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
}

protected Map<Invoker, InvokeResult> getWeightedInvokeResult(int runs, String loadbalanceName) {
Map<Invoker, InvokeResult> counter = new ConcurrentHashMap<Invoker, InvokeResult>();
AbstractLoadBalance lb = getLoadBalance(loadbalanceName);
int totalWeight = 0;
for (int i = 0; i < weightInvokers.size(); i ++) {
InvokeResult invokeResult = new InvokeResult(lb.getWeight(weightInvokers.get(i), weightTestInvocation));
counter.put(weightInvokers.get(i), invokeResult);
totalWeight += invokeResult.getWeight();
}
for (InvokeResult invokeResult : counter.values()) {
invokeResult.setTotalWeight(totalWeight);
}
URL url = weightInvokers.get(0).getUrl();
for (int i = 0; i < runs; i++) {
Invoker sinvoker = lb.select(weightInvokers, url, weightTestInvocation);
counter.get(sinvoker).getCount().incrementAndGet();
}
return counter;
}

}
Loading

0 comments on commit 4c107f9

Please sign in to comment.