Skip to content

Commit

Permalink
reduce garbage generated by MessagingService to prevent load spikes
Browse files Browse the repository at this point in the history
patch by jbellis; reviewed by brandonwilliams and tjake for CASSANDRA-2058

git-svn-id: https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6@1064193 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Jan 27, 2011
1 parent 8bd8558 commit 44cda97
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 219 deletions.
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.6.11
* reduce garbage generated by MessagingService to prevent load spikes
(CASSANDRA-2058)


0.6.10
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/HintedHandOffManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private static boolean sendMessage(InetAddress endPoint, String tableName, Strin
rm.add(cf);
Message message = rm.makeRowMutationMessage();
WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
MessagingService.instance.sendRR(message, endPoint, responseHandler);
try
{
responseHandler.get();
Expand Down
2 changes: 0 additions & 2 deletions src/java/org/apache/cassandra/net/AsyncResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public void result(Message response)
{
lock_.unlock();
}

MessagingService.removeRegisteredCallback(response.getMessageId());
}

public InetAddress getFrom()
Expand Down
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/net/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ String getMessageId()
return messageId_;
}

void setMessageId(String id)
{
messageId_ = id;
}

byte[] getDetail(Object key)
{
return details_.get(key);
Expand Down
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ public String getMessageId()
return header_.getMessageId();
}

void setMessageId(String id)
{
header_.setMessageId(id);
}

// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
Expand Down
128 changes: 25 additions & 103 deletions src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,12 +44,12 @@
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;

public class MessagingService
{
Expand All @@ -62,8 +61,7 @@ public class MessagingService
public static final int PROTOCOL_MAGIC = 0xCA552DFA;

/* This records all the results mapped by message Id */
private static ExpiringMap<String, IMessageCallback> callbacks;
private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
private static ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks;

/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
Expand Down Expand Up @@ -103,21 +101,16 @@ protected MessagingService()
listenGate = new SimpleCondition();
verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();

Function<String, ?> timeoutReporter = new Function<String, Object>()
Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> timeoutReporter = new Function<Pair<String, Pair<InetAddress, IMessageCallback>>, Object>()
{
public Object apply(String messageId)
public Object apply(Pair<String, Pair<InetAddress, IMessageCallback>> pair)
{
Collection<InetAddress> addresses = targets.remove(messageId);
if (addresses == null)
return null;

for (InetAddress address : addresses)
addLatency(address, (double) DatabaseDescriptor.getRpcTimeout());

Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
return null;
}
};
callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);

defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
Expand All @@ -133,6 +126,18 @@ public void run()
timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
}

/**
* Track latency information for the dynamic snitch
* @param cb: the callback associated with this message -- this lets us know if it's a message type we're interested in
* @param address: the host that replied to the message
* @param latency
*/
public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
{
if (cb instanceof QuorumResponseHandler || cb instanceof AsyncResult)
addLatency(address, latency);
}

public void addLatency(InetAddress address, double latency)
{
for (ILatencySubscriber subscriber : subscribers)
Expand Down Expand Up @@ -227,49 +232,9 @@ public IVerbHandler getVerbHandler(StorageService.Verb type)
return verbHandlers_.get(type);
}

/**
* Send a message to a given endpoint.
* @param message message to be sent.
* @param to endpoint to which the message needs to be sent
* @return an reference to an IAsyncResult which can be queried for the
* response
*/
public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
for (InetAddress endpoint : to)
{
putTarget(messageId, endpoint);
sendOneWay(message, endpoint);
}
return messageId;
}

private static void putTarget(String messageId, InetAddress endpoint)
{
Collection<InetAddress> addresses = targets.get(messageId);
if (addresses == null)
{
addresses = new NonBlockingHashSet<InetAddress>();
Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
if (oldAddresses != null)
addresses = oldAddresses;
}
addresses.add(endpoint);
}

private static void removeTarget(String messageId, InetAddress from)
{
Collection<InetAddress> addresses = targets.get(messageId);
// null is expected if we removed the callback or we got a reply after its timeout expired
if (addresses != null)
addresses.remove(from);
}

public void addCallback(IAsyncCallback cb, String messageId)
private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
{
callbacks.put(messageId, cb);
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
}

