-
Notifications
You must be signed in to change notification settings - Fork 94
/
TokenAwareSelection.java
203 lines (166 loc) · 7.05 KB
/
TokenAwareSelection.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/*******************************************************************************
* Copyright 2011 Netflix
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package com.netflix.dyno.connectionpool.impl.lb;
import com.netflix.dyno.connectionpool.BaseOperation;
import com.netflix.dyno.connectionpool.HashPartitioner;
import com.netflix.dyno.connectionpool.HostConnectionPool;
import com.netflix.dyno.connectionpool.Operation;
import com.netflix.dyno.connectionpool.exception.NoAvailableHostsException;
import com.netflix.dyno.connectionpool.impl.HostSelectionStrategy;
import com.netflix.dyno.connectionpool.impl.hash.BinarySearchTokenMapper;
import com.netflix.dyno.connectionpool.impl.hash.Murmur1HashPartitioner;
import com.netflix.dyno.connectionpool.impl.utils.CollectionUtils;
import com.netflix.dyno.connectionpool.impl.utils.CollectionUtils.Transform;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Concrete implementation of the {@link HostSelectionStrategy} interface using
* the TOKEN AWARE algorithm. Note that this component needs to be aware of the
* dynomite ring topology to be able to successfully map to the correct token
* owner for any key of an {@link Operation}
*
* @param <CL>
* @author poberai
* @author ipapapa
*/
public class TokenAwareSelection<CL> implements HostSelectionStrategy<CL> {
private final BinarySearchTokenMapper tokenMapper;
private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();
public TokenAwareSelection() {
this(new Murmur1HashPartitioner());
}
public TokenAwareSelection(HashPartitioner hashPartitioner) {
this.tokenMapper = new BinarySearchTokenMapper(hashPartitioner);
}
@Override
public void initWithHosts(Map<HostToken, HostConnectionPool<CL>> hPools) {
tokenPools.putAll(CollectionUtils.transformMapKeys(hPools, new Transform<HostToken, Long>() {
@Override
public Long get(HostToken x) {
return x.getToken();
}
}));
this.tokenMapper.initSearchMechanism(hPools.keySet());
}
/**
* Identifying the proper pool for the operation. A couple of things that may affect the decision
* (a) hashtags: In this case we will construct the key by decomposing from the hashtag
* (b) type of key: string keys vs binary keys.
* In binary keys hashtags do not really matter.
*/
@Override
public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, String hashtag) throws NoAvailableHostsException {
String key = op.getStringKey();
HostConnectionPool<CL> hostPool;
HostToken hToken;
if (key != null) {
// If a hashtag is provided by Dynomite then we use that to create the key to hash.
if (hashtag == null || hashtag.isEmpty()) {
hToken = this.getTokenForKey(key);
} else {
String hashValue = StringUtils.substringBetween(key, Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));
hToken = this.getTokenForKey(hashValue);
}
if (hToken == null) {
throw new NoAvailableHostsException("Token not found for key " + key);
}
hostPool = tokenPools.get(hToken.getToken());
if (hostPool == null) {
throw new NoAvailableHostsException(
"Could not find host connection pool for key: " + key + ", hash: " + tokenMapper.hash(key) + " Token:" + hToken.getToken());
}
} else {
// the key is binary
byte[] binaryKey = op.getBinaryKey();
hToken = this.getTokenForKey(binaryKey);
if (hToken == null) {
throw new NoAvailableHostsException("Token not found for key " + binaryKey.toString());
}
hostPool = tokenPools.get(hToken.getToken());
if (hostPool == null) {
throw new NoAvailableHostsException(
"Could not find host connection pool for key: " + binaryKey.toString() + ", hash: " + tokenMapper.hash(binaryKey) + " Token:" + getTokenForKey(binaryKey));
}
}
return hostPool;
}
@Override
public Map<HostConnectionPool<CL>, BaseOperation<CL, ?>> getPoolsForOperationBatch(
Collection<BaseOperation<CL, ?>> ops) throws NoAvailableHostsException {
throw new RuntimeException("Not Implemented");
}
@Override
public List<HostConnectionPool<CL>> getOrderedHostPools() {
return new ArrayList<HostConnectionPool<CL>>(tokenPools.values());
}
@Override
public HostConnectionPool<CL> getPoolForToken(Long token) {
return tokenPools.get(token);
}
public List<HostConnectionPool<CL>> getPoolsForTokens(Long start, Long end) {
throw new RuntimeException("Not Implemented");
}
@Override
public HostToken getTokenForKey(String key) throws UnsupportedOperationException {
Long keyHash = tokenMapper.hash(key);
return tokenMapper.getToken(keyHash);
}
@Override
public HostToken getTokenForKey(byte[] key) throws UnsupportedOperationException {
Long keyHash = tokenMapper.hash(key);
return tokenMapper.getToken(keyHash);
}
@Override
public boolean addHostPool(HostToken hostToken, HostConnectionPool<CL> hostPool) {
HostConnectionPool<CL> prevPool = tokenPools.put(hostToken.getToken(), hostPool);
if (prevPool == null) {
tokenMapper.addHostToken(hostToken);
return true;
} else {
return false;
}
}
@Override
public boolean removeHostPool(HostToken hostToken) {
HostConnectionPool<CL> prev = tokenPools.get(hostToken.getToken());
if (prev != null) {
tokenPools.remove(hostToken.getToken());
return true;
} else {
return false;
}
}
@Override
public boolean isTokenAware() {
return true;
}
@Override
public boolean isEmpty() {
return tokenPools.isEmpty();
}
public Long getKeyHash(String key) {
Long keyHash = tokenMapper.hash(key);
return keyHash;
}
@Override
public String toString() {
return "TokenAwareSelection: " + tokenMapper.toString();
}
}