Permalink
Browse files

fix #151 fix #215

  • Loading branch information...
1 parent 405003a commit 07d3b305766691e75bb4e00b51dc03540853f3d9 @teaey teaey committed Apr 9, 2016
@@ -13,70 +13,91 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alibaba.dubbo.rpc.cluster.loadbalance;
-
-import java.util.ArrayList;
+package com.alibaba.dubbo.rpc.cluster.loadbalance;
+
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
-
-/**
- * Round robin load balance.
- *
- * @author qian.lei
- * @author william.liangf
- */
+
+/**
+ * Round robin load balance.
+ *
+ * @author qian.lei
+ * @author william.liangf
+ */
public class RoundRobinLoadBalance extends AbstractLoadBalance {
-
- public static final String NAME = "roundrobin";
-
- private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
- private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
-
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- int length = invokers.size(); // 总个数
- int maxWeight = 0; // 最大权重
- int minWeight = Integer.MAX_VALUE; // 最小权重
- for (int i = 0; i < length; i++) {
- int weight = getWeight(invokers.get(i), invocation);
- maxWeight = Math.max(maxWeight, weight); // 累计最大权重
- minWeight = Math.min(minWeight, weight); // 累计最小权重
- }
- if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
- AtomicPositiveInteger weightSequence = weightSequences.get(key);
- if (weightSequence == null) {
- weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
- weightSequence = weightSequences.get(key);
- }
- int currentWeight = weightSequence.getAndIncrement() % maxWeight;
- List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
- for (Invoker<T> invoker : invokers) { // 筛选权重大于当前权重基数的Invoker
- if (getWeight(invoker, invocation) > currentWeight) {
- weightInvokers.add(invoker);
- }
- }
- int weightLength = weightInvokers.size();
- if (weightLength == 1) {
- return weightInvokers.get(0);
- } else if (weightLength > 1) {
- invokers = weightInvokers;
- length = invokers.size();
- }
- }
- AtomicPositiveInteger sequence = sequences.get(key);
- if (sequence == null) {
- sequences.putIfAbsent(key, new AtomicPositiveInteger());
- sequence = sequences.get(key);
- }
- // 取模轮循
- return invokers.get(sequence.getAndIncrement() % length);
- }
-
+ public static final String NAME = "roundrobin";
+
+ private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
+
+ private static final class IntegerWrapper {
+ public IntegerWrapper(int value) {
+ this.value = value;
+ }
+
+ private int value;
+
+ public int getValue() {
+ return value;
+ }
+
+ public void setValue(int value) {
+ this.value = value;
+ }
+
+ public void decrement() {
+ this.value--;
+ }
+ }
+
+ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+ String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
+ int length = invokers.size(); // 总个数
+ int maxWeight = 0; // 最大权重
+ int minWeight = Integer.MAX_VALUE; // 最小权重
+ final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
+ int weightSum = 0;
+ for (int i = 0; i < length; i++) {
+ int weight = getWeight(invokers.get(i), invocation);
+ maxWeight = Math.max(maxWeight, weight); // 累计最大权重
+ minWeight = Math.min(minWeight, weight); // 累计最小权重
+ if (weight > 0) {
+ invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
+ weightSum += weight;
+ }
+ }
+ AtomicPositiveInteger sequence = sequences.get(key);
+ if (sequence == null) {
+ sequences.putIfAbsent(key, new AtomicPositiveInteger());
+ sequence = sequences.get(key);
+ }
+ int currentSequence = sequence.getAndIncrement();
+ if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
+ int mod = currentSequence % weightSum;
+ for (int i = 0; i < maxWeight; i++) {
+ for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
+ final Invoker<T> k = each.getKey();
+ final IntegerWrapper v = each.getValue();
+ if (mod == 0 && v.getValue() > 0) {
+ return k;
+ }
+ if (v.getValue() > 0) {
+ v.decrement();
+ mod--;
+ }
+ }
+ }
+ }
+ // 取模轮循
+ return invokers.get(currentSequence % length);
+ }
+
}
@@ -207,7 +207,7 @@ protected synchronized void doExport() {
generic = Boolean.FALSE.toString();
}
if(local !=null){
- if(local=="true"){
+ if("true".equals(local)){
local=interfaceName+"Local";
}
Class<?> localClass;
@@ -221,7 +221,7 @@ protected synchronized void doExport() {
}
}
if(stub !=null){
- if(stub=="true"){
+ if("true".equals(stub)){
stub=interfaceName+"Stub";
}
Class<?> stubClass;
@@ -54,7 +54,7 @@ public void reset(URL url) {
+ url + ", cause: Channel closed. channel: " + getLocalAddress());
}
try {
- if (url.hasParameter(Constants.HEARTBEAT_KEY)) {
+ if (url.hasParameter(Constants.TIMEOUT_KEY)) {
int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
if (t > 0) {
this.timeout = t;
View
@@ -537,5 +537,14 @@
</roles>
<timezone>+8</timezone>
</developer>
+ <developer>
+ <name>WuXiaoFei(Teaey)</name>
+ <id>xiaofei.wxf</id>
+ <email>xiaofei.wxf (AT) alibaba-inc.com</email>
+ <roles>
+ <role>Developer</role>
+ </roles>
+ <timezone>+8</timezone>
+ </developer>
</developers>
</project>

0 comments on commit 07d3b30

Please sign in to comment.