Skip to content
Closed

14727 #275

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
<<: *default_env_settings
#<<: *high_capacity_env_settings
#<<: *default_env_settings
<<: *high_capacity_env_settings
env_vars: &env_vars
<<: *resource_constrained_env_vars
#<<: *high_capacity_env_vars
#<<: *resource_constrained_env_vars
<<: *high_capacity_env_vars
workflows:
version: 2
build_and_run_tests: *default_jobs
#build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
#build_and_run_tests: *with_dtest_jobs
build_and_run_tests: *with_dtest_jobs
docker_image: &docker_image spod/cassandra-testing-ubuntu18-java11
version: 2
jobs:
Expand Down
18 changes: 16 additions & 2 deletions src/java/org/apache/cassandra/db/ConsistencyLevel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.transport.ProtocolException;

import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
import static org.apache.cassandra.locator.Replicas.countInOurDc;

public enum ConsistencyLevel
Expand Down Expand Up @@ -92,7 +93,12 @@ public static int localQuorumFor(Keyspace keyspace, String dc)
: quorumFor(keyspace);
}

public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace)
/**
 * The number of replicas required for a local quorum in this node's own datacenter.
 *
 * @param keyspace the keyspace whose replication settings determine the quorum size
 * @return the local-quorum size for the datacenter this node belongs to
 */
public static int localQuorumForOurDc(Keyspace keyspace)
{
    // Delegate to the per-DC variant using our own datacenter name
    String ourDc = DatabaseDescriptor.getLocalDataCenter();
    return localQuorumFor(keyspace, ourDc);
}

public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
Expand All @@ -101,6 +107,13 @@ public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace)
return perDc;
}

/**
 * Per-DC replica counts required for an EACH_QUORUM write.
 *
 * Starts from the per-DC read quorum sizes and bumps each datacenter's count by one
 * for every pending (or down) endpoint located in that DC.
 *
 * @param keyspace        the keyspace whose replication settings determine quorum sizes
 * @param pendingWithDown pending endpoints (including down ones) that add to the write requirement
 * @return mutable map of datacenter name to required replica count
 */
public static ObjectIntOpenHashMap<String> eachQuorumForWrite(Keyspace keyspace, Endpoints<?> pendingWithDown)
{
    ObjectIntOpenHashMap<String> required = eachQuorumForRead(keyspace);
    // each pending endpoint raises its own DC's requirement by one
    addToCountPerDc(required, pendingWithDown, 1);
    return required;
}

public int blockFor(Keyspace keyspace)
{
switch (this)
Expand All @@ -121,7 +134,7 @@ public int blockFor(Keyspace keyspace)
return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
case LOCAL_QUORUM:
case LOCAL_SERIAL:
return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
return localQuorumForOurDc(keyspace);
case EACH_QUORUM:
if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
Expand Down Expand Up @@ -164,6 +177,7 @@ public int blockForWrite(Keyspace keyspace, Endpoints<?> pending)

/**
* Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
* WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
*/
public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
{
Expand Down
50 changes: 33 additions & 17 deletions src/java/org/apache/cassandra/locator/ReplicaPlans.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.locator;

import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectIntCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -57,9 +58,13 @@
import java.util.function.Predicate;

import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor;
import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead;
import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite;
import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc;
import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
import static org.apache.cassandra.locator.Replicas.countInOurDc;
import static org.apache.cassandra.locator.Replicas.countPerDc;

Expand All @@ -77,7 +82,7 @@ public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Consist
case LOCAL_ONE:
return countInOurDc(liveReplicas).hasAtleast(1, 1);
case LOCAL_QUORUM:
return countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1);
return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1);
case EACH_QUORUM:
if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
Expand Down Expand Up @@ -143,7 +148,7 @@ static void assureSufficientLiveReplicas(Keyspace keyspace, ConsistencyLevel con
Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive))
{
int dcBlockFor = ConsistencyLevel.localQuorumFor(keyspace, entry.key);
int dcBlockFor = localQuorumFor(keyspace, entry.key);
Replicas.ReplicaCount dcCount = entry.value;
if (!dcCount.hasAtleast(dcBlockFor, 0))
throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas());
Expand Down Expand Up @@ -340,28 +345,28 @@ public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyL

public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
{
EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live);
assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
}

public interface Selector
{
<E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
E select(Keyspace keyspace, L liveAndDown, L live);
}

