From e510c4c8cb3fff57701ec2d0d1d4bffa69d298bf Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Mon, 17 Sep 2018 15:13:05 +0200 Subject: [PATCH 01/13] Consolidate batch write code Patch by Alex Petrov; reviewed by TBD for CASSANDRA-14742 --- .../cassandra/locator/ReplicaPlans.java | 63 ++++++++++++++++--- .../cassandra/service/StorageProxy.java | 52 ++++----------- 2 files changed, 68 insertions(+), 47 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 3d56a73c6da5..c640c936266a 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -21,7 +21,13 @@ import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -31,12 +37,17 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; @@ -173,26 +184,62 @@ public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keysp return forSingleReplicaWrite(keyspace, token, replica); } + public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() + { + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); + + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( + EndpointsForToken.of(token, localSystemReplica), + EndpointsForToken.empty(token) + ); + + return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); + } + /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. - * - * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO. - * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear. 
 */
-    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
         // A single case we write not for range or token, but multiple mutations to many tokens
         Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
 
+        TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
+        Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+
+        // Replicas are picked manually:
+        //  - replicas should be alive according to the failure detector
+        //  - replicas should be in the local datacenter
+        //  - choose min(2, number of qualifying candiates above)
+        //  - allow the local node to be the only replica only if it's a single-node DC
+        Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+
+        if (chosenEndpoints.isEmpty())
+        {
+            if (consistencyLevel == ConsistencyLevel.ANY)
+                chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+            else
+                throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0);
+        }
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
-                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
         );
-        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // Batchlog is hosted by either one node or two nodes from different racks.
+        consistencyLevel = liveAndDown.all().size() == 1 ?
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
 
         // assume that we have already been given live endpoints, and skip applying the failure detector
-        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+        ReplicaPlan.ForTokenWrite result = forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+        result.assureSufficientReplicas();
+        return result;
     }
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c6315ff4d3fb..c8a28e941fe3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -780,8 +780,9 @@ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
         ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
         //Since the base -> view replication is 1:1 we only need to store the BL locally
-        final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
-        BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite();
+        BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                                                      () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
 
         // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
         for (Mutation mutation : mutations)
@@ -920,10 +921,11 @@ public static void mutateAtomically(Collection<Mutation> mutations,
             batchConsistencyLevel = consistency_level;
         }
 
-        final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogReplicas(localDataCenter, batchConsistencyLevel);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(localDataCenter, batchConsistencyLevel);
+
         final UUID batchUUID = UUIDGen.getTimeUUID();
         BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                      () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+                                                                                                      () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID));
 
         // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
         for (Mutation mutation : mutations)
@@ -939,7 +941,7 @@ public static void mutateAtomically(Collection<Mutation> mutations,
         }
 
         // write to the batchlog
-        syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
+        syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime);
 
         // now actually perform the writes and wait for them to complete
         syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
@@ -998,11 +1000,9 @@ private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations,
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
-        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
WriteResponseHandler handler = new WriteResponseHandler(replicaPlan, WriteType.BATCH_LOG, queryStartNanoTime); @@ -1021,18 +1021,18 @@ private static void syncWriteToBatchlog(Collection mutations, Collecti handler.get(); } - private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) + private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPlan, UUID uuid) { MessageOut message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); - for (InetAddressAndPort target : endpoints) + for (Replica target : replicaPlan.contacts()) { if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); - if (target.equals(FBUtilities.getBroadcastAddressAndPort())) - performLocally(Stage.MUTATION, SystemReplicas.getSystemReplica(target), () -> BatchlogManager.remove(uuid)); + if (target.isLocal()) + performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid)); else - MessagingService.instance().sendOneWay(message, target); + MessagingService.instance().sendOneWay(message, target.endpoint()); } } @@ -1162,32 +1162,6 @@ private static class WriteResponseHandlerWrapper } } - /* - * Replicas are picked manually: - * - replicas should be alive according to the failure detector - * - replicas should be in the local datacenter - * - choose min(2, number of qualifying candiates above) - * - allow the local node to be the only replica only if it's a single-node DC - */ - private static Collection getBatchlogReplicas(String localDataCenter, ConsistencyLevel consistencyLevel) - throws UnavailableException - { - TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); - String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); - - Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); - if (chosenEndpoints.isEmpty()) - { - if (consistencyLevel == ConsistencyLevel.ANY) - return Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - - throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); - } - - return chosenEndpoints; - } - /** * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node * is not available. 
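A note for readers before patch 02: the placement rule that patch 01 consolidates into ReplicaPlans.forBatchlogWrite can be read independently of the Cassandra internals. Below is a minimal, self-contained Java sketch of the same selection policy; every name in it (BatchlogPlacementSketch, pickBatchlogTargets, liveEndpointsByRack, and so on) is an illustrative stand-in, not the patch's API, and the logic is deliberately simplified relative to EndpointFilter.filter(), which additionally drops the local rack whenever two other racks' worth of candidates exist and injects its randomness for testability.

    import java.util.*;

    public class BatchlogPlacementSketch
    {
        // Mirrors the rules documented in the patch: pick min(2, qualifying candidates) from the
        // local DC, preferring two distinct racks; the coordinator itself qualifies only as the
        // last resort, and only when the batch consistency level is ANY.
        static List<String> pickBatchlogTargets(Map<String, List<String>> liveEndpointsByRack,
                                                String self, boolean consistencyIsAny)
        {
            List<String[]> candidates = new ArrayList<>(); // (rack, endpoint) pairs, minus ourselves
            for (Map.Entry<String, List<String>> rack : liveEndpointsByRack.entrySet())
                for (String endpoint : rack.getValue())
                    if (!endpoint.equals(self))
                        candidates.add(new String[]{ rack.getKey(), endpoint });

            if (candidates.isEmpty())
                // the patch throws UnavailableException at this point unless the CL is ANY
                return consistencyIsAny ? Collections.singletonList(self) : Collections.emptyList();

            Collections.shuffle(candidates);
            List<String> picked = new ArrayList<>(2);
            Set<String> racksUsed = new HashSet<>();
            for (String[] c : candidates)          // first pass: at most one endpoint per rack
                if (picked.size() < 2 && racksUsed.add(c[0]))
                    picked.add(c[1]);
            for (String[] c : candidates)          // second pass: top up when only one rack qualifies
                if (picked.size() < 2 && !picked.contains(c[1]))
                    picked.add(c[1]);
            return picked;
        }

        public static void main(String[] args)
        {
            Map<String, List<String>> dc = Map.of("rack1", List.of("10.0.0.1", "10.0.0.2"),
                                                  "rack2", List.of("10.0.0.3"));
            // e.g. [10.0.0.2, 10.0.0.3] -- two targets from two racks
            System.out.println(pickBatchlogTargets(dc, "10.0.0.1", false));
        }
    }

The consistency level the patch derives then follows directly from the result: ONE when a single target was picked, TWO otherwise, which is why patch 01 can also fold the ad-hoc CL handling out of StorageProxy.
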
From 6837f1b0f24bb8a1ef00b21c5ffa3f1eebcaa1b3 Mon Sep 17 00:00:00 2001
From: Alex Petrov
Date: Fri, 21 Sep 2018 16:29:09 +0200
Subject: [PATCH 02/13] Move endpoint filter to ReplicaPlans

---
 .../cassandra/batchlog/BatchlogManager.java   | 89 -------------------
 .../cassandra/locator/ReplicaPlans.java       | 76 +++++++++++++++-
 .../batchlog/BatchlogEndpointFilterTest.java  | 65 ++++++--------
 3 files changed, 100 insertions(+), 130 deletions(-)

diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 77f725ce3596..91129ede951f 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -527,93 +527,4 @@ public void response(MessageIn m)
             }
         }
     }
-
-    public static class EndpointFilter
-    {
-        private final String localRack;
-        private final Multimap<String, InetAddressAndPort> endpoints;
-
-        public EndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
-        {
-            this.localRack = localRack;
-            this.endpoints = endpoints;
-        }
-
-        /**
-         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
-         */
-        public Collection<InetAddressAndPort> filter()
-        {
-            // special case for single-node data centers
-            if (endpoints.values().size() == 1)
-                return endpoints.values();
-
-            // strip out dead endpoints and localhost
-            ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
-            for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
-                if (isValid(entry.getValue()))
-                    validated.put(entry.getKey(), entry.getValue());
-
-            if (validated.size() <= 2)
-                return validated.values();
-
-            if (validated.size() - validated.get(localRack).size() >= 2)
-            {
-                // we have enough endpoints in other racks
-                validated.removeAll(localRack);
-            }
-
-            if (validated.keySet().size() == 1)
-            {
-                /*
-                 * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
-                 * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
-                 * because of the preceding if block.
- */ - List otherRack = Lists.newArrayList(validated.values()); - shuffle(otherRack); - return otherRack.subList(0, 2); - } - - // randomize which racks we pick from if more than 2 remaining - Collection racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - shuffle((List) racks); - } - - // grab a random member of up to two racks - List result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List rackMembers = validated.get(rack); - result.add(rackMembers.get(getRandomInt(rackMembers.size()))); - } - - return result; - } - - @VisibleForTesting - protected boolean isValid(InetAddressAndPort input) - { - return !input.equals(FBUtilities.getBroadcastAddressAndPort()) && FailureDetector.instance.isAlive(input); - } - - @VisibleForTesting - protected int getRandomInt(int bound) - { - return ThreadLocalRandom.current().nextInt(bound); - } - - @VisibleForTesting - protected void shuffle(List list) - { - Collections.shuffle(list); - } - } } diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index c640c936266a..db37aac5c842 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -216,7 +216,7 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, // - replicas should be in the local datacenter // - choose min(2, number of qualifying candiates above) // - allow the local node to be the only replica only if it's a single-node DC - Collection chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); + Collection chosenEndpoints = filterBatchlogEndpoints(localRack, localEndpoints); if (chosenEndpoints.isEmpty()) { @@ -242,6 +242,80 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, return result; } + private static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints) + { + return filterBatchlogEndpoints(localRack, + endpoints, + Collections::shuffle, + FailureDetector.isEndpointAlive, + ThreadLocalRandom.current()::nextInt); + } + + // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + public static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints, + Consumer> shuffle, + Predicate isAlive, + Function indexPicker) + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap validated = ArrayListMultimap.create(); + for (Map.Entry entry : endpoints.entries()) + { + InetAddressAndPort addr = entry.getValue(); + if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) + validated.put(entry.getKey(), entry.getValue()); + } + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + /* + * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) + * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack + * because of the preceding if block. 
+ */ + List otherRack = Lists.newArrayList(validated.values()); + shuffle.accept(otherRack); + return otherRack.subList(0, 2); + } + + // randomize which racks we pick from if more than 2 remaining + Collection racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + shuffle.accept((List) racks); + } + + // grab a random member of up to two racks + List result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List rackMembers = validated.get(rack); + result.add(rackMembers.get(indexPicker.apply(rackMembers.size()))); + } + + return result; + } + public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException { return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector); diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java index 41564d920256..0e3285c55e0b 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java @@ -28,9 +28,12 @@ import org.junit.matchers.JUnitMatchers; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public class BatchlogEndpointFilterTest { @@ -47,10 +50,10 @@ public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException .put("2", InetAddressAndPort.getByName("2")) .put("2", InetAddressAndPort.getByName("22")) .build(); - Collection result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection result = filterBatchlogEndpoints(endpoints); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11"))); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22"))); + assertTrue(result.contains(InetAddressAndPort.getByName("11"))); + assertTrue(result.contains(InetAddressAndPort.getByName("22"))); } @Test @@ -61,10 +64,10 @@ public void shouldSelectHostFromLocal() throws UnknownHostException .put(LOCAL, InetAddressAndPort.getByName("00")) .put("1", InetAddressAndPort.getByName("1")) .build(); - Collection result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection result = filterBatchlogEndpoints(endpoints); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1"))); - assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0"))); + assertTrue(result.contains(InetAddressAndPort.getByName("1"))); + assertTrue(result.contains(InetAddressAndPort.getByName("0"))); } @Test @@ -73,9 +76,9 @@ public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException Multimap endpoints = ImmutableMultimap. 
builder()
             .put(LOCAL, InetAddressAndPort.getByName("0"))
             .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         assertThat(result.size(), is(1));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("0")));
     }
 
     @Test
@@ -88,12 +91,12 @@ public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostEx
             .put("1", InetAddressAndPort.getByName("11"))
             .put("1", InetAddressAndPort.getByName("111"))
             .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("111")));
     }
 
     @Test
@@ -105,40 +108,22 @@ public void shouldSelectTwoRandomHostsFromSingleRack() throws UnknownHostExcepti
             .put(LOCAL, InetAddressAndPort.getByName("111"))
             .put(LOCAL, InetAddressAndPort.getByName("1111"))
             .build();
-        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("111")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("1111")));
     }
 
-    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+    private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints)
     {
-        TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
-        {
-            super(localRack, endpoints);
-        }
-
-        @Override
-        protected boolean isValid(InetAddressAndPort input)
-        {
-            // We will use always alive non-localhost endpoints
-            return true;
-        }
-
-        @Override
-        protected int getRandomInt(int bound)
-        {
-            // We don't need random behavior here
-            return bound - 1;
-        }
-
-        @Override
-        protected void shuffle(List<?> list)
-        {
-            // We don't need random behavior here
-            Collections.reverse(list);
-        }
+        return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints,
+                                                    // Do not shuffle
+                                                    coll -> {},
+                                                    // Always alive
+                                                    (addr) -> true,
+                                                    // Always pick the last
+                                                    (size) -> size - 1);
     }
 }

From 1eeea3637c64d62012ca86ee11bdfcf3d6dd38bc Mon Sep 17 00:00:00 2001
From: Alex Petrov
Date: Fri, 21 Sep 2018 16:29:20 +0200
Subject: [PATCH 03/13] Formatting

---
 src/java/org/apache/cassandra/locator/ReplicaPlans.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index db37aac5c842..7c829bedeb7e 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -191,8 +191,8 @@ public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite()
         Replica localSystemReplica = 
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - EndpointsForToken.of(token, localSystemReplica), - EndpointsForToken.empty(token) + EndpointsForToken.of(token, localSystemReplica), + EndpointsForToken.empty(token) ); return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); From a6b41308030913daff57d345590c4fbd8a9d29f5 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:29:53 +0200 Subject: [PATCH 04/13] Use boolean instead of consistency level --- src/java/org/apache/cassandra/locator/ReplicaPlans.java | 6 ++++-- src/java/org/apache/cassandra/service/StorageProxy.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 7c829bedeb7e..ee3fb4d8bb10 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -201,8 +201,10 @@ public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() /** * Requires that the provided endpoints are alive. Converts them to their relevant system replicas. * Note that the liveAndDown collection and live are equal to the provided endpoints. + * + * @param isAny if batch consistency level is ANY, in which case a local node will be picked */ - public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException + public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, boolean isAny) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); @@ -232,7 +234,7 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, ); // Batchlog is hosted by either one node or two nodes from different racks. - consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO; + ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO; Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c8a28e941fe3..201ea85b33bc 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -921,7 +921,7 @@ public static void mutateAtomically(Collection mutations, batchConsistencyLevel = consistency_level; } - ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(localDataCenter, batchConsistencyLevel); + ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(localDataCenter, batchConsistencyLevel == ConsistencyLevel.ANY); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), From 93eddd2548dc45032c2056d9b0109e88f2a54b25 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:30:32 +0200 Subject: [PATCH 05/13] Remove short-circuit --- .../apache/cassandra/locator/ReplicaPlans.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index ee3fb4d8bb10..7b3f97b12309 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -44,11 +45,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; @@ -220,13 +226,8 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, // - allow the local node to be the only replica only if it's a single-node DC Collection chosenEndpoints = filterBatchlogEndpoints(localRack, localEndpoints); - if (chosenEndpoints.isEmpty()) - { - if (consistencyLevel == ConsistencyLevel.ANY) - chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - else - throw UnavailableException.create(ConsistencyLevel.ONE, 1, 0); - } + if (chosenEndpoints.isEmpty() && isAny) + chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), From 82d735b1b7de02557bac55ba5a040b5746c2f69b Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:30:53 +0200 Subject: [PATCH 06/13] Use static imports --- .../org/apache/cassandra/service/StorageProxy.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java index 201ea85b33bc..239f439ad093 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -80,6 +80,8 @@ import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.AbstractIterator; +import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; + public class StorageProxy implements StorageProxyMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; @@ -781,8 +783,8 @@ public static void mutateMV(ByteBuffer dataKey, Collection mutations, //Since the base -> view replication is 1:1 we only need to store the BL locally ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forLocalBatchlogWrite(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); + BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -924,8 +926,8 @@ public static void mutateAtomically(Collection mutations, ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(localDataCenter, batchConsistencyLevel == ConsistencyLevel.ANY); final UUID batchUUID = UUIDGen.getTimeUUID(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); + BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(replicaPlan, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) From 0948c87c8377fd3acb976d14bf379ff0cab25860 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:31:13 +0200 Subject: [PATCH 07/13] Rename isLocal to isSelf --- .../apache/cassandra/db/view/ViewUtils.java | 4 ++-- .../dht/RangeFetchMapCalculator.java | 2 +- .../apache/cassandra/dht/RangeStreamer.java | 8 ++++---- .../org/apache/cassandra/locator/Replica.java | 2 +- .../cassandra/service/StorageProxy.java | 20 +++++++++---------- .../service/reads/AbstractReadExecutor.java | 2 +- .../reads/ShortReadPartitionsProtection.java | 2 +- .../cassandra/streaming/StreamPlan.java | 4 ++-- .../cassandra/streaming/StreamSession.java | 4 ++-- 9 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index ad10d9dcdf2b..9b7f6aa54a1a 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -67,7 +67,7 @@ public static Optional getViewNaturalEndpoint(String keyspaceName, Toke EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken); EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken); - Optional localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil(); + Optional localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isSelf).toJavaUtil(); if (localReplica.isPresent()) return localReplica; @@ -93,7 +93,7 @@ public static Optional getViewNaturalEndpoint(String keyspaceName, Toke int 
baseIdx = -1;
         for (int i=0; i < baseReplicas.size(); i++)
         {
-            if (baseReplicas.get(i).isLocal())
+            if (baseReplicas.get(i).isSelf())
             {
                 baseIdx = i;
                 break;
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ ... @@ private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph
         {
             sourceFound = true;
             // if we pass filters, it means that we don't filter away localhost and we can count it as a source:
-            if (replica.isLocal())
+            if (replica.isSelf())
                 continue; // but don't add localhost to the graph to avoid streaming locally
             final Vertex endpointVertex = new EndpointVertex(replica.endpoint());
             capacityGraph.insertVertex(rangeVertex);
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index f46d665704d9..b50a4e282e43 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -104,7 +104,7 @@ public FetchReplica(Replica local, Replica remote)
         {
             Preconditions.checkNotNull(local);
             Preconditions.checkNotNull(remote);
-            assert local.isLocal() && !remote.isLocal();
+            assert local.isSelf() && !remote.isSelf();
             this.local = local;
             this.remote = remote;
         }
@@ -203,7 +203,7 @@ public static class ExcludeLocalNodeFilter implements SourceFilter
         @Override
         public boolean apply(Replica replica)
         {
-            return !replica.isLocal();
+            return !replica.isSelf();
         }
 
         @Override
@@ -553,8 +553,8 @@ public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpoin
         {
             for (Replica source : e.getValue())
             {
-                assert e.getKey().isLocal();
-                assert !source.isLocal();
+                assert (e.getKey()).isSelf();
+                assert !source.isSelf();
                 workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
             }
         }
diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java
index 37b6050b8e00..c884f136f51e 100644
--- a/src/java/org/apache/cassandra/locator/Replica.java
+++ b/src/java/org/apache/cassandra/locator/Replica.java
@@ -100,7 +100,7 @@ public final InetAddressAndPort endpoint()
         return endpoint;
     }
 
-    public boolean isLocal()
+    public boolean isSelf()
     {
         return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
     }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 239f439ad093..7ed2de7f9428 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -448,7 +448,7 @@ private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPax
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (Replica replica: replicaPlan.contacts())
         {
-            if (replica.isLocal())
+            if (replica.isSelf())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PREPARE)).execute(new Runnable()
                 {
@@ -486,7 +486,7 @@ private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite r
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
         for (Replica replica : replicaPlan.contacts())
         {
-            if (replica.isLocal())
+            if (replica.isSelf())
             {
                 StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_PROPOSE)).execute(new Runnable()
                 {
@@ -551,7 +551,7 @@ private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLev
     {
         if (shouldBlock)
         {
-            if (replica.isLocal())
+            if (replica.isSelf())
                 commitPaxosLocal(replica, message, responseHandler);
             else
                 MessagingService.instance().sendWriteRR(message, replica, responseHandler, allowHints && shouldHint(replica));
@@ -809,7 +809,7 @@ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
             // When local node is the endpoint we can just 
apply the mutation locally, // unless there are pending endpoints, in which case we want to do an ordinary // write so the view mutation is sent to the pending endpoint - if (pairedEndpoint.get().isLocal() && StorageService.instance.isJoined() + if (pairedEndpoint.get().isSelf() && StorageService.instance.isJoined() && pendingReplicas.isEmpty()) { try @@ -1015,7 +1015,7 @@ private static void syncWriteToBatchlog(Collection mutations, ReplicaP { logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); - if (replica.isLocal()) + if (replica.isSelf()) performLocally(Stage.MUTATION, replica, Optional.empty(), () -> BatchlogManager.store(batch), handler); else MessagingService.instance().sendRR(message, replica.endpoint(), handler); @@ -1031,7 +1031,7 @@ private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPla if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); - if (target.isLocal()) + if (target.isSelf()) performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid)); else MessagingService.instance().sendOneWay(message, target.endpoint()); @@ -1207,7 +1207,7 @@ public static void sendToHintedReplicas(final Mutation mutation, if (plan.isAlive(destination)) { - if (destination.isLocal()) + if (destination.isSelf()) { insertLocal = true; localReplica = destination; @@ -1400,7 +1400,7 @@ public static AbstractWriteResponseHandler mutateCounter(CounterMutat { Replica replica = findSuitableReplica(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); - if (replica.isLocal()) + if (replica.isSelf()) { return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); } @@ -2072,7 +2072,7 @@ private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean command.trackRepairedStatus(); } - if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal()) + if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf()) { StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler)); } @@ -2342,7 +2342,7 @@ public static boolean shouldHint(Replica replica) { if (!DatabaseDescriptor.hintedHandoffEnabled()) return false; - if (replica.isTransient() || replica.isLocal()) + if (replica.isTransient() || replica.isSelf()) return false; Set disabledDCs = DatabaseDescriptor.hintedHandoffDisabledDCs(); diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 6881a2fed6b6..8d0f14c8d470 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -138,7 +138,7 @@ private void makeRequests(ReadCommand readCommand, ReplicaCollection replicas for (Replica replica: replicas) { InetAddressAndPort endpoint = replica.endpoint(); - if (replica.isLocal()) + if (replica.isSelf()) { hasLocalEndpoint = true; continue; diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index b16d10578475..2e4440fd4e4f 100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@ -181,7 +181,7 @@ UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
ReplicaPlan.Shar DataResolver resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair)NoopReadRepair.instance, queryStartNanoTime); ReadCallback handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); - if (source.isLocal()) + if (source.isSelf()) StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); else MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source.endpoint(), handler); diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index ea54f9d0d2d7..3fcabd091b4e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -104,8 +104,8 @@ public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Ranges public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies) { //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node - assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); - assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); + assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); + assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); StreamSession session = coordinator.getOrCreateNextSession(from); session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies)); diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 80fcebba7221..08a1b078027a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -311,8 +311,8 @@ public void start() public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection columnFamilies) { //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node - assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); - assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); + assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString(); + assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString(); requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies)); } From ebc73d696091e4f8347022b43bb47d304d7f9ee2 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:33:09 +0200 Subject: [PATCH 08/13] Fix tests --- .../apache/cassandra/batchlog/BatchlogEndpointFilterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java index 0e3285c55e0b..c2b9fc9a8bed 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java +++ 
b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java @@ -119,8 +119,8 @@ public void shouldSelectTwoRandomHostsFromSingleRack() throws UnknownHostExcepti private Collection filterBatchlogEndpoints(Multimap endpoints) { return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints, - // Do not shuffle - coll -> {}, + // Reverse instead of shuffle + Collections::reverse, // Always alive (addr) -> true, // Always pick the last From a4a99b2a53f6edb8070816bd5af1639aa7be23e6 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 21 Sep 2018 16:35:35 +0200 Subject: [PATCH 09/13] Move filter batchlog methods to the bottom of the class --- .../cassandra/locator/ReplicaPlans.java | 148 +++++++++--------- 1 file changed, 74 insertions(+), 74 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 7b3f97b12309..a64ea484a64c 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -245,80 +245,6 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, return result; } - private static Collection filterBatchlogEndpoints(String localRack, - Multimap endpoints) - { - return filterBatchlogEndpoints(localRack, - endpoints, - Collections::shuffle, - FailureDetector.isEndpointAlive, - ThreadLocalRandom.current()::nextInt); - } - - // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. - public static Collection filterBatchlogEndpoints(String localRack, - Multimap endpoints, - Consumer> shuffle, - Predicate isAlive, - Function indexPicker) - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap validated = ArrayListMultimap.create(); - for (Map.Entry entry : endpoints.entries()) - { - InetAddressAndPort addr = entry.getValue(); - if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) - validated.put(entry.getKey(), entry.getValue()); - } - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - /* - * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) - * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack - * because of the preceding if block. 
- */ - List otherRack = Lists.newArrayList(validated.values()); - shuffle.accept(otherRack); - return otherRack.subList(0, 2); - } - - // randomize which racks we pick from if more than 2 remaining - Collection racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - shuffle.accept((List) racks); - } - - // grab a random member of up to two racks - List result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List rackMembers = validated.get(rack); - result.add(rackMembers.get(indexPicker.apply(rackMembers.size()))); - } - - return result; - } - public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException { return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector); @@ -559,4 +485,78 @@ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, Consistency // If we get there, merge this range and the next one return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts); } + + private static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints) + { + return filterBatchlogEndpoints(localRack, + endpoints, + Collections::shuffle, + FailureDetector.isEndpointAlive, + ThreadLocalRandom.current()::nextInt); + } + + // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. + public static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints, + Consumer> shuffle, + Predicate isAlive, + Function indexPicker) + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap validated = ArrayListMultimap.create(); + for (Map.Entry entry : endpoints.entries()) + { + InetAddressAndPort addr = entry.getValue(); + if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) + validated.put(entry.getKey(), entry.getValue()); + } + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + /* + * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) + * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack + * because of the preceding if block. 
+ */ + List otherRack = Lists.newArrayList(validated.values()); + shuffle.accept(otherRack); + return otherRack.subList(0, 2); + } + + // randomize which racks we pick from if more than 2 remaining + Collection racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + shuffle.accept((List) racks); + } + + // grab a random member of up to two racks + List result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List rackMembers = validated.get(rack); + result.add(rackMembers.get(indexPicker.apply(rackMembers.size()))); + } + + return result; + } } From cf6f8329d2754da42867f2d43adfda458550292a Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Mon, 24 Sep 2018 22:16:22 +0200 Subject: [PATCH 10/13] Enable dtests --- .circleci/config.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5a84f724fcf8..76a2c9f84178 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + #<<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + #<<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: From c1520ac3a3854becad88b5a7266b815cf8dc253f Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 25 Sep 2018 10:13:11 +0200 Subject: [PATCH 11/13] Fix dtest --- src/java/org/apache/cassandra/locator/SystemReplicas.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java index 0d1fc8d56a99..3009f6376640 100644 --- a/src/java/org/apache/cassandra/locator/SystemReplicas.java +++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java @@ -51,6 +51,9 @@ public static Replica getSystemReplica(InetAddressAndPort endpoint) public static EndpointsForRange getSystemReplicas(Collection endpoints) { + if (endpoints.isEmpty()) + return EndpointsForRange.empty(FULL_RANGE); + return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica)); } } From 5b36b5402a0917002108b5953a96f17e9e34b92f Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 26 Sep 2018 16:58:48 +0200 Subject: [PATCH 12/13] Move filterBatchlogEndpoint methods closer to other batch methods, avoid passing a global variable --- .../cassandra/locator/ReplicaPlans.java | 164 +++++++++--------- .../cassandra/service/StorageProxy.java | 9 +- 2 files changed, 87 insertions(+), 86 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index a64ea484a64c..732867b59fdb 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import 
org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -42,6 +41,8 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.utils.FBUtilities; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; @@ -210,21 +210,21 @@ public static ReplicaPlan.ForTokenWrite forLocalBatchlogWrite() * * @param isAny if batch consistency level is ANY, in which case a local node will be picked */ - public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, boolean isAny) throws UnavailableException + public static ReplicaPlan.ForTokenWrite forBatchlogWrite(boolean isAny) throws UnavailableException { // A single case we write not for range or token, but multiple mutations to many tokens Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); - String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); - + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + Multimap localEndpoints = HashMultimap.create(topology.getDatacenterRacks() + .get(snitch.getLocalDatacenter())); // Replicas are picked manually: // - replicas should be alive according to the failure detector // - replicas should be in the local datacenter // - choose min(2, number of qualifying candiates above) // - allow the local node to be the only replica only if it's a single-node DC - Collection chosenEndpoints = filterBatchlogEndpoints(localRack, localEndpoints); + Collection chosenEndpoints = filterBatchlogEndpoints(snitch.getLocalRack(), localEndpoints); if (chosenEndpoints.isEmpty() && isAny) chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); @@ -240,8 +240,81 @@ public static ReplicaPlan.ForTokenWrite forBatchlogWrite(String localDataCenter, Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); // assume that we have already been given live endpoints, and skip applying the failure detector - ReplicaPlan.ForTokenWrite result = forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll); - result.assureSufficientReplicas(); + return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll); + } + + private static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints) + { + return filterBatchlogEndpoints(localRack, + endpoints, + Collections::shuffle, + FailureDetector.isEndpointAlive, + ThreadLocalRandom.current()::nextInt); + } + + // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
+ @VisibleForTesting + public static Collection filterBatchlogEndpoints(String localRack, + Multimap endpoints, + Consumer> shuffle, + Predicate isAlive, + Function indexPicker) + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap validated = ArrayListMultimap.create(); + for (Map.Entry entry : endpoints.entries()) + { + InetAddressAndPort addr = entry.getValue(); + if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) + validated.put(entry.getKey(), entry.getValue()); + } + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + /* + * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) + * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack + * because of the preceding if block. + */ + List otherRack = Lists.newArrayList(validated.values()); + shuffle.accept(otherRack); + return otherRack.subList(0, 2); + } + + // randomize which racks we pick from if more than 2 remaining + Collection racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + shuffle.accept((List) racks); + } + + // grab a random member of up to two racks + List result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List rackMembers = validated.get(rack); + result.add(rackMembers.get(indexPicker.apply(rackMembers.size()))); + } + return result; } @@ -486,77 +559,4 @@ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, Consistency return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts); } - private static Collection filterBatchlogEndpoints(String localRack, - Multimap endpoints) - { - return filterBatchlogEndpoints(localRack, - endpoints, - Collections::shuffle, - FailureDetector.isEndpointAlive, - ThreadLocalRandom.current()::nextInt); - } - - // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks. - public static Collection filterBatchlogEndpoints(String localRack, - Multimap endpoints, - Consumer> shuffle, - Predicate isAlive, - Function indexPicker) - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap validated = ArrayListMultimap.create(); - for (Map.Entry entry : endpoints.entries()) - { - InetAddressAndPort addr = entry.getValue(); - if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) - validated.put(entry.getKey(), entry.getValue()); - } - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - /* - * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack) - * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack - * because of the preceding if block. 
@@ -486,77 +559,4 @@ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel
         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
     }
 
-    private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
-                                                                          Multimap<String, InetAddressAndPort> endpoints)
-    {
-        return filterBatchlogEndpoints(localRack,
-                                       endpoints,
-                                       Collections::shuffle,
-                                       FailureDetector.isEndpointAlive,
-                                       ThreadLocalRandom.current()::nextInt);
-    }
-
-    // Collect a list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
-    public static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack,
-                                                                         Multimap<String, InetAddressAndPort> endpoints,
-                                                                         Consumer<List<?>> shuffle,
-                                                                         Predicate<InetAddressAndPort> isAlive,
-                                                                         Function<Integer, Integer> indexPicker)
-    {
-        // special case for single-node data centers
-        if (endpoints.values().size() == 1)
-            return endpoints.values();
-
-        // strip out dead endpoints and localhost
-        ListMultimap<String, InetAddressAndPort> validated = ArrayListMultimap.create();
-        for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries())
-        {
-            InetAddressAndPort addr = entry.getValue();
-            if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr))
-                validated.put(entry.getKey(), entry.getValue());
-        }
-
-        if (validated.size() <= 2)
-            return validated.values();
-
-        if (validated.size() - validated.get(localRack).size() >= 2)
-        {
-            // we have enough endpoints in other racks
-            validated.removeAll(localRack);
-        }
-
-        if (validated.keySet().size() == 1)
-        {
-            /*
-             * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
-             * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
-             * because of the preceding if block.
-             */
-            List<InetAddressAndPort> otherRack = Lists.newArrayList(validated.values());
-            shuffle.accept(otherRack);
-            return otherRack.subList(0, 2);
-        }
-
-        // randomize which racks we pick from if more than 2 remaining
-        Collection<String> racks;
-        if (validated.keySet().size() == 2)
-        {
-            racks = validated.keySet();
-        }
-        else
-        {
-            racks = Lists.newArrayList(validated.keySet());
-            shuffle.accept((List<String>) racks);
-        }
-
-        // grab a random member of up to two racks
-        List<InetAddressAndPort> result = new ArrayList<>(2);
-        for (String rack : Iterables.limit(racks, 2))
-        {
-            List<InetAddressAndPort> rackMembers = validated.get(rack);
-            result.add(rackMembers.get(indexPicker.apply(rackMembers.size())));
-        }
-
-        return result;
-    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7ed2de7f9428..f05331af1f3c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -902,7 +902,6 @@ public static void mutateAtomically(Collection<Mutation> mutations,
         long startTime = System.nanoTime();
 
         List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
             throw new AssertionError("Logged batches are unsupported with transient replication");
@@ -923,7 +922,7 @@ public static void mutateAtomically(Collection<Mutation> mutations,
                 batchConsistencyLevel = consistency_level;
             }
 
-            ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(localDataCenter, batchConsistencyLevel == ConsistencyLevel.ANY);
+            ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogCleanup cleanup = new BatchlogCleanup(mutations.size(),
@@ -946,7 +945,7 @@ public static void mutateAtomically(Collection<Mutation> mutations,
             syncWriteToBatchlog(mutations, replicaPlan, batchUUID, queryStartNanoTime);
 
             // now actually perform the writes and wait for them to complete
-            syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
+            syncWriteBatchedMutations(wrappers, Stage.MUTATION);
         }
         catch (UnavailableException e)
         {
@@ -1056,9 +1055,11 @@ private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper>
         }
     }
 
-    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, Stage stage)
     throws WriteTimeoutException, OverloadedException
     {
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
+
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
             EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
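With the localDataCenter parameter gone, the batchlog plan above is derived from the batch consistency level alone. A condensed sketch of the call site (the helper name is illustrative, not from the patch):

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;

final class BatchlogPlanSketch
{
    // ANY is the only level that may fall back to hosting the batchlog on the
    // local node when no other candidate in the local DC qualifies
    static ReplicaPlan.ForTokenWrite planForBatch(ConsistencyLevel batchConsistencyLevel)
    {
        return ReplicaPlans.forBatchlogWrite(batchConsistencyLevel == ConsistencyLevel.ANY);
    }
}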
From 1f2ca955b666dc4cdca2972103f7dc69842ee6f6 Mon Sep 17 00:00:00 2001
From: Alex Petrov
Date: Wed, 26 Sep 2018 17:00:34 +0200
Subject: [PATCH 13/13] Add local datacenter and rack helpers

---
 .../cassandra/config/DatabaseDescriptor.java  |  2 +-
 .../db/CounterMutationVerbHandler.java        |  2 +-
 .../apache/cassandra/db/SystemKeyspace.java   |  4 ++--
 .../apache/cassandra/db/view/ViewUtils.java   |  2 +-
 .../org/apache/cassandra/dht/Datacenters.java |  2 +-
 .../cassandra/locator/EndpointSnitchInfo.java |  4 ++--
 .../cassandra/locator/IEndpointSnitch.java    | 22 +++++++++++++++++--
 .../cassandra/locator/InOurDcTester.java      |  2 +-
 .../cassandra/net/MessagingService.java       |  6 ++---
 .../cassandra/service/RangeRelocator.java     |  3 +--
 .../cassandra/service/StartupChecks.java      |  4 ++--
 .../cassandra/service/StorageProxy.java       |  4 ++--
 .../cassandra/service/StorageService.java     |  8 +++----
 13 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2ad9b181e849..dc76431b66cd 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -991,7 +991,7 @@ public static void applySnitch()
         snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
         EndpointSnitchInfo.create();
 
-        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        localDC = snitch.getLocalDatacenter();
         localComparator = (replica1, replica2) -> {
             boolean local1 = localDC.equals(snitch.getDatacenter(replica1));
             boolean local2 = localDC.equals(snitch.getDatacenter(replica2));
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 95d791618afd..c946ea595fef 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -37,7 +37,7 @@ public void doVerb(final MessageIn<CounterMutation> message, final int id)
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         // We should not wait for the result of the write in this thread,
         // otherwise we could have a distributed deadlock between replicas
         // running this VerbHandler (see #4578).
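Each of these call-site rewrites is mechanical; given the default methods added to IEndpointSnitch further down in this patch, the two forms resolve the same value. A minimal sketch (not part of the patch):

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.utils.FBUtilities;

final class LocalDcEquivalenceSketch
{
    static void check()
    {
        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
        // before: every caller spelled out the broadcast-address lookup
        String before = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
        // after: the default method performs the identical lookup internally
        String after = snitch.getLocalDatacenter();
        assert before.equals(after);
    }
}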
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0f904cea1ef9..1b3b2a68c882 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -460,8 +460,8 @@ public static void persistLocalMetadata()
                             FBUtilities.getReleaseVersionString(),
                             QueryProcessor.CQL_VERSION.toString(),
                             String.valueOf(ProtocolVersion.CURRENT.asInt()),
-                            snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()),
-                            snitch.getRack(FBUtilities.getBroadcastAddressAndPort()),
+                            snitch.getLocalDatacenter(),
+                            snitch.getLocalRack(),
                             DatabaseDescriptor.getPartitioner().getClass().getName(),
                             DatabaseDescriptor.getRpcAddress(),
                             DatabaseDescriptor.getNativeTransportPort(),
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 9b7f6aa54a1a..e824732a5e69 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -63,7 +63,7 @@ public static Optional<Replica> getViewNaturalEndpoint(String keyspaceName, Token
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
         EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
diff --git a/src/java/org/apache/cassandra/dht/Datacenters.java b/src/java/org/apache/cassandra/dht/Datacenters.java
index 26ae2e6970bb..9695a09cb35f 100644
--- a/src/java/org/apache/cassandra/dht/Datacenters.java
+++ b/src/java/org/apache/cassandra/dht/Datacenters.java
@@ -32,7 +32,7 @@ public class Datacenters
 
     private static class DCHandle
     {
-        private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        private static final String thisDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
     }
 
     public static String thisDatacenter()
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index c06d765568b7..da90a7932184 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -53,12 +53,12 @@ public String getRack(String host) throws UnknownHostException
 
     public String getDatacenter()
     {
-        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        return DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
     }
 
     public String getRack()
     {
-        return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+        return DatabaseDescriptor.getEndpointSnitch().getLocalRack();
     }
 
     public String getSnitchName()
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index b7797b0b1417..381a64225cd3 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -19,6 +19,8 @@
 
 import java.util.Set;
 
+import org.apache.cassandra.utils.FBUtilities;
+
 /**
  * This interface helps determine the location of a node in the datacenter relative to another node.
  * Given a node A and another node B it can tell if A and B are on the same rack or in the same
@@ -28,15 +30,31 @@ public interface IEndpointSnitch
 {
     /**
-     * returns a String representing the rack this endpoint belongs to
+     * returns a String representing the rack the given endpoint belongs to
      */
     public String getRack(InetAddressAndPort endpoint);
 
     /**
-     * returns a String representing the datacenter this endpoint belongs to
+     * returns a String representing the rack the current endpoint belongs to
+     */
+    default public String getLocalRack()
+    {
+        return getRack(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    /**
+     * returns a String representing the datacenter the given endpoint belongs to
      */
     public String getDatacenter(InetAddressAndPort endpoint);
 
+    /**
+     * returns a String representing the datacenter the current endpoint belongs to
+     */
+    default public String getLocalDatacenter()
+    {
+        return getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+    }
+
     default public String getDatacenter(Replica replica)
     {
         return getDatacenter(replica.endpoint());
diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java b/src/java/org/apache/cassandra/locator/InOurDcTester.java
index 23a8c1364050..514c7efdf4ba 100644
--- a/src/java/org/apache/cassandra/locator/InOurDcTester.java
+++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java
@@ -43,7 +43,7 @@ boolean stale()
                 // this final clause checks if somehow the snitch/localDc have got out of whack;
                 // presently, this is possible but very unlikely, but this check will also help
                 // resolve races on these global fields as well
-                || !dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
+                || !dc.equals(snitch.getLocalDatacenter());
     }
 
     private static final class ReplicaTester extends InOurDcTester implements Predicate<Replica>
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index bd290a1d950d..c6e8496d5b41 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1696,13 +1696,13 @@ public static boolean isEncryptedConnection(InetAddressAndPort address)
             case all:
                 break;
             case dc:
-                if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+                if (snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
                     return false;
                 break;
             case rack:
                 // for rack then check if the DC's are the same.
-                if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort()))
-                    && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
+                if (snitch.getRack(address).equals(snitch.getLocalRack())
+                    && snitch.getDatacenter(address).equals(snitch.getLocalDatacenter()))
                     return false;
                 break;
         }
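The MessagingService hunk is behaviour-preserving. Condensed into a self-contained restatement (the enum and method shape are assumptions for illustration; the real check lives in isEncryptedConnection and operates on the configured internode encryption setting):

final class EncryptionDecisionSketch
{
    enum InternodeEncryption { all, none, dc, rack }

    // mirrors the switch above: traffic that stays inside the local DC ("dc")
    // or inside the local DC and rack ("rack") is exempt from encryption
    static boolean isEncrypted(InternodeEncryption mode,
                               String remoteDc, String remoteRack,
                               String localDc, String localRack)
    {
        switch (mode)
        {
            case none: return false;
            case dc:   return !remoteDc.equals(localDc);
            case rack: return !(remoteDc.equals(localDc) && remoteRack.equals(localRack));
            default:   return true; // "all"
        }
    }
}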
diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java
index f2af3db8140b..839a34c11de8 100644
--- a/src/java/org/apache/cassandra/service/RangeRelocator.java
+++ b/src/java/org/apache/cassandra/service/RangeRelocator.java
@@ -185,8 +185,7 @@ public void calculateToFromStreams()
             //In the single node token move there is nothing to do and Range subtraction is broken
             //so it's easier to just identify this case up front.
-            if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-            )).size() > 1)
+            if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter()).size() > 1)
             {
                 // getting collection of the currently used ranges by this keyspace
                 RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 224fd5e7a679..881428197e51 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -437,7 +437,7 @@ public void execute() throws StartupException
             String storedDc = SystemKeyspace.getDatacenter();
             if (storedDc != null)
             {
-                String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+                String currentDc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
                 if (!storedDc.equals(currentDc))
                 {
                     String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " +
@@ -459,7 +459,7 @@ public void execute() throws StartupException
             String storedRack = SystemKeyspace.getRack();
             if (storedRack != null)
             {
-                String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
+                String currentRack = DatabaseDescriptor.getEndpointSnitch().getLocalRack();
                 if (!storedRack.equals(currentRack))
                 {
                     String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " +
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f05331af1f3c..b3adc4779f53 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -625,7 +625,7 @@ public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
         long startTime = System.nanoTime();
 
@@ -757,7 +757,7 @@ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
         long startTime = System.nanoTime();
 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 391598c379ce..caa732acb7f3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1060,8 +1060,8 @@ public static boolean isReplacingSameAddress()
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-        String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());
+        String dc = snitch.getLocalDatacenter();
+        String rack = snitch.getLocalRack();
         Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
         Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
     }
@@ -1835,7 +1835,7 @@ private List<Token> getTokensInLocalDC()
     private boolean isLocalDC(InetAddressAndPort targetHost)
     {
         String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
-        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         return remoteDC.equals(localDC);
     }
 
@@ -4073,7 +4073,7 @@ public void decommission(boolean force) throws InterruptedException
     {
         PendingRangeCalculatorService.instance.blockUntilFinished();
 
-        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
         if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
         {
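Taken together, the recurring "does this endpoint share my datacenter?" comparison seen in MessagingService, InOurDcTester, and StorageService.isLocalDC reduces to the sketch below (class and method names are made up):

import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;

final class LocalDcCheckSketch
{
    static boolean inLocalDc(IEndpointSnitch snitch, InetAddressAndPort endpoint)
    {
        // compare the endpoint's DC against the local DC resolved by the new helper
        return snitch.getDatacenter(endpoint).equals(snitch.getLocalDatacenter());
    }
}

InOurDcTester already encapsulates this predicate for replica streams; a follow-up could route the remaining ad-hoc comparisons through it.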