Permalink
Browse files

Address review comments + provide more sender executors

- use a throttling sender by default
  • Loading branch information...
matthieumorel committed Jan 4, 2013
1 parent 6fe7ea8 commit 49dfe9bafeb72a94ff59c2302b522f44641a54a1
Showing with 259 additions and 87 deletions.
  1. +0 −1 build.gradle
  2. +3 −1 subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
  3. +10 −1 subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
  4. +1 −1 subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
  5. +0 −2 subprojects/s4-benchmarks/README.md
  6. +1 −1 subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
  7. +2 −2 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
  8. +7 −1 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
  9. +26 −0 ...rojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingDeserializerExecutorFactory.java
  10. +4 −2 subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
  11. +1 −1 ...rojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
  12. +3 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
  13. +6 −10 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
  14. +16 −2 subprojects/s4-comm/src/main/resources/default.s4.comm.properties
  15. +19 −24 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
  16. +11 −18 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
  17. +3 −1 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
  18. +5 −4 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
  19. +1 −1 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
  20. +6 −1 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
  21. +11 −2 subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
  22. +0 −3 subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
  23. +2 −0 ...s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
  24. +2 −0 ...ojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
  25. +5 −1 ...ojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
  26. +20 −0 ...ore/src/main/java/org/apache/s4/core/staging/LoadSheddingRemoteSendersExecutorServiceFactory.java
  27. +62 −0 ...ts/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingSenderExecutorServiceFactory.java
  28. +8 −2 ...-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
  29. +9 −0 ...ects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
  30. +12 −2 subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
  31. +1 −1 subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
  32. +1 −1 subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
  33. +1 −1 subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
