Permalink
Browse files

multithreaded hint replay

patch by vijay; reviewed by jbellis for CASSANDRA-4189
  • Loading branch information...
1 parent 8516bcb commit b993eecfb033d01345718d396a2c32c5120b5e0e @jbellis jbellis committed Jun 14, 2012
View
@@ -1,4 +1,5 @@
1.2-dev
+ * multithreaded hint replay (CASSANDRA-4189)
* add inter-node message compression (CASSANDRA-3127)
* enforce 1m min keycache for auto (CASSANDRA-4306)
* remove COPP (CASSANDRA-2479)
View
@@ -33,6 +33,8 @@ Upgrading
- The somewhat ill-concieved CollatingOrderPreservingPartitioner
has been removed. Use RandomPartitioner (recommended) or
ByteOrderedPartitioner instead.
+ - Global option hinted_handoff_throttle_delay_in_ms has been removed.
+ hinted_handoff_throttle_in_kb has been added instead.
1.1.1
View
@@ -26,8 +26,12 @@ hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, hints will be dropped.
max_hint_window_in_ms: 3600000 # one hour
-# Sleep this long after delivering each hint
-hinted_handoff_throttle_delay_in_ms: 1
+# throttle in KB's per second, per delivery thread
+hinted_handoff_throttle_in_kb: 1024
+# Number of threads with which to deliver hints;
+# Consider increasing this number when you have multi-dc deployments, since
+# cross-dc handoff tends to be slower
+max_hints_delivery_threads: 2
# The following setting populates the page cache on memtable flush and compaction
# WARNING: Enable this setting only when the whole node's data fits in memory.
@@ -119,7 +119,8 @@
public Double flush_largest_memtables_at = 1.0;
public Double reduce_cache_sizes_at = 1.0;
public double reduce_cache_capacity_to = 0.6;
- public int hinted_handoff_throttle_delay_in_ms = 0;
+ public int hinted_handoff_throttle_in_kb = 1024;
+ public int max_hints_delivery_threads = 1;
public boolean compaction_preheat_key_cache = true;
public boolean incremental_backups = false;
@@ -954,9 +954,14 @@ public static double getReduceCacheCapacityTo()
return conf.reduce_cache_capacity_to;
}
- public static int getHintedHandoffThrottleDelay()
+ public static int getHintedHandoffThrottleInKB()
{
- return conf.hinted_handoff_throttle_delay_in_ms;
+ return conf.hinted_handoff_throttle_in_kb;
+ }
+
+ public static int getMaxHintsThread()
+ {
+ return conf.max_hints_delivery_threads;
}
public static boolean getPreheatKeyCache()
@@ -33,6 +33,7 @@
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.*;
@@ -44,11 +45,13 @@
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -94,7 +97,11 @@
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
- private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY);
+ private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "HintedHandoff");
public void start()
{
@@ -119,20 +126,11 @@ public void run()
StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
- private static void sendMutation(InetAddress endpoint, RowMutation mutation) throws TimeoutException
+ private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws TimeoutException
{
IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
- MessagingService.instance().sendRR(mutation.createMessage(), endpoint, responseHandler);
+ MessagingService.instance().sendRR(message, endpoint, responseHandler);
responseHandler.get();
-
- try
- {
- Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
}
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, long timestamp) throws IOException
@@ -260,6 +258,21 @@ private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, Di
private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
{
+ long hintSizes = 0;
+ Throttle hintThrottle = new Throttle("HintThrottle", new Throttle.ThroughputFunction()
+ {
+ public int targetThroughput()
+ {
+ if (DatabaseDescriptor.getHintedHandoffThrottleInKB() < 1)
+ // throttling disabled
+ return 0;
+ // total throughput
+ int totalBytesPerMS = (DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024) / 8 / 1000;
+ // per hint throughput (target bytes per MS)
+ return totalBytesPerMS / Math.max(1, executor.getActiveCount());
+ }
+ });
+
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
if (hintStore.isEmpty())
return; // nothing to do, don't confuse users by logging a no-op handoff
@@ -360,7 +373,11 @@ private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOExcep
{
if (rm != null)
{
- sendMutation(endpoint, rm);
+ MessageOut<RowMutation> message = rm.createMessage();
+ sendMutation(endpoint, message);
+ // throttle for the messages sent.
+ hintSizes += message.serializedSize(MessagingService.current_version);
+ hintThrottle.throttle(hintSizes);
rowsReplayed++;
}
deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
@@ -423,10 +440,10 @@ private void scheduleAllDeliveries()
*/
public void scheduleHintDelivery(final InetAddress to)
{
- logger.debug("deliverHints to {}", to);
- if (!queuedDeliveries.add(to))
+ // We should not deliver hints to the same host in 2 different threads
+ if (queuedDeliveries.contains(to) || !queuedDeliveries.add(to))
return;
-
+ logger.debug("Scheduling delivery of Hints to {}", to);
Runnable r = new WrappedRunnable()
{
public void runMayThrow() throws Exception
@@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.FBUtilities;
@@ -112,4 +113,24 @@ public void serialize(DataOutputStream out, int version) throws IOException
if (payload != null)
serializer.serialize(payload, out, version);
}
+
+ public int serializedSize(int version)
+ {
+ int size = CompactEndpointSerializationHelper.serializedSize(from);
+
+ size += TypeSizes.NATIVE.sizeof(verb.ordinal());
+ size += TypeSizes.NATIVE.sizeof(parameters.size());
+ for (Map.Entry<String, byte[]> entry : parameters.entrySet())
+ {
+ TypeSizes.NATIVE.sizeof(entry.getKey());
+ TypeSizes.NATIVE.sizeof(entry.getValue().length);
+ size += entry.getValue().length;
+ }
+
+ long longSize = payload == null ? 0 : serializer.serializedSize(payload, version);
+ assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
+ size += TypeSizes.NATIVE.sizeof((int) longSize);
+ size += longSize;
+ return size;
+ }
}

0 comments on commit b993eec

Please sign in to comment.