Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

13993 #191

Closed
wants to merge 4 commits into from
Closed

13993 #191

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions .circleci/config.yml
Expand Up @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only
- build
# Set env_settings, env_vars, and workflows/build_and_run_tests based on environment
env_settings: &env_settings
<<: *default_env_settings
#<<: *high_capacity_env_settings
#<<: *default_env_settings
<<: *high_capacity_env_settings
env_vars: &env_vars
<<: *resource_constrained_env_vars
#<<: *high_capacity_env_vars
#<<: *resource_constrained_env_vars
<<: *high_capacity_env_vars
workflows:
version: 2
build_and_run_tests: *default_jobs
#build_and_run_tests: *default_jobs
#build_and_run_tests: *with_dtest_jobs_only
#build_and_run_tests: *with_dtest_jobs
build_and_run_tests: *with_dtest_jobs
docker_image: &docker_image kjellman/cassandra-test:0.4.3
version: 2
jobs:
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -373,6 +373,11 @@ public class Config

public String full_query_log_dir = null;

// parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
public int block_for_peers_percentage = 70;
public int block_for_peers_timeout_in_secs = 10;


/**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -2522,4 +2522,14 @@ public static String getFullQueryLogPath()
{
return conf.full_query_log_dir;
}

public static int getBlockForPeersPercentage()
{
return conf.block_for_peers_percentage;
}

public static int getBlockForPeersTimeoutInSeconds()
{
return conf.block_for_peers_timeout_in_secs;
}
}
29 changes: 22 additions & 7 deletions src/java/org/apache/cassandra/net/MessageOut.java
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -96,6 +97,11 @@ public class MessageOut<T>
//the second object is the POJO to serialize
public final List<Object> parameters;

/**
* Allows sender to explicitly state which connection type the message should be sent on.
*/
public final ConnectionType connectionType;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A part of me wants to say don't make MessageOut bigger, but it's probably a drop in the bucket in terms of allocation rate for processing network messages.


/**
* Memoization of the serialized size of the just the payload.
*/
Expand All @@ -122,24 +128,33 @@ public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T>
this(verb,
payload,
serializer,
isTracing()
? Tracing.instance.getTraceHeaders()
: ImmutableList.of());
isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
null);
}

public MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, ConnectionType connectionType)
{
this(verb,
payload,
serializer,
isTracing() ? Tracing.instance.getTraceHeaders() : ImmutableList.of(),
connectionType);
}

private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters);
this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, serializer, parameters, connectionType);
}

@VisibleForTesting
public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters)
public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, List<Object> parameters, ConnectionType connectionType)
{
this.from = from;
this.verb = verb;
this.payload = payload;
this.serializer = serializer;
this.parameters = parameters;
this.connectionType = connectionType;
}

public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
Expand All @@ -148,7 +163,7 @@ public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
newParameters.addAll(parameters);
newParameters.add(type);
newParameters.add(value);
return new MessageOut<T>(verb, payload, serializer, newParameters);
return new MessageOut<T>(verb, payload, serializer, newParameters, connectionType);
}

public Stage getStage()
Expand Down
16 changes: 13 additions & 3 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.management.MBeanServer;
import javax.management.ObjectName;

Expand Down Expand Up @@ -255,15 +254,18 @@ public long getTimeout()
return DatabaseDescriptor.getRangeRpcTimeout();
}
},
// remember to add new verbs at the end, since we serialize by ordinal
PING(),

// add new verbs after the existing verbs, but *before* the UNUSED verbs, since we serialize by ordinal.
// UNUSED verbs serve as padding for backwards compatability where a previous version needs to validate a verb from the future.
UNUSED_1,
UNUSED_2,
UNUSED_3,
UNUSED_4,
UNUSED_5,
;

private int id;
private final int id;
Verb()
{
id = ordinal();
Expand Down Expand Up @@ -291,7 +293,11 @@ public int getId()
static
{
for (Verb v : values())
{
if (idToVerbMap.containsKey(v.getId()))
throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + v.getId());
idToVerbMap.put(v.getId(), v);
}
}

public static Verb fromId(int id)
Expand Down Expand Up @@ -347,6 +353,8 @@ public static Verb fromId(int id)
put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);

put(Verb.PING, Stage.READ);
}};

/**
Expand Down Expand Up @@ -385,6 +393,8 @@ public static Verb fromId(int id)
put(Verb.HINT, HintMessage.serializer);
put(Verb.BATCH_STORE, Batch.serializer);
put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);

put(Verb.PING, PingMessage.serializer);
}};

/**
Expand Down
82 changes: 82 additions & 0 deletions src/java/org/apache/cassandra/net/PingMessage.java
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

import java.io.IOException;

import org.apache.cassandra.hints.HintResponse;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;

/**
* Conceptually the same as {@link org.apache.cassandra.gms.EchoMessage}, but indicates to the recipient which
* {@link ConnectionType} should be used for the response.
*/
public class PingMessage
{
public static IVersionedSerializer<PingMessage> serializer = new PingMessageSerializer();

public static final PingMessage smallChannelMessage = new PingMessage(ConnectionType.SMALL_MESSAGE);
public static final PingMessage largeChannelMessage = new PingMessage(ConnectionType.LARGE_MESSAGE);
public static final PingMessage gossipChannelMessage = new PingMessage(ConnectionType.GOSSIP);

public final ConnectionType connectionType;

public PingMessage(ConnectionType connectionType)
{
this.connectionType = connectionType;
}

public static class PingMessageSerializer implements IVersionedSerializer<PingMessage>
{
public void serialize(PingMessage t, DataOutputPlus out, int version) throws IOException
{
out.writeByte(t.connectionType.getId());
}

public PingMessage deserialize(DataInputPlus in, int version) throws IOException
{
ConnectionType connectionType = ConnectionType.fromId(in.readByte());

// if we ever create a new connection type, then during a rolling upgrade, the old nodes won't know about
// the new connection type (as it won't recognize the id), so just default to the small message type.
if (connectionType == null)
connectionType = ConnectionType.SMALL_MESSAGE;

switch (connectionType)
{
case LARGE_MESSAGE:
return largeChannelMessage;
case GOSSIP:
return gossipChannelMessage;
case SMALL_MESSAGE:
default:
return smallChannelMessage;
}
}

public long serializedSize(PingMessage t, int version)
{
return 1;
}
}
}
31 changes: 31 additions & 0 deletions src/java/org/apache/cassandra/net/PingVerbHandler.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

public class PingVerbHandler implements IVerbHandler<PingMessage>
{
@Override
public void doVerb(MessageIn<PingMessage> message, int id)
{
MessageOut<PongMessage> msg = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, PongMessage.instance,
PongMessage.serializer,
message.payload.connectionType);
MessagingService.instance().sendReply(msg, id, message.from);
}
}
50 changes: 50 additions & 0 deletions src/java/org/apache/cassandra/net/PongMessage.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

import java.io.IOException;

import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;

public class PongMessage
{
public static final PongMessage instance = new PongMessage();
public static IVersionedSerializer<PongMessage> serializer = new PongMessage.PongMessageSerializer();

private PongMessage()
{ }

public static class PongMessageSerializer implements IVersionedSerializer<PongMessage>
{
public void serialize(PongMessage t, DataOutputPlus out, int version) throws IOException
{ }

public PongMessage deserialize(DataInputPlus in, int version) throws IOException
{
return instance;
}

public long serializedSize(PongMessage t, int version)
{
return 0;
}
}
}