View
@@ -147,7 +147,6 @@ subprojects {
compile(libraries.reflectasm)
runtime(libraries.objenesis)
runtime(libraries.kryo)
- runtime(libraries.reflectasm)
runtime(libraries.netty)
runtime(libraries.asm)
compile(libraries.javax_inject)
@@ -34,8 +34,10 @@
* - message payload that needs to be sent
*
* @return - true - if message is sent across successfully - false - if send fails
+ * @throws InterruptedException
+ * if interrupted during blocking send operation
*/
- boolean send(int partitionId, ByteBuffer message);
+ boolean send(int partitionId, ByteBuffer message) throws InterruptedException;
int getPartitionCount();
@@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
+import com.google.common.primitives.Primitives;
/**
* The base event class in S4. The base class supports generic key/value pairs which us useful for rapid prototyping and
@@ -134,7 +135,15 @@ public void setAppId(int appId) {
Data<?> data = map.get(key);
- return (T) data.value;
+ try {
+ return (T) data.value;
+ } catch (ClassCastException e) {
+ if (!Primitives.wrap(type).isAssignableFrom(Primitives.wrap(data.type))) {
+ logger.error("Trying to get a value of type {} for an attribute of type {}.", type, data.type);
+ return null;
+ }
+ throw e;
+ }
}
/**
@@ -18,7 +18,7 @@
/**
* Defines some of the basic elements of the S4 platforms.
- *
+ *
*
*
*/
@@ -56,8 +56,6 @@ Exmample configuration files are available in `/config` and you can configure :
- It is also possible to limit the injection rate by using and configuring the InjectionLimiterModule class. Parameters are `s4.sender.maxRate` and `s4.sender.warmupPeriod`.
-The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
-
The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
By default in this example the size of a message is 188 bytes.
@@ -52,7 +52,7 @@ protected void onInit() {
public List<String> get(Event event) {
return ImmutableList.of(event.get("key"));
}
- }, simplePE1).setParallelism(1);
+ }, simplePE1).setParallelism(3);
SimplePE2 simplePE2 = createPE(SimplePE2.class, "simplePE2");
@@ -31,7 +31,7 @@
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -102,7 +102,7 @@ protected void configure() {
bind(RemoteEmitters.class);
- bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
+ bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
try {
Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
@@ -11,8 +11,14 @@
* There are many possible implementations, that may consider various factors, in particular:
* <ul>
* <li>parallelism
- * <li>memory usage
+ * <li>memory usage (directly measured, or inferred from the number of buffered events)
* <li>sharing threadpool among channel workers
+ * </ul>
+ * <p>
+ * When related thresholds are reached, deserializer executors may:
+ * <ul>
+ * <li>block: this indirectly blocks the reception of messages for this channel, applying upstream backpressure.
+ * <li>drop messages: a form of load shedding
*
*
*/
@@ -0,0 +1,26 @@
+package org.apache.s4.comm.staging;
+
+import java.util.concurrent.Executor;
+
+import org.apache.s4.comm.DeserializerExecutorFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Executors factory for the deserialization stage that blocks incoming tasks when the work queue is full.
+ *
+ */
+public class BlockingDeserializerExecutorFactory implements DeserializerExecutorFactory {
+
+ @Named("s4.listener.maxEventsPerDeserializer")
+ @Inject(optional = true)
+ protected int maxEventsPerDeserializer = 100000;
+
+ @Override
+ public Executor create() {
+ return new BlockingThreadPoolExecutorService(1, false, "deserializer-%d", maxEventsPerDeserializer, getClass()
+ .getClassLoader());
+ }
+
+}
@@ -18,6 +18,7 @@
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -101,8 +102,8 @@ protected ListeningExecutorService delegate() {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
- e.printStackTrace();
Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
}
ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
return future;
@@ -114,6 +115,7 @@ protected ListeningExecutorService delegate() {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
}
ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
return future;
@@ -125,6 +127,7 @@ protected ListeningExecutorService delegate() {
queueingPermits.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
}
ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
return future;
@@ -135,7 +138,6 @@ public void execute(Runnable command) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
- e.printStackTrace();
Thread.currentThread().interrupt();
}
super.execute(new RunnableWithPermitRelease(command));
@@ -44,7 +44,7 @@
* Maximum number of threads in the pool
* @param fairParallelism
* If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
- * can be help ensure ordering, though there is an associated performance cost (typically small).
+ * can help ensure ordering, though there is an associated performance cost (typically small).
* @param threadName
* Naming scheme
* @param workQueueSize
@@ -47,6 +47,9 @@ public RemoteEmitter getEmitter(Cluster topology) {
emitter = emitters.putIfAbsent(topology, newEmitter);
if (emitter == null) {
emitter = newEmitter;
+ } else {
+ // use the existing emitter instead
+ newEmitter.close();
}
}
return emitter;
@@ -159,7 +159,7 @@ private void init() {
}
- private boolean connectTo(Integer partitionId) {
+ private boolean connectTo(Integer partitionId) throws InterruptedException {
ClusterNode clusterNode = partitionNodeMap.get(partitionId);
if (clusterNode == null) {
@@ -181,12 +181,12 @@ private boolean connectTo(Integer partitionId) {
} catch (InterruptedException ie) {
logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
clusterNode.getPort()));
- Thread.currentThread().interrupt();
+ throw ie;
}
return false;
}
- private void sendMessage(int partitionId, ByteBuffer message) {
+ private void sendMessage(int partitionId, ByteBuffer message) throws InterruptedException {
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
if (!partitionChannelMap.containsKey(partitionId)) {
@@ -197,12 +197,7 @@ private void sendMessage(int partitionId, ByteBuffer message) {
}
}
- try {
- writePermits.get(partitionId).acquire();
- } catch (InterruptedException e) {
- logger.error("Interrupted while acquiring permit", e);
- Thread.currentThread().interrupt();
- }
+ writePermits.get(partitionId).acquire();
Channel c = partitionChannelMap.get(partitionId);
if (c == null) {
@@ -214,7 +209,7 @@ private void sendMessage(int partitionId, ByteBuffer message) {
}
@Override
- public boolean send(int partitionId, ByteBuffer message) {
+ public boolean send(int partitionId, ByteBuffer message) throws InterruptedException {
// TODO a possible optimization would be to buffer messages per partition, with a small timeout. This will limit
// the number of writes and therefore system calls.
sendMessage(partitionId, message);
@@ -240,6 +235,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
@Override
public void close() {
try {
+ topology.removeListener(this);
channels.close().await();
bootstrap.releaseExternalResources();
} catch (InterruptedException ie) {
@@ -8,12 +8,26 @@ s4.cluster.zk_address = localhost:2181
s4.cluster.zk_session_timeout = 10000
s4.cluster.zk_connection_timeout = 10000
+
+# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure
+
# how many threads to use for the sender stage (i.e. serialization)
s4.sender.parallelism=1
# maximum number of events in the buffer of the sender stage
-s4.sender.workQueueSize=100000
+s4.sender.workQueueSize=10000
+# maximum sending rate from a given node, in events / s (used with throttling sender executors)
+s4.sender.maxRate=200000
+
+# how many threads to use for the *remote* sender stage (i.e. serialization)
+s4.remoteSender.parallelism=1
+# maximum number of events in the buffer of the *remote* sender stage
+s4.remoteSender.workQueueSize=10000
+# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
+s4.remoteSender.maxRate=200000
+
# maximum number of pending writes to a given comm channel
s4.emitter.maxPendingWrites=1000
# maximum number of events in the buffer of the processing stage
-s4.stream.workQueueSize=100000
+s4.stream.workQueueSize=10000
+
@@ -22,29 +22,24 @@
public class TCPBasicTest extends ZkBasedTest {
@Test
- public void testSingleMessage() {
-
- try {
- Injector injector1 = Guice
- .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), "cluster1"), new TCPTransportModule(), new NoOpReceiverModule());
- Emitter emitter = injector1.getInstance(Emitter.class);
-
- Injector injector2 = Guice
- .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), "cluster1"), new TCPTransportModule(), new MockReceiverModule());
- // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
- // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
- injector2.getInstance(Listener.class);
-
- emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
-
- // check receiver got the message
- Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
+ public void testSingleMessage() throws Exception {
+
+ Injector injector1 = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new TCPTransportModule(), new NoOpReceiverModule());
+ Emitter emitter = injector1.getInstance(Emitter.class);
+
+ Injector injector2 = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new TCPTransportModule(), new MockReceiverModule());
+ // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+ // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
+ injector2.getInstance(Listener.class);
+
+ emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+ // check receiver got the message
+ Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
}
}
@@ -22,28 +22,21 @@
public class UDPBasicTest extends ZkBasedTest {
@Test
- public void testSingleMessage() {
+ public void testSingleMessage() throws Exception {
- try {
- Injector injector1 = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
- new UDPTransportModule(), new NoOpReceiverModule());
- Emitter emitter = injector1.getInstance(Emitter.class);
+ Injector injector1 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+ .openStream(), "cluster1"), new UDPTransportModule(), new NoOpReceiverModule());
+ Emitter emitter = injector1.getInstance(Emitter.class);
- Injector injector2 = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
- new UDPTransportModule(), new MockReceiverModule());
- // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
- // listener)
- injector2.getInstance(Listener.class);
+ Injector injector2 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+ .openStream(), "cluster1"), new UDPTransportModule(), new MockReceiverModule());
+ // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+ // listener)
+ injector2.getInstance(Listener.class);
- emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+ emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
- Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
}
}
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
+import junit.framework.Assert;
+
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -51,7 +53,7 @@ public static void initClass() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught error in thread {}: {}", t.getName(), e);
-
+ Assert.fail("Uncaught error in thread " + t.getName() + " : " + e.getMessage());
}
});
}
Oops, something went wrong.

0 comments on commit 49dfe9b

Please sign in to comment.