Skip to content

Commit

Permalink
Fix TimeoutException when there is a firewall issue.
Browse files Browse the repository at this point in the history
patch by Vijay; reviewed by jbellis for CASSANDRA-3533
  • Loading branch information
Vijay2win committed Apr 10, 2013
1 parent b31f48d commit 576efcd
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 14 deletions.
29 changes: 29 additions & 0 deletions src/java/org/apache/cassandra/gms/EchoMessage.java
@@ -0,0 +1,29 @@
package org.apache.cassandra.gms;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.cassandra.io.IVersionedSerializer;

public class EchoMessage
{
public static IVersionedSerializer<EchoMessage> serializer = new EchoMessageSerializer();

public static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage>
{
public void serialize(EchoMessage t, DataOutput out, int version) throws IOException
{
}

public EchoMessage deserialize(DataInput in, int version) throws IOException
{
return new EchoMessage();
}

public long serializedSize(EchoMessage t, int version)
{
return 0;
}
}
}
44 changes: 30 additions & 14 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Expand Up @@ -33,6 +33,8 @@
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token; import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageService;
Expand Down Expand Up @@ -759,21 +761,35 @@ void notifyFailureDetector(InetAddress endpoint, EndpointState remoteEndpointSta


} }


private void markAlive(InetAddress addr, EndpointState localState) private void markAlive(final InetAddress addr, final EndpointState localState)
{ {
if (logger.isTraceEnabled()) MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer);
logger.trace("marking as alive {}", addr); logger.trace("Sending a EchoMessage to {}", addr);
localState.markAlive(); IAsyncCallback echoHandler = new IAsyncCallback()
localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime {
liveEndpoints.add(addr); public boolean isLatencyForSnitch()
unreachableEndpoints.remove(addr); {
expireTimeEndpointMap.remove(addr); return false;
logger.debug("removing expire time for endpoint : " + addr); }
logger.info("InetAddress {} is now UP", addr);
for (IEndpointStateChangeSubscriber subscriber : subscribers) public void response(MessageIn msg)
subscriber.onAlive(addr, localState); {
if (logger.isTraceEnabled()) if (logger.isTraceEnabled())
logger.trace("Notified " + subscribers); logger.trace("marking as alive {}", addr);
localState.markAlive();
localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
liveEndpoints.add(addr);
unreachableEndpoints.remove(addr);
expireTimeEndpointMap.remove(addr);
logger.debug("removing expire time for endpoint : " + addr);
logger.info("InetAddress {} is now UP", addr);
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onAlive(addr, localState);
if (logger.isTraceEnabled())
logger.trace("Notified " + subscribers);
}
};
MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
} }


private void markDead(InetAddress addr, EndpointState localState) private void markDead(InetAddress addr, EndpointState localState)
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -48,6 +48,7 @@
import org.apache.cassandra.db.*; import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2; import org.apache.cassandra.gms.GossipDigestAck2;
import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.gms.GossipDigestSyn;
Expand Down Expand Up @@ -116,6 +117,7 @@ public enum Verb
MIGRATION_REQUEST, MIGRATION_REQUEST,
GOSSIP_SHUTDOWN, GOSSIP_SHUTDOWN,
_TRACE, // dummy verb so we can use MS.droppedMessages _TRACE, // dummy verb so we can use MS.droppedMessages
ECHO,
// use as padding for backwards compatability where a previous version needs to validate a verb from the future. // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
UNUSED_1, UNUSED_1,
UNUSED_2, UNUSED_2,
Expand Down Expand Up @@ -152,6 +154,7 @@ public enum Verb
put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE); put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
put(Verb.COUNTER_MUTATION, Stage.MUTATION); put(Verb.COUNTER_MUTATION, Stage.MUTATION);
put(Verb.SNAPSHOT, Stage.MISC); put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
Expand Down Expand Up @@ -190,6 +193,7 @@ public enum Verb
put(Verb.INDEX_SCAN, IndexScanCommand.serializer); put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
put(Verb.REPLICATION_FINISHED, null); put(Verb.REPLICATION_FINISHED, null);
put(Verb.COUNTER_MUTATION, CounterMutation.serializer); put(Verb.COUNTER_MUTATION, CounterMutation.serializer);
put(Verb.ECHO, EchoMessage.serializer);
}}; }};


/** /**
Expand Down
21 changes: 21 additions & 0 deletions src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -0,0 +1,21 @@
package org.apache.cassandra.service;

import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EchoVerbHandler implements IVerbHandler<EchoMessage>
{
private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class);

public void doVerb(MessageIn<EchoMessage> message, int id)
{
MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, new EchoMessage(), EchoMessage.serializer);
logger.trace("Sending a EchoMessage reply {}", message.from);
MessagingService.instance().sendReply(echoMessage, id, message.from);
}
}
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -256,6 +256,7 @@ public StorageService()
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler());


MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());


// spin up the streaming service so it is available for jmx tools. // spin up the streaming service so it is available for jmx tools.
if (StreamingService.instance == null) if (StreamingService.instance == null)
Expand Down
Expand Up @@ -83,6 +83,7 @@ public static void scanClasspath()
expectedClassNames.add("ColumnFamilySerializer"); expectedClassNames.add("ColumnFamilySerializer");
expectedClassNames.add("CompressionInfoSerializer"); expectedClassNames.add("CompressionInfoSerializer");
expectedClassNames.add("ChunkSerializer"); expectedClassNames.add("ChunkSerializer");
expectedClassNames.add("EchoMessageSerializer");


discoveredClassNames = new ArrayList<String>(); discoveredClassNames = new ArrayList<String>();
String cp = System.getProperty("java.class.path"); String cp = System.getProperty("java.class.path");
Expand Down

0 comments on commit 576efcd

Please sign in to comment.