From 7627e4fd9219ddca9b2fd0ea06851e74e44396a1 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Wed, 19 Sep 2018 12:52:27 +0100 Subject: [PATCH 1/2] Transient Replication support for EACH_QUORUM, and correction of behaviour for LOCAL_QUORUM --- .../apache/cassandra/db/ConsistencyLevel.java | 18 ++- .../cassandra/locator/ReplicaPlans.java | 50 +++++--- .../apache/cassandra/locator/Replicas.java | 24 +++- .../locator/ReplicaCollectionTest.java | 42 +----- .../cassandra/locator/ReplicaLayoutTest.java | 2 +- .../cassandra/locator/ReplicaPlansTest.java | 120 ++++++++++++++++++ .../cassandra/locator/ReplicaUtils.java | 63 +++++++++ .../WriteResponseHandlerTransientTest.java | 12 +- 8 files changed, 264 insertions(+), 67 deletions(-) create mode 100644 test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 9e884a75983b..49739159d9ff 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -27,6 +27,7 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.transport.ProtocolException; +import static org.apache.cassandra.locator.Replicas.addToCountPerDc; import static org.apache.cassandra.locator.Replicas.countInOurDc; public enum ConsistencyLevel @@ -92,7 +93,12 @@ public static int localQuorumFor(Keyspace keyspace, String dc) : quorumFor(keyspace); } - public static ObjectIntOpenHashMap eachQuorumFor(Keyspace keyspace) + public static int localQuorumForOurDc(Keyspace keyspace) + { + return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); + } + + public static ObjectIntOpenHashMap eachQuorumForRead(Keyspace keyspace) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); ObjectIntOpenHashMap perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size()); @@ -101,6 +107,13 @@ public static ObjectIntOpenHashMap eachQuorumFor(Keyspace keyspace) return perDc; } + public static ObjectIntOpenHashMap eachQuorumForWrite(Keyspace keyspace, Endpoints pendingWithDown) + { + ObjectIntOpenHashMap perDc = eachQuorumForRead(keyspace); + addToCountPerDc(perDc, pendingWithDown, 1); + return perDc; + } + public int blockFor(Keyspace keyspace) { switch (this) @@ -121,7 +134,7 @@ public int blockFor(Keyspace keyspace) return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; case LOCAL_QUORUM: case LOCAL_SERIAL: - return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); + return localQuorumForOurDc(keyspace); case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { @@ -164,6 +177,7 @@ public int blockForWrite(Keyspace keyspace, Endpoints pending) /** * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace + * WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM) */ public boolean satisfies(ConsistencyLevel other, Keyspace keyspace) { diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 5551e5fb7355..a6fe53ff0018 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -19,6 +19,7 @@ package org.apache.cassandra.locator; import com.carrotsearch.hppc.ObjectIntOpenHashMap; +import com.carrotsearch.hppc.cursors.ObjectIntCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; @@ -57,9 +58,13 @@ import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; +import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM; -import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor; +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead; +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite; import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor; +import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc; +import static org.apache.cassandra.locator.Replicas.addToCountPerDc; import static org.apache.cassandra.locator.Replicas.countInOurDc; import static org.apache.cassandra.locator.Replicas.countPerDc; @@ -77,7 +82,7 @@ public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Consist case LOCAL_ONE: return countInOurDc(liveReplicas).hasAtleast(1, 1); case LOCAL_QUORUM: - return countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1); + return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1); case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { @@ -143,7 +148,7 @@ static void assureSufficientLiveReplicas(Keyspace keyspace, ConsistencyLevel con Collection dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters(); for (ObjectObjectCursor entry : countPerDc(dcs, allLive)) { - int dcBlockFor = ConsistencyLevel.localQuorumFor(keyspace, entry.key); + int dcBlockFor = localQuorumFor(keyspace, entry.key); Replicas.ReplicaCount dcCount = entry.value; if (!dcCount.hasAtleast(dcBlockFor, 0)) throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas()); @@ -340,7 +345,7 @@ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyL public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException { - EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live); + EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live); assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending()); return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); } @@ -348,20 +353,20 @@ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyL public interface Selector { , L extends ReplicaLayout.ForWrite> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live); + E select(Keyspace keyspace, L liveAndDown, L live); } /** * Select all nodes, transient or otherwise, as targets for the operation. * - * This is may no longer be useful until we finish implementing transient replication support, however + * This is may no longer be useful once we finish implementing transient replication support, however * it can be of value to stipulate that a location writes to all nodes without regard to transient status. */ public static final Selector writeAll = new Selector() { @Override public , L extends ReplicaLayout.ForWrite> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) + E select(Keyspace keyspace, L liveAndDown, L live) { return liveAndDown.all(); } @@ -380,22 +385,33 @@ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L { @Override public , L extends ReplicaLayout.ForWrite> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) + E select(Keyspace keyspace, L liveAndDown, L live) { if (!any(liveAndDown.all(), Replica::isTransient)) return liveAndDown.all(); - assert consistencyLevel != EACH_QUORUM; - ReplicaCollection.Builder contacts = liveAndDown.all().newBuilder(liveAndDown.all().size()); - contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull)); + contacts.addAll(filter(liveAndDown.natural(), Replica::isFull)); contacts.addAll(liveAndDown.pending()); - // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all) - int liveCount = contacts.count(live.all()::contains); - int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount; - if (requiredTransientCount > 0) - contacts.addAll(live.natural().filterLazily(Replica::isTransient, requiredTransientCount)); + /** + * Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every DC, + * regardless of how many responses we need to wait for and our requested consistencyLevel. + * This is to minimally surprise users with transient replication; with normal writes, we + * soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node; + * even if we don't wait for ACK, we have in both cases sent sufficient messages. + */ + ObjectIntOpenHashMap requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending()); + addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1); + addToCountPerDc(requiredPerDc, live.pending(), -1); + + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : filter(live.natural(), Replica::isTransient)) + { + String dc = snitch.getDatacenter(replica); + if (requiredPerDc.addTo(dc, -1) >= 0) + contacts.add(replica); + } return contacts.build(); } }; @@ -453,7 +469,7 @@ private static > E candidatesForRead(ConsistencyLevel con private static > E contactForEachQuorumRead(Keyspace keyspace, E candidates) { assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy; - ObjectIntOpenHashMap perDc = eachQuorumFor(keyspace); + ObjectIntOpenHashMap perDc = eachQuorumForRead(keyspace); final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); return candidates.filter(replica -> { diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java index 6c80134fe6ff..9e6048a41255 100644 --- a/src/java/org/apache/cassandra/locator/Replicas.java +++ b/src/java/org/apache/cassandra/locator/Replicas.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.function.Predicate; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; @@ -84,20 +85,37 @@ public static ReplicaCount countInOurDc(ReplicaCollection replicas) return count; } - public static ObjectObjectOpenHashMap countPerDc(Collection dataCenters, Iterable liveReplicas) + /** + * count the number of full and transient replicas, separately, for each DC + */ + public static ObjectObjectOpenHashMap countPerDc(Collection dataCenters, Iterable replicas) { ObjectObjectOpenHashMap perDc = new ObjectObjectOpenHashMap<>(dataCenters.size()); for (String dc: dataCenters) perDc.put(dc, new ReplicaCount()); - for (Replica replica : liveReplicas) + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : replicas) { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + String dc = snitch.getDatacenter(replica); perDc.get(dc).increment(replica); } return perDc; } + /** + * increment each of the map's DC entries for each matching replica provided + */ + public static void addToCountPerDc(ObjectIntOpenHashMap perDc, Iterable replicas, int add) + { + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : replicas) + { + String dc = snitch.getDatacenter(replica); + perDc.addTo(dc, add); + } + } + /** * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future */ diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java index ced49e2fa4b5..e2d479736bc3 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -46,51 +46,11 @@ import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.locator.ReplicaUtils.*; public class ReplicaCollectionTest { - static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP; - static final Range R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE; - static final List ALL_EP; - static final List> ALL_R; - static - { - try - { - EP1 = InetAddressAndPort.getByName("127.0.0.1"); - EP2 = InetAddressAndPort.getByName("127.0.0.2"); - EP3 = InetAddressAndPort.getByName("127.0.0.3"); - EP4 = InetAddressAndPort.getByName("127.0.0.4"); - EP5 = InetAddressAndPort.getByName("127.0.0.5"); - BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort(); - NULL_EP = InetAddressAndPort.getByName("127.255.255.255"); - R1 = range(0, 1); - R2 = range(1, 2); - R3 = range(2, 3); - R4 = range(3, 4); - R5 = range(4, 0); - BROADCAST_RANGE = range(10, 11); - NULL_RANGE = range(10000, 10001); - ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP); - ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - - static Token tk(long t) - { - return new Murmur3Partitioner.LongToken(t); - } - - static Range range(long left, long right) - { - return new Range<>(tk(left), tk(right)); - } - static class TestCase> { final boolean isBuilder; diff --git a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java index 9f2ac583bd43..b5b60e3e46ab 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java @@ -23,7 +23,7 @@ import org.junit.Assert; import org.junit.Test; -import static org.apache.cassandra.locator.ReplicaCollectionTest.*; +import static org.apache.cassandra.locator.ReplicaUtils.*; public class ReplicaLayoutTest { diff --git a/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java new file mode 100644 index 000000000000..4d0dd47ba687 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.ReplicaUtils.*; + +public class ReplicaPlansTest +{ + + static + { + DatabaseDescriptor.daemonInitialization(); + } + + static class Snitch extends AbstractNetworkTopologySnitch + { + final Set dc1; + Snitch(Set dc1) + { + this.dc1 = dc1; + } + @Override + public String getRack(InetAddressAndPort endpoint) + { + return dc1.contains(endpoint) ? "R1" : "R2"; + } + + @Override + public String getDatacenter(InetAddressAndPort endpoint) + { + return dc1.contains(endpoint) ? "DC1" : "DC2"; + } + } + + private static Keyspace ks(Set dc1, Map replication) + { + replication = ImmutableMap.builder().putAll(replication).put("class", "NetworkTopologyStrategy").build(); + Keyspace keyspace = Keyspace.mockKS(KeyspaceMetadata.create("blah", KeyspaceParams.create(false, replication))); + Snitch snitch = new Snitch(dc1); + DatabaseDescriptor.setEndpointSnitch(snitch); + keyspace.getReplicationStrategy().snitch = snitch; + return keyspace; + } + + private static Replica full(InetAddressAndPort ep) { return fullReplica(ep, R1); } + + + + @Test + public void testWriteEachQuorum() + { + IEndpointSnitch stash = DatabaseDescriptor.getEndpointSnitch(); + final Token token = tk(1L); + try + { + { + // all full natural + Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3", "DC2", "3")); + EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2), full(EP3), full(EP4), full(EP5), full(EP6)); + EndpointsForToken pending = EndpointsForToken.empty(token); + ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal); + assertEquals(natural, plan.liveAndDown); + assertEquals(natural, plan.live); + assertEquals(natural, plan.contacts()); + } + { + // all natural and up, one transient in each DC + Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3", "DC2", "3")); + EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6)); + EndpointsForToken pending = EndpointsForToken.empty(token); + ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal); + assertEquals(natural, plan.liveAndDown); + assertEquals(natural, plan.live); + EndpointsForToken expectContacts = EndpointsForToken.of(token, full(EP1), full(EP2), full(EP4), full(EP5)); + assertEquals(expectContacts, plan.contacts()); + } + } + finally + { + DatabaseDescriptor.setEndpointSnitch(stash); + } + + { + // test simple + + } + } + +} diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java index c5350dcb0138..72c0a06861ed 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java @@ -18,11 +18,17 @@ package org.apache.cassandra.locator; +import com.google.common.collect.ImmutableList; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.FBUtilities; +import org.junit.Assert; + +import java.net.UnknownHostException; +import java.util.List; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; @@ -51,4 +57,61 @@ public static Replica trans(InetAddressAndPort endpoint, Token token) { return transientReplica(endpoint, new Range<>(token, token)); } + + static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, EP6, EP7, EP8, EP9, BROADCAST_EP, NULL_EP; + static final Range R1, R2, R3, R4, R5, R6, R7, R8, R9, BROADCAST_RANGE, NULL_RANGE, WRAP_RANGE; + static final List ALL_EP; + static final List> ALL_R; + + static + { + try + { + EP1 = InetAddressAndPort.getByName("127.0.0.1"); + EP2 = InetAddressAndPort.getByName("127.0.0.2"); + EP3 = InetAddressAndPort.getByName("127.0.0.3"); + EP4 = InetAddressAndPort.getByName("127.0.0.4"); + EP5 = InetAddressAndPort.getByName("127.0.0.5"); + EP6 = InetAddressAndPort.getByName("127.0.0.6"); + EP7 = InetAddressAndPort.getByName("127.0.0.7"); + EP8 = InetAddressAndPort.getByName("127.0.0.8"); + EP9 = InetAddressAndPort.getByName("127.0.0.9"); + BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort(); + NULL_EP = InetAddressAndPort.getByName("127.255.255.255"); + R1 = range(0, 1); + R2 = range(1, 2); + R3 = range(2, 3); + R4 = range(3, 4); + R5 = range(4, 5); + R6 = range(5, 6); + R7 = range(6, 7); + R8 = range(7, 8); + R9 = range(8, 9); + BROADCAST_RANGE = range(10, 11); + NULL_RANGE = range(10000, 10001); + WRAP_RANGE = range(100000, 0); + ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP); + ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE, WRAP_RANGE); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + static Range range(long left, long right) + { + return new Range<>(tk(left), tk(right)); + } + + static void assertEquals(AbstractReplicaCollection a, AbstractReplicaCollection b) + { + Assert.assertEquals(a.list, b.list); + } + } diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java index d31d3f1fcf33..15fbd27be3e9 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java @@ -173,9 +173,15 @@ private static ReplicaPlan.ForTokenWrite getSpeculationContext(EndpointsForToken private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken replicas, Predicate livePredicate) { ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate); - Assert.assertTrue(Iterables.elementsEqual(expected.pending(), actual.pending())); - Assert.assertTrue(Iterables.elementsEqual(expected.live(), actual.live())); - Assert.assertTrue(Iterables.elementsEqual(expected.contacts(), actual.contacts())); + assertEquals(expected.pending(), actual.pending()); + assertEquals(expected.live(), actual.live()); + assertEquals(expected.contacts(), actual.contacts()); + } + + private static void assertEquals(ReplicaCollection a, ReplicaCollection b) + { + if (!Iterables.elementsEqual(a, b)) + Assert.assertTrue(a + " vs " + b, false); } private static Predicate dead(InetAddressAndPort... endpoints) From 09ee889efbed6e381eb7af9fe92502dd721878d9 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Fri, 30 Nov 2018 13:57:39 +0000 Subject: [PATCH 2/2] circleci --- .circleci/config.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 3b2b97830ff8..ebc40008c1bc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + #<<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + #<<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image spod/cassandra-testing-ubuntu18-java11 version: 2 jobs: