Skip to content

Commit

Permalink
after round 1 of feedback from Ariel:
Browse files Browse the repository at this point in the history
init all connection types;
allow messages to set ConnType in MsgOut;
add params to Config, not yaml
move blockFor logic out of MessagingService into it's own class;
added utests
  • Loading branch information
jasobrown committed Feb 17, 2018
1 parent 8011032 commit 4cb1e72
Show file tree
Hide file tree
Showing 16 changed files with 456 additions and 165 deletions.
12 changes: 6 additions & 6 deletions .circleci/config.yml
Expand Up @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
<<: *default_env_settings
#<<: *high_capacity_env_settings
#<<: *default_env_settings
<<: *high_capacity_env_settings
env_vars: &env_vars
<<: *resource_constrained_env_vars
#<<: *high_capacity_env_vars
#<<: *resource_constrained_env_vars
<<: *high_capacity_env_vars
workflows:
version: 2
build_and_run_tests: *default_jobs
#build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
#build_and_run_tests: *with_dtest_jobs
build_and_run_tests: *with_dtest_jobs
docker_image: &docker_image kjellman/cassandra-test:0.4.3
version: 2
jobs:
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -373,6 +373,11 @@ public class Config

public String full_query_log_dir = null;

// parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
public int block_for_peers_percentage = 70;
public int block_for_peers_timeout_in_secs = 10;


/**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -2522,4 +2522,14 @@ public static String getFullQueryLogPath()
{
return conf.full_query_log_dir;
}

public static int getBlockForPeersPercentage()
{
return conf.block_for_peers_percentage;
}

public static int getBlockForPeersTimeoutInSeconds()
{
return conf.block_for_peers_timeout_in_secs;
}
}
29 changes: 22 additions & 7 deletions src/java/org/apache/cassandra/net/MessageOut.java
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -96,6 +97,11 @@ public class MessageOut<T>
//the second object is the POJO to serialize
public final List<Object> parameters;

/**
* Allows sender to explicitly state which connection type the message should be sent on.
*/
public final ConnectionType connectionType;

/**
* Memoization of the serialized size of the just the payload.
*/
Expand All @@ -122,24 +128,33 @@ public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T>
this(verb,
payload,
serializer,
isTracing()
? Tracing.instance.getTraceHeaders()
: ImmutableList.of());
isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
null);
}

public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, ConnectionType connectionType)
{
this(verb,
payload,
serializer,
isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
connectionType);
}

private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters);
this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters, connectionType);
}

@VisibleForTesting
public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
this.from = from;
this.verb = verb;
this.payload = payload;
this.serializer = serializer;
this.parameters = parameters;
this.connectionType = connectionType;
}

public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
Expand All @@ -148,7 +163,7 @@ public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
newParameters.addAll(parameters);
newParameters.add(type);
newParameters.add(value);
return new MessageOut<T>(verb, payload, serializer, newParameters);
return new MessageOut<T>(verb, payload, serializer, newParameters, connectionType);
}

public Stage getStage()
Expand Down
127 changes: 8 additions & 119 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -35,14 +35,12 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;

import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
Expand All @@ -60,7 +58,6 @@
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand All @@ -82,13 +79,11 @@
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2;
import org.apache.cassandra.gms.GossipDigestSyn;
import org.apache.cassandra.hints.HintMessage;
import org.apache.cassandra.hints.HintResponse;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand Down Expand Up @@ -260,16 +255,17 @@ public long getTimeout()
}
},
PING(),
PONG(),
// remember to add new verbs at the end, since we serialize by ordinal

// add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
// UNUSED verbs serve as padding for backwards compatability where a previous version needs to validate a verb from the future.
UNUSED_1,
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;

private int id;
private final int id;
Verb()
{
id = ordinal();
Expand Down Expand Up @@ -297,7 +293,11 @@ public int getId()
static
{
for (Verb v : values())
{
if (idToVerbMap.containsKey(v.getId()))
throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId());
idToVerbMap.put(v.getId(), v);
}
}

public static Verb fromId(int id)
Expand Down Expand Up @@ -355,7 +355,6 @@ public static Verb fromId(int id)
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);

put(Verb.PING, Stage.READ);
put(Verb.PONG, Stage.READ);
}};

