Skip to content
Permalink
Browse files
Merge pull request #97 from complone/master
Provide an example for Dubbo load balancing
  • Loading branch information
beiwei30 committed Aug 6, 2019
2 parents dee98f4 + f3ed81f commit c1715d68908d625117e7cce1c5dd32ac83a5ca8c
Showing 9 changed files with 236 additions and 0 deletions.
@@ -0,0 +1,31 @@
# Dubbo-Uniformity HashSelector

Design and implementation of Dubbo adaptive load balancing with priority random load




## Sample introduction



> Dubbo-based stochastic load balancing



- The random strategy will first determine whether all Invokers have the same weight.

- If they are the same, then the process is relatively simple. Using random. nexInt (length) you can randomly generate an Invoker serial number

- Select the corresponding Invoker according to the serial number. If no weight is set on the service provider, then all Invoker weights are the same, default is 100.

- If the weights are different, then we need to combine the weights to set the random probability.



> Random Load Balancing after Modification
- Get the address of a service from the locally currently registered provider service, and if it is the same as the IP of the current machine, get the provider first.

- If different, re-load the random load and retrieve a provider service
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<groupId>org.apache.dubbo</groupId>
<version>1.0-SNAPSHOT</version>

<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-samples-gateway</artifactId>

<properties>
<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>2.7.1</dubbo.version>
<junit.version>4.12</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>${dubbo.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-zookeeper</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-samples-callback</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,14 @@
package org.apache.dubbo.samples;


import org.apache.dubbo.samples.callback.api.CallbackListener;


public class CallbackListenerImpl implements CallbackListener {

@Override
public void changed(String msg) {
System.out.println("receive msg from server :" + msg);
}

}
@@ -0,0 +1,28 @@
package org.apache.dubbo.samples;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;


@Activate(group = Constants.CONSUMER)
public class TestClientFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try{
Result result = invoker.invoke(invocation);
return result;
}catch (Exception e){
throw e;
}
}

@Override
public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
return result;
}
}
@@ -0,0 +1,37 @@
package org.apache.dubbo.samples;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;

import java.net.InetAddress;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;


public class UserLoadBalance extends RandomLoadBalance {

@Override
public <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// 循环当前注册的多个invoker 优先选择与自己IP相同的服务
for (Invoker t : invokers) {
try {
InetAddress addr = InetAddress.getLocalHost();
String ip=addr.getHostAddress().toString(); //获取本机ip
URL u = t.getUrl();
if (u.getIp().equals(ip)) {
return t;
}
} catch (Exception e) {
}
}
return super.doSelect(invokers, url, invocation);
}
}
@@ -0,0 +1 @@
org.apache.dubbo.samples.TestClientFilter
@@ -0,0 +1 @@
org.apache.dubbo.samples.UserLoadBalance
@@ -0,0 +1 @@
org.apache.dubbo.samples.CallbackListenerImpl
@@ -0,0 +1,77 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
import org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance;
import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;
import org.apache.dubbo.samples.UserLoadBalance;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;


public class UserLoadBalanceTest {

Invocation invocation;

Invoker<UserLoadBalanceTest> invoker1;
Invoker<UserLoadBalanceTest> invoker2;
Invoker<UserLoadBalanceTest> invoker3;
Invoker<UserLoadBalanceTest> invoker4;
Invoker<UserLoadBalanceTest> invoker5;

List<Invoker<UserLoadBalanceTest>> invokers = new ArrayList<Invoker<UserLoadBalanceTest>>();
@org.junit.Test
public void doSelect() {

int runs = 1000;
Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, RandomLoadBalance.NAME);
for (Map.Entry<Invoker, AtomicLong> entry : counter.entrySet()) {
Long count = entry.getValue().get();
Assert.assertTrue("abs diff should < avg",Math.abs(count - runs / (0f + invokers.size())) < runs / (0f + invokers.size()));
}

for (int i = 0; i < 5; i++) {
for (int j = 0; j <= i; j++) {
RpcStatus.beginCount(invokers.get(i).getUrl(), invocation.getMethodName());
}
}
counter = getInvokeCounter(runs, LeastActiveLoadBalance.NAME);
for (Map.Entry<Invoker, AtomicLong> entry : counter.entrySet()) {
Long count = entry.getValue().get();
}
Assert.assertEquals(runs, counter.get(invoker1).intValue());
Assert.assertEquals(0, counter.get(invoker2).intValue());
Assert.assertEquals(0, counter.get(invoker3).intValue());
Assert.assertEquals(0, counter.get(invoker4).intValue());
Assert.assertEquals(0, counter.get(invoker5).intValue());
}


public Map<Invoker, AtomicLong> getInvokeCounter(int runs, String loadbalanceName) {
Map<Invoker, AtomicLong> counter = new ConcurrentHashMap<Invoker, AtomicLong>();
LoadBalance lb = getLoadBalance(loadbalanceName);
for (Invoker invoker : invokers) {
counter.put(invoker, new AtomicLong(0));
}
URL url = invokers.get(0).getUrl();
for (int i = 0; i < runs; i++) {
Invoker sinvoker = lb.select(invokers, url, invocation);
counter.get(sinvoker).incrementAndGet();
}
return counter;
}

protected AbstractLoadBalance getLoadBalance(String loadbalanceName) {
return (AbstractLoadBalance) ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);
}

}

0 comments on commit c1715d6

Please sign in to comment.