Permalink
Browse files

Codestyle cleanups, added entry in changelog

  • Loading branch information...
1 parent 7a831b5 commit 7ad7bb97376d91ae00bb71086e9712275335a88f @pcmanus pcmanus committed Dec 9, 2013
View
@@ -7,5 +7,3 @@ testing/
doc
notes
.DS_Store
-bin/
-test-output/
@@ -7,6 +7,7 @@ CHANGELOG
- [new] Add LOCAL_ONE consistency level support (requires using C* 2.0.2+) (JAVA-207)
- [bug] Fix parsing of counter types (JAVA-219)
- [bug] Fix missing whitespace for IN clause in the query builder (JAVA-218)
+- [bug] Fix replicas computation for token aware balancing (JAVA-221)
Merged from 1.0 branch:
@@ -40,13 +40,8 @@ static ReplicationStrategy create(Map<String, String> replicationOptions) {
try {
if (strategyClass.contains("SimpleStrategy")) {
- //replication_factor is only specified for SimpleStrategy
- String repFactorString = replicationOptions.get("replication_factor");
- if (repFactorString == null) {
- return null;
- } else {
- return new SimpleStrategy(Integer.parseInt(repFactorString));
- }
+ String repFactorString = replicationOptions.get("replication_factor");
+ return repFactorString == null ? null : new SimpleStrategy(Integer.parseInt(repFactorString));
} else if (strategyClass.contains("NetworkTopologyStrategy")) {
Map<String, Integer> dcRfs = new HashMap<String, Integer>();
for (Map.Entry<String, String> entry : replicationOptions.entrySet())
@@ -87,12 +82,11 @@ private SimpleStrategy(int replicationFactor) {
Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
for (int i = 0; i < ring.size(); i++) {
- //handle consecutive sections in the ring assigned to the same host
- Set<Host> replicas = new LinkedHashSet<Host>();
- //we stop when reached the desired RF or ran out of nodes
+ // Consecutive sections of the ring can assigned to the same host
+ Set<Host> replicas = new LinkedHashSet<Host>();
for (int j = 0; j < ring.size() && replicas.size() < rf; j++)
replicas.add(tokenToPrimary.get(getTokenWrapping(i+j, ring)));
- replicaMap.put(ring.get(i), ImmutableSet.<Host>builder().addAll(replicas).build());
+ replicaMap.put(ring.get(i), ImmutableSet.copyOf(replicas));
}
return replicaMap;
}
@@ -108,87 +102,80 @@ private NetworkTopologyStrategy(Map<String, Integer> replicationFactors) {
Map<Token, Set<Host>> computeTokenToReplicaMap(Map<Token, Host> tokenToPrimary, List<Token> ring) {
- /*
- * This is essentially a copy of org.apache.cassandra.locator.NetworkTopologyStrategy
- */
-
- final Map<String, Set<String>> racks = getRacksInDcs(tokenToPrimary.values());
- final Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
+ // This is essentially a copy of org.apache.cassandra.locator.NetworkTopologyStrategy
+ Map<String, Set<String>> racks = getRacksInDcs(tokenToPrimary.values());
+ Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
for (int i = 0; i < ring.size(); i++) {
Map<String, Set<Host>> allDcReplicas = new HashMap<String, Set<Host>>();
Map<String, Set<String>> seenRacks = new HashMap<String, Set<String>>();
Map<String, Set<Host>> skippedDcEndpoints = new HashMap<String, Set<Host>>();
for (String dc : replicationFactors.keySet()) {
- allDcReplicas.put(dc, new HashSet<Host>());
- seenRacks.put(dc, new HashSet<String>());
- //preserve order
- skippedDcEndpoints.put(dc, new LinkedHashSet<Host>());
+ allDcReplicas.put(dc, new HashSet<Host>());
+ seenRacks.put(dc, new HashSet<String>());
+ skippedDcEndpoints.put(dc, new LinkedHashSet<Host>()); // preserve order
}
-
- //preserve order - primary replica will be first
+
+ // Preserve order - primary replica will be first
Set<Host> replicas = new LinkedHashSet<Host>();
- //we stop the inner iteration if all DCs have enough replicas or we reach the end of the ring
for (int j = 0; j < ring.size() && !allDone(allDcReplicas); j++) {
Host h = tokenToPrimary.get(getTokenWrapping(i + j, ring));
String dc = h.getDatacenter();
if (dc == null)
continue;
Integer rf = replicationFactors.get(dc);
- Set<Host> dcReplicas = allDcReplicas.get(dc);
+ Set<Host> dcReplicas = allDcReplicas.get(dc);
if (dcReplicas.size() >= rf)
continue;
- //check if we already visited all racks in dc
- if (seenRacks.get(dc).size() == racks.get(dc).size()) {
- replicas.add(h);
- dcReplicas.add(h);
+ String rack = h.getRack();
+ // Check if we already visited all racks in dc
+ if (rack == null || seenRacks.get(dc).size() == racks.get(dc).size()) {
+ replicas.add(h);
+ dcReplicas.add(h);
} else {
- String rack = h.getRack();
- //is this a new rack?
- if (seenRacks.get(dc).contains(rack)) {
- skippedDcEndpoints.get(dc).add(h);
- } else {
- replicas.add(h);
- dcReplicas.add(h);
- seenRacks.get(dc).add(rack);
- //check if we have run out of all racks
- //if yes, add all those nodes that we skipped so far
- if (seenRacks.get(dc).size() == racks.get(dc).size()) {
- Iterator<Host> skippedIt = skippedDcEndpoints.get(dc).iterator();
- while (skippedIt.hasNext() && dcReplicas.size() < rf) {
- Host nextSkipped = skippedIt.next();
- replicas.add(nextSkipped);
- dcReplicas.add(nextSkipped);
- }
- }
- }
+ // Is this a new rack?
+ if (seenRacks.get(dc).contains(rack)) {
+ skippedDcEndpoints.get(dc).add(h);
+ } else {
+ replicas.add(h);
+ dcReplicas.add(h);
+ seenRacks.get(dc).add(rack);
+ // If we've run out of distinct racks, add the nodes skipped so far
+ if (seenRacks.get(dc).size() == racks.get(dc).size()) {
+ Iterator<Host> skippedIt = skippedDcEndpoints.get(dc).iterator();
+ while (skippedIt.hasNext() && dcReplicas.size() < rf) {
+ Host nextSkipped = skippedIt.next();
+ replicas.add(nextSkipped);
+ dcReplicas.add(nextSkipped);
+ }
+ }
+ }
}
}
- replicaMap.put(ring.get(i), ImmutableSet.<Host>builder().addAll(replicas).build());
+ replicaMap.put(ring.get(i), ImmutableSet.copyOf(replicas));
}
return replicaMap;
}
- private boolean allDone(Map<String, Set<Host>> map)
- {
+ private boolean allDone(Map<String, Set<Host>> map) {
for (Map.Entry<String, Set<Host>> entry : map.entrySet())
if (entry.getValue().size() < replicationFactors.get(entry.getKey()))
return false;
return true;
}
-
+
private Map<String, Set<String>> getRacksInDcs(Iterable<Host> hosts) {
- Map<String, Set<String>> result = new HashMap<String, Set<String>>();
- for (Host host : hosts) {
- Set<String> racks = result.get(host.getDatacenter());
- if (racks == null) {
- racks = new HashSet<String>();
- result.put(host.getDatacenter(), racks);
- }
- racks.add(host.getRack());
- }
- return result;
+ Map<String, Set<String>> result = new HashMap<String, Set<String>>();
+ for (Host host : hosts) {
+ Set<String> racks = result.get(host.getDatacenter());
+ if (racks == null) {
+ racks = new HashSet<String>();
+ result.put(host.getDatacenter(), racks);
+ }
+ racks.add(host.getRack());
+ }
+ return result;
}
}
}
Oops, something went wrong.

0 comments on commit 7ad7bb9

Please sign in to comment.