Skip to content

Commit

Permalink
Optimize leastActiveSelect and weight test case (#2172)
Browse files Browse the repository at this point in the history
* Now the select is select a random one when there are several least active invokers and all of them are in warm up.
After this pr, it will select also by weight and warm up.

And fix a bug when two invoker's active is same and weight not same.

* weight test for all the loadBalance.

* optimize typo

* optimize unit test
  • Loading branch information
carryxyh authored and zonghaishang committed Sep 27, 2018
1 parent 733a376 commit 6edfb1d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 13 deletions.
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand All @@ -27,7 +26,6 @@

/**
* LeastActiveLoadBalance
*
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {

Expand All @@ -39,26 +37,26 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of weights
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexes[0] = i; // Reset
totalWeight = weight; // Reset
firstWeight = weight; // Record the weight the first invoker
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexes[leastCount++] = i; // Record index number of this invoker
totalWeight += weight; // Add this invoker's weight to totalWeight.
totalWeight += afterWarmup; // Add this invoker's with warmup weight to totalWeight.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& weight != firstWeight) {
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
Expand All @@ -70,7 +68,7 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.rpc.Invoker;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -39,4 +38,31 @@ public void testLeastActiveLoadBalance_select() {
}
}

@Test
public void testSelectByWeight() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int loop = 10000;

LeastActiveLoadBalance lb = new LeastActiveLoadBalance();
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
}
// never select invoker3 because it's active is more than invoker1 and invoker2
Assert.assertTrue("select is not the least active one", !selected.getUrl().getProtocol().equals("test3"));
}

// the sumInvoker1 : sumInvoker2 approximately equal to 1: 9
System.out.println(sumInvoker1);
System.out.println(sumInvoker2);

Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2, loop);
}
}
Expand Up @@ -21,8 +21,9 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcStatus;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -50,6 +51,12 @@ public class LoadBalanceBaseTest {
Invoker<LoadBalanceBaseTest> invoker4;
Invoker<LoadBalanceBaseTest> invoker5;

RpcStatus weightTestRpcStatus1;
RpcStatus weightTestRpcStatus2;
RpcStatus weightTestRpcStatus3;

RpcInvocation weightTestInvocation;

/**
* @throws java.lang.Exception
*/
Expand Down Expand Up @@ -145,4 +152,55 @@ private static int calculateDefaultWarmupWeight(int uptime) {
return AbstractLoadBalance.calculateWarmupWeight(uptime, Constants.DEFAULT_WARMUP, Constants.DEFAULT_WEIGHT);
}

/*------------------------------------test invokers for weight---------------------------------------*/

protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
protected Invoker<LoadBalanceBaseTest> weightInvoker1;
protected Invoker<LoadBalanceBaseTest> weightInvoker2;
protected Invoker<LoadBalanceBaseTest> weightInvoker3;

@Before
public void before() throws Exception {
weightInvoker1 = mock(Invoker.class);
weightInvoker2 = mock(Invoker.class);
weightInvoker3 = mock(Invoker.class);

weightTestInvocation = new RpcInvocation();
weightTestInvocation.setMethodName("test");

URL url1 = URL.valueOf("test1://0:1/DemoService");
url1 = url1.addParameter(Constants.WEIGHT_KEY, 1);
url1 = url1.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 1);
url1 = url1.addParameter("active", 0);

URL url2 = URL.valueOf("test2://0:9/DemoService");
url2 = url2.addParameter(Constants.WEIGHT_KEY, 9);
url2 = url2.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 9);
url2 = url2.addParameter("active", 0);

URL url3 = URL.valueOf("test3://1:6/DemoService");
url3 = url3.addParameter(Constants.WEIGHT_KEY, 6);
url3 = url3.addParameter(weightTestInvocation.getMethodName() + "." + Constants.WEIGHT_KEY, 6);
url3 = url3.addParameter("active", 1);

given(weightInvoker1.isAvailable()).willReturn(true);
given(weightInvoker1.getUrl()).willReturn(url1);

given(weightInvoker2.isAvailable()).willReturn(true);
given(weightInvoker2.getUrl()).willReturn(url2);

given(weightInvoker3.isAvailable()).willReturn(true);
given(weightInvoker3.getUrl()).willReturn(url3);

weightInvokers.add(weightInvoker1);
weightInvokers.add(weightInvoker2);
weightInvokers.add(weightInvoker3);

weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName());
weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName());
weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());

// weightTestRpcStatus3 active is 1
RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
}
}
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;

import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -54,4 +53,35 @@ public void testRandomLoadBalanceSelect() {
Assert.assertEquals(0, counter.get(invoker5).intValue());
}

@Test
public void testSelectByWeight() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int sumInvoker3 = 0;
int loop = 10000;

RandomLoadBalance lb = new RandomLoadBalance();
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
}

if (selected.getUrl().getProtocol().equals("test3")) {
sumInvoker3++;
}
}

// 1 : 9 : 6
System.out.println(sumInvoker1);
System.out.println(sumInvoker2);
System.out.println(sumInvoker3);
Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop);
}

}
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.rpc.Invoker;

import org.junit.Assert;
import org.junit.Test;

Expand All @@ -34,4 +33,36 @@ public void testRoundRobinLoadBalanceSelect() {
Assert.assertTrue("abs diff should < 1", Math.abs(count - runs / (0f + invokers.size())) < 1f);
}
}

@Test
public void testSelectByWeight() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
int sumInvoker3 = 0;
int loop = 10000;

RoundRobinLoadBalance lb = new RoundRobinLoadBalance();
for (int i = 0; i < loop; i++) {
Invoker selected = lb.select(weightInvokers, null, weightTestInvocation);

if (selected.getUrl().getProtocol().equals("test1")) {
sumInvoker1++;
}

if (selected.getUrl().getProtocol().equals("test2")) {
sumInvoker2++;
}

if (selected.getUrl().getProtocol().equals("test3")) {
sumInvoker3++;
}
}

// 1 : 9 : 6
System.out.println(sumInvoker1);
System.out.println(sumInvoker2);
System.out.println(sumInvoker3);
Assert.assertEquals("select failed!", sumInvoker1 + sumInvoker2 + sumInvoker3, loop);
}

}

0 comments on commit 6edfb1d

Please sign in to comment.