/**
Expand Down Expand Up @@ -396,7 +395,6 @@ public static Verb fromId(int id)
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);

put(Verb.PING, PingMessage.serializer);
put(Verb.PONG, PongMessage.serializer);
}};

/**
Expand Down Expand Up @@ -1683,113 +1681,4 @@ public void reloadSslCertificates()
{
SSLFactory.checkCertFilesForHotReloading();
}

public void blockForPeers()
{
// TODO make these yaml props?
int alivePercent = Integer.getInteger(Config.PROPERTY_PREFIX + "blockForPeers.percent", 70);
if (alivePercent < 0)
alivePercent = 0;
else if (alivePercent > 100)
alivePercent = 100;

int aliveTimeoutSecs = Integer.getInteger(Config.PROPERTY_PREFIX + "blockForPeers.timeout_in_secs", 10);
if (aliveTimeoutSecs < 0)
aliveTimeoutSecs = 1;
else if (aliveTimeoutSecs > 100)
aliveTimeoutSecs = 100;

if (alivePercent > 0)
blockForPeers(alivePercent, aliveTimeoutSecs);
}

private void blockForPeers(int targetAlivePercent, int aliveTimeoutSecs)
{
// grab a snapshot of the current cluster from Gossiper. this is completely prone to race conditions, but it's
// good enough for the purposes of blocking until some certain percentage of nodes are considered 'alive'/connected.
Set<Map.Entry<InetAddressAndPort, EndpointState>> peers = new HashSet<>(Gossiper.instance.getEndpointStates());

// remove current node from the set
peers = peers.stream()
.filter(entry -> !entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
.collect(Collectors.toSet());

final int totalSize = peers.size();

// don't block if there's no other nodes in the cluster (or we don't know about them)
if (totalSize <= 1)
return;

logger.info("choosing to block until {}% of peers are marked alive; max time to wait = {} seconds", targetAlivePercent, aliveTimeoutSecs);

// first, send out a ping message to open up the non-gossip connections
AtomicInteger connectedCount = sendPingMessages(peers);

long startNanos = System.nanoTime();
long expirationNanos = startNanos + TimeUnit.SECONDS.toNanos(aliveTimeoutSecs);
int completedRounds = 0;
while (true)
{
int currentAlive = 0;
for (Map.Entry<InetAddressAndPort, EndpointState> entry : peers)
{
if (entry.getValue().isAlive())
currentAlive++;
}

float currentAlivePercent = ((float) currentAlive / (float) totalSize) * 100;
float currentConnectedPercent = ((float) connectedCount.get() / (float) totalSize) * 100;
if (currentAlivePercent >= targetAlivePercent && currentConnectedPercent >= targetAlivePercent)
{
logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
"and {}% ({} / {}) of peers as connected, " +
"both of which are above the desired threshold of {}%",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
currentAlivePercent, currentAlive, totalSize,
currentConnectedPercent, connectedCount.get(), totalSize,
targetAlivePercent);
return;
}

completedRounds++;
// perform at least two rounds of checking, else this is kinda useless (and the operator set the aliveTimeoutSecs too low)
if (completedRounds > 2 && expirationNanos < System.nanoTime())
{
logger.info("after {} milliseconds, found {}% ({} / {}) of peers as marked alive, " +
"and {}% ({} / {}) of peers as connected, " +
"one or both of which is below the desired threshold of {}%",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
currentAlivePercent, currentAlive, totalSize,
currentConnectedPercent, connectedCount.get(), totalSize,
targetAlivePercent);
return;
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}

private AtomicInteger sendPingMessages(Set<Map.Entry<InetAddressAndPort, EndpointState>> peers)
{
AtomicInteger receivedCount = new AtomicInteger(0);
IAsyncCallback echoHandler = new IAsyncCallback()
{
@Override
public boolean isLatencyForSnitch()
{
return false;
}

@Override
public void response(MessageIn msg)
{
receivedCount.incrementAndGet();
}
};

MessageOut<PingMessage> msg = new MessageOut<>(MessagingService.Verb.PING, PingMessage.instance, PingMessage.serializer);
for (Map.Entry<InetAddressAndPort, EndpointState> peer : peers)
MessagingService.instance().sendRR(msg, peer.getKey(), echoHandler);

return receivedCount;
}
}
44 changes: 38 additions & 6 deletions src/java/org/apache/cassandra/net/PingMessage.java
Expand Up @@ -20,31 +20,63 @@

import java.io.IOException;

import org.apache.cassandra.hints.HintResponse;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;

/**
* Conceptually the same as {@link org.apache.cassandra.gms.EchoMessage}, but indicates to the recipient which
* {@link ConnectionType} should be used for the response.
*/
public class PingMessage
{
public static final PingMessage instance = new PingMessage();
public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();

private PingMessage()
{ }
public static final PingMessage smallChannelMessage = new PingMessage(ConnectionType.SMALL_MESSAGE);
public static final PingMessage largeChannelMessage = new PingMessage(ConnectionType.LARGE_MESSAGE);
public static final PingMessage gossipChannelMessage = new PingMessage(ConnectionType.GOSSIP);

public final ConnectionType connectionType;

public PingMessage(ConnectionType connectionType)
{
this.connectionType = connectionType;
}

public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
{
public void serialize(PingMessage t, DataOutputPlus out, int version) throws IOException
{ }
{
out.writeByte(t.connectionType.getId());
}

public PingMessage deserialize(DataInputPlus in, int version) throws IOException
{
return instance;
ConnectionType connectionType = ConnectionType.fromId(in.readByte());

// if we ever create a new connection type, then during a rolling upgrade, the old nodes won't know about
// the new connection type (as it won't recognize the id), so just default to the small message type.
if (connectionType == null)
connectionType = ConnectionType.SMALL_MESSAGE;

switch (connectionType)
{
case LARGE_MESSAGE:
return largeChannelMessage;
case GOSSIP:
return gossipChannelMessage;
case SMALL_MESSAGE:
default:
return smallChannelMessage;
}
}

public long serializedSize(PingMessage t, int version)
{
return 0;
return 1;
}
}
}
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/net/PingVerbHandler.java
Expand Up @@ -23,7 +23,9 @@ public class PingVerbHandler implements IVerbHandler<PingMessage>
@Override
public void doVerb(MessageIn<PingMessage> message, int id)
{
MessageOut<PongMessage> msg = new MessageOut<>(MessagingService.Verb.PONG, PongMessage.instance, PongMessage.serializer);
MessageOut<PongMessage> msg = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, PongMessage.instance,
PongMessage.serializer,
message.payload.connectionType);
MessagingService.instance().sendReply(msg, id, message.from);
}
}

0 comments on commit 4cb1e72

Please sign in to comment.