/**
* Select all nodes, transient or otherwise, as targets for the operation.
*
* This is may no longer be useful until we finish implementing transient replication support, however
This may no longer be useful once we finish implementing transient replication support; however
* it can be of value to stipulate that a location writes to all nodes without regard to transient status.
*/
public static final Selector writeAll = new Selector()
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
E select(Keyspace keyspace, L liveAndDown, L live)
{
return liveAndDown.all();
}
Expand All @@ -380,22 +385,33 @@ E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
E select(Keyspace keyspace, L liveAndDown, L live)
{
if (!any(liveAndDown.all(), Replica::isTransient))
return liveAndDown.all();

assert consistencyLevel != EACH_QUORUM;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change consistencyLevel is unused in both cases.


ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size());
contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull));
contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
contacts.addAll(liveAndDown.pending());

// TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
int liveCount = contacts.count(live.all()::contains);
int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
if (requiredTransientCount > 0)
contacts.addAll(live.natural().filterLazily(Replica::isTransient, requiredTransientCount));
/**
* Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every DC,
* regardless of how many responses we need to wait for and our requested consistencyLevel.
* This is to minimally surprise users with transient replication; with normal writes, we
* soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node;
* even if we don't wait for ACK, we have in both cases sent sufficient messages.
*/
ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1);
addToCountPerDc(requiredPerDc, live.pending(), -1);

IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
for (Replica replica : filter(live.natural(), Replica::isTransient))
{
String dc = snitch.getDatacenter(replica);
if (requiredPerDc.addTo(dc, -1) >= 0)
contacts.add(replica);
}
return contacts.build();
}
};
Expand Down Expand Up @@ -453,7 +469,7 @@ private static <E extends Endpoints<E>> E candidatesForRead(ConsistencyLevel con
private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace keyspace, E candidates)
{
assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy;
ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace);
ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace);

final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
return candidates.filter(replica -> {
Expand Down
24 changes: 21 additions & 3 deletions src/java/org/apache/cassandra/locator/Replicas.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.function.Predicate;

import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.DatabaseDescriptor;
Expand Down Expand Up @@ -84,20 +85,37 @@ public static ReplicaCount countInOurDc(ReplicaCollection<?> replicas)
return count;
}

public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String> dataCenters, Iterable<Replica> liveReplicas)
/**
* count the number of full and transient replicas, separately, for each DC
*/
public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String> dataCenters, Iterable<Replica> replicas)
{
ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new ObjectObjectOpenHashMap<>(dataCenters.size());
for (String dc: dataCenters)
perDc.put(dc, new ReplicaCount());

for (Replica replica : liveReplicas)
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
for (Replica replica : replicas)
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
String dc = snitch.getDatacenter(replica);
perDc.get(dc).increment(replica);
}
return perDc;
}

/**
 * Adjusts the map's per-datacenter counters by {@code add} for every replica supplied:
 * each replica contributes {@code add} to the entry keyed by its datacenter.
 *
 * @param perDc    mutable map of datacenter name to count, updated in place
 * @param replicas replicas whose datacenters determine which entries are adjusted
 * @param add      signed delta applied once per replica (may be negative)
 */
public static void addToCountPerDc(ObjectIntOpenHashMap<String> perDc, Iterable<Replica> replicas, int add)
{
    // Resolve the snitch once rather than per replica
    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
    for (Replica replica : replicas)
        perDc.addTo(snitch.getDatacenter(replica), add);
}

/**
* A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,11 @@
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
import static org.apache.cassandra.locator.ReplicaUtils.*;

public class ReplicaCollectionTest
{

static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP;
static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
static final List<InetAddressAndPort> ALL_EP;
static final List<Range<Token>> ALL_R;
static
{
try
{
EP1 = InetAddressAndPort.getByName("127.0.0.1");
EP2 = InetAddressAndPort.getByName("127.0.0.2");
EP3 = InetAddressAndPort.getByName("127.0.0.3");
EP4 = InetAddressAndPort.getByName("127.0.0.4");
EP5 = InetAddressAndPort.getByName("127.0.0.5");
BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
R1 = range(0, 1);
R2 = range(1, 2);
R3 = range(2, 3);
R4 = range(3, 4);
R5 = range(4, 0);
BROADCAST_RANGE = range(10, 11);
NULL_RANGE = range(10000, 10001);
ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP);
ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE);
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}

// Test helper: wraps a raw long as a Murmur3 LongToken.
static Token tk(long t)
{
    return new Murmur3Partitioner.LongToken(t);
}

// Test helper: builds a token Range from two raw long token values.
static Range<Token> range(long left, long right)
{
    Token start = tk(left);
    Token end = tk(right);
    return new Range<>(start, end);
}

static class TestCase<C extends AbstractReplicaCollection<C>>
{
final boolean isBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.Assert;
import org.junit.Test;

import static org.apache.cassandra.locator.ReplicaCollectionTest.*;
import static org.apache.cassandra.locator.ReplicaUtils.*;

public class ReplicaLayoutTest
{
Expand Down
Loading