/**
Expand All @@ -285,42 +250,11 @@ public void addCallback(IAsyncCallback cb, String messageId)
public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
putTarget(messageId, to);
addCallback(cb, messageId, to);
sendOneWay(message, to);
return messageId;
}

/**
* Send a message to a given endpoint. The ith element in the <code>messages</code>
* array is sent to the ith element in the <code>to</code> array.This method assumes
* there is a one-one mapping between the <code>messages</code> array and
* the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
* This method also informs the MessagingService to wait for at least
* <code>howManyResults</code> responses to determine success of failure.
* @param messages messages to be sent.
* @param to endpoints to which the message needs to be sent
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occured to the invoker of the send().
* @return an reference to message id used to match with the result
*/
public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
{
if ( messages.length != to.length )
{
throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
}
String groupId = GuidGenerator.guid();
addCallback(cb, groupId);
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
putTarget(groupId, to[i]);
sendOneWay(messages[i], to[i]);
}
return groupId;
}

/**
* Send a message to a given endpoint. This method adheres to the fire and forget
* style messaging.
Expand Down Expand Up @@ -368,8 +302,7 @@ public void sendOneWay(Message message, InetAddress to)
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
callbacks.put(message.getMessageId(), iar);
putTarget(message.getMessageId(), to);
addCallback(iar, message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
Expand Down Expand Up @@ -443,14 +376,8 @@ public static void receive(Message message)
}
}

public static IMessageCallback getRegisteredCallback(String messageId)
{
return callbacks.get(messageId);
}

public static IMessageCallback removeRegisteredCallback(String messageId)
public static Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String messageId)
{
targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
return callbacks.remove(messageId);
}

Expand All @@ -459,11 +386,6 @@ public static long getRegisteredCallbackAge(String messageId)
return callbacks.getAge(messageId);
}

public static void responseReceivedFrom(String messageId, InetAddress from)
{
removeTarget(messageId, from);
}

public static void validateMagic(int magic) throws IOException
{
if (magic != PROTOCOL_MAGIC)
Expand Down
14 changes: 8 additions & 6 deletions src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@

package org.apache.cassandra.net;

import java.net.InetAddress;

import org.apache.log4j.Logger;

import org.apache.cassandra.utils.Pair;

public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );

public void doVerb(Message message)
{
String messageId = message.getMessageId();
MessagingService.responseReceivedFrom(messageId, message.getFrom());
double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
if (cb == null)
Pair<InetAddress, IMessageCallback> pair = MessagingService.removeRegisteredCallback(messageId);
if (pair == null)
return;

// if cb is not null, then age will be valid
MessagingService.instance.addLatency(message.getFrom(), age);
IMessageCallback cb = pair.right;
MessagingService.instance.maybeAddLatency(cb, message.getFrom(), age);

if (cb instanceof IAsyncCallback)
{
Expand All @@ -49,5 +52,4 @@ public void doVerb(Message message)
((IAsyncResult) cb).result(message);
}
}

}
25 changes: 14 additions & 11 deletions src/java/org/apache/cassandra/service/ConsistencyChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ public void run()
ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
Message message = readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");

MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
DigestResponseHandler handler = new DigestResponseHandler();
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message, endpoint);
{
Message message = readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
MessagingService.instance.sendRR(message, endpoint, handler);
}
}
}
catch (IOException ex)
Expand Down Expand Up @@ -128,14 +129,16 @@ public synchronized void response(Message response)
if (!Arrays.equals(dataDigest, digest))
{
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
DataRepairHandler handler = new DataRepairHandler();
for (InetAddress endpoint : replicas_)
{
if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message, endpoint);
{
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@" + endpoint);
MessagingService.instance.sendRR(message, endpoint, handler);
}
}

repairInvoked = true;
Expand Down
36 changes: 13 additions & 23 deletions src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,25 @@ public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseRes

public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
boolean success;
try
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
boolean success;
try
{
success = condition.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}

if (!success)
{
StringBuilder sb = new StringBuilder("");
for (Message message : responses)
{
sb.append(message.getFrom());
}
throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
}
success = condition.await(timeout, TimeUnit.MILLISECONDS);
}
finally
catch (InterruptedException ex)
{
throw new AssertionError(ex);
}

if (!success)
{
for (Message response : responses)
StringBuilder sb = new StringBuilder("");
for (Message message : responses)
{
MessagingService.removeRegisteredCallback(response.getMessageId());
sb.append(message.getFrom());
}
throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
}

return responseResolver.resolve(responses);
Expand Down
Loading

0 comments on commit 44cda97

Please sign in to comment.