Permalink
Browse files

S4-7 fix a few issues with previous patch

- synchro with topology
- add error logging for emitter asynchronous tasks
- add missing config for tcp queue size
- TODO fix concurrency issue in TCPEmitter
  • Loading branch information...
1 parent 8e2427b commit 0931075a914782fc6c7ab11cd6716c91ece95bb3 @matthieumorel matthieumorel committed Feb 1, 2012
@@ -1,5 +1,6 @@
package org.apache.s4.comm.tcp;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Hashtable;
@@ -34,23 +35,29 @@
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
-/*
- * TCPEmitter - Uses TCP to send messages across to other partitions.
- * - Message ordering between partitions is preserved.
- * - For efficiency, NettyEmitter.send() queues the messages partition-wise,
- * a threadPool sends the messages asynchronously; message dequeued only on success.
- * - Tolerates topology changes, partition re-mapping, and network glitches.
+/**
+ * TCPEmitter - Uses TCP to send messages across to other partitions. - Message ordering between partitions is
+ * preserved. - For efficiency, NettyEmitter.send() queues the messages partition-wise, a threadPool sends the messages
+ * asynchronously; message dequeued only on success. - Tolerates topology changes, partition re-mapping, and network
+ * glitches.
*/
public class TCPEmitter implements Emitter, TopologyChangeListener {
private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+ // TODO must be configurable
private final int numRetries = 10;
+ // TODO must be configurable
+ private final long retryDelayMs = 10;
private final int bufferSize;
private Topology topology;
private final ClientBootstrap bootstrap;
+ // id for prefixing emitter threads names
+ private static int instanceId = 0;
+ private volatile long sentMsgCount = 0;
/*
* Channel used to send messages to each partition
@@ -70,7 +77,14 @@
/*
* Thread pool to actually send messages
*/
- private ExecutorService sendService = Executors.newCachedThreadPool();
+ private ExecutorService sendService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread paramThread, Throwable paramThrowable) {
+ logger.error("Cannot send message", paramThrowable);
+ }
+ }).build());
@Inject
public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize) throws InterruptedException {
@@ -107,7 +121,7 @@ public ChannelPipeline getPipeline() {
bootstrap.setOption("connectTimeoutMillis", 100);
}
- private static class Message implements ChannelFutureListener {
+ private class Message implements ChannelFutureListener {
private final SendQueue sendQ;
private final byte[] message;
private boolean sendInProcess;
@@ -129,8 +143,10 @@ private void sendMessage() {
@Override
public synchronized void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
+ sentMsgCount++;
synchronized (sendQ.messages) {
sendQ.messages.remove(this);
+ logger.debug("sent messages {}, msg size {}", sentMsgCount, sendQ.messages.size());
}
return;
}
@@ -146,7 +162,7 @@ public synchronized void operationComplete(ChannelFuture future) throws Exceptio
}
}
- private static class SendQueue {
+ private class SendQueue {
private final TCPEmitter emitter;
private final int partitionId;
private final int bufferSize;
@@ -168,7 +184,7 @@ private void spawnSendThread() {
sendThreadRecheck = true;
} else {
sendThreadActive = true;
- emitter.sendService.execute(new SendThread(this));
+ emitter.sendService.execute(new SendTask(this));
}
}
}
@@ -193,10 +209,10 @@ private void sendMessagesInQueue() {
}
}
- private static class SendThread extends Thread {
+ private static class SendTask implements Runnable {
private final SendQueue sendQ;
- SendThread(SendQueue sendQ) {
+ SendTask(SendQueue sendQ) {
this.sendQ = sendQ;
}
@@ -220,8 +236,10 @@ public void run() {
private boolean connectTo(Integer partitionId) {
ClusterNode clusterNode = partitionNodeMap.get(partitionId);
-
if (clusterNode == null) {
+ if (topology.getTopology().getNodes().size() <= partitionId) {
+ return false;
+ }
clusterNode = topology.getTopology().getNodes().get(partitionId);
partitionNodeMap.forcePut(partitionId, clusterNode);
}
@@ -240,7 +258,7 @@ private boolean connectTo(Integer partitionId) {
return true;
}
try {
- Thread.sleep(10);
+ Thread.sleep(retryDelayMs);
} catch (InterruptedException ie) {
logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
clusterNode.getPort()));
@@ -256,8 +274,9 @@ private void sendMessage(int partitionId, Message m) {
while (true) {
if (!partitionChannelMap.containsKey(partitionId)) {
- connectTo(partitionId);
- continue;
+ if (!connectTo(partitionId)) {
+ continue;
+ }
}
SendQueue sendQ = sendQueues.get(partitionId);
@@ -12,6 +12,7 @@
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
@@ -29,6 +30,8 @@ public long getMessageDropInQueueCount() {
@Inject
public UDPEmitter(Topology topology) {
+ LoggerFactory.getLogger(getClass()).debug("UDPEmitter with topology {}",
+ topology.getTopology().getPartitionCount());
this.topology = topology;
topology.addListener(this);
nodes = HashBiMap.create(topology.getTopology().getNodes().size());
@@ -48,7 +51,10 @@ public boolean send(int partitionId, byte[] message) {
try {
ClusterNode node = nodes.get(partitionId);
if (node == null) {
- throw new RuntimeException(String.format("Bad partition id %d", partitionId));
+ LoggerFactory.getLogger(getClass()).error(
+ "Cannot send message to partition {} because this partition is not visible to this emitter",
+ partitionId);
+ return false;
}
byte[] byteBuffer = new byte[message.length];
System.arraycopy(message, 0, byteBuffer, 0, message.length);
@@ -11,15 +11,15 @@
import com.google.inject.Inject;
import com.google.inject.name.Named;
-/*
+/**
* Test util for communication protocols.
- *
+ *
* <ul>
- * <li> The util defines Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
- * interrupt after a pre-defined but long enough interval </li>
+ * <li>The util defines Send and Receive Threads</li>
+ * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
+ * <li>ReceiveThread receives all/most of these messages</li>
+ * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
+ * long enough interval</li>
* </ul>
*
*/
@@ -53,15 +53,23 @@ public PartitionInfo(Emitter emitter, Listener listener, @Named("emitter.send.in
}
public class SendThread extends Thread {
+
+ public SendThread() {
+ super("SendThread");
+ }
+
@Override
public void run() {
try {
for (int i = 0; i < numMessages; i++) {
for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
byte[] message = new String(partitionId + " " + i).getBytes();
- while (!emitter.send(partition, message)) {
+ if (!emitter.send(partition, message)) {
logger.debug("SendThread {}: Resending message to {}", partitionId, partition);
Thread.sleep(interval);
+ if (!emitter.send(partition, message)) {
+ throw new RuntimeException("failed to send message");
+ }
}
}
}
@@ -95,6 +103,7 @@ public void run() {
private Timer timer;
ReceiveThread() {
+ super("ReceiveThread");
receivedMessages = new int[emitter.getPartitionCount()];
for (int i = 0; i < receivedMessages.length; i++)
receivedMessages[i] = -1;
@@ -1,13 +1,8 @@
package org.apache.s4.core;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
import java.util.Arrays;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,16 +24,19 @@
* @param args
*/
public static void main(String[] args) {
-
- if (args.length == 0) {
- logger.info("Starting S4 node with default configuration");
- startDefaultS4Node();
- } else if (args.length == 1) {
- logger.info("Starting S4 node with custom configuration from file {}", args[0]);
- startCustomS4Node(args[0]);
- } else {
- logger.info("Starting S4 node in development mode");
- startDevelopmentMode(args);
+ try {
+ if (args.length == 0) {
+ logger.info("Starting S4 node with default configuration");
+ startDefaultS4Node();
+ } else if (args.length == 1) {
+ logger.info("Starting S4 node with custom configuration from file {}", args[0]);
+ startCustomS4Node(args[0]);
+ } else {
+ logger.info("Starting S4 node in development mode");
+ startDevelopmentMode(args);
+ }
+ } catch (Exception e) {
+ logger.error("S4 node failure", e);
}
}
@@ -7,3 +7,4 @@ cluster.name = s4-test-cluster
cluster.zk_address = localhost:21810
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
+tcp.partition.queue_size=256
@@ -8,4 +8,5 @@ cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
comm.module = org.apache.s4.deploy.TestModule
s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=256

0 comments on commit 0931075

Please sign in to comment.