-
Notifications
You must be signed in to change notification settings - Fork 2k
/
ConsistentHashLoadBalance.java
91 lines (73 loc) · 3.27 KB
/
ConsistentHashLoadBalance.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.loadbalance.AbstractLoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
/**
* refer to dubbo consistent hash load balance: https://github.com/apache/dubbo/blob/2d9583adf26a2d8bd6fb646243a9fe80a77e65d5/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
*
* @author RicardoZ
* @createTime 2020年10月20日 18:15:20
*/
@Slf4j
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
@Override
protected String doSelect(List<String> serviceAddresses, String rpcServiceName) {
int identityHashCode = System.identityHashCode(serviceAddresses);
ConsistentHashSelector selector = selectors.get(rpcServiceName);
// check for updates
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName);
}
static class ConsistentHashSelector {
private final TreeMap<Long, String> virtualInvokers;
private final int identityHashCode;
ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
for (String invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
static byte[] md5(String key) {
MessageDigest md;
try {
md = MessageDigest.getInstance("MD5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
return md.digest();
}
static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String select(String rpcServiceName) {
byte[] digest = md5(rpcServiceName);
return selectForKey(hash(digest, 0));
}
public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}
}