Permalink
Browse files

Performance improvements and configurability

- clearly identify asynchronous stages and use configurable and injectable executors
for each of them (deserialization, processing, serialization)
- default executors for processing/sending use throttling
- stream processing now parallelizable, per stream
- Sender and Receiver are now interfaces, part of the base API
- SenderImpl and ReceiverImpl are default implementations in s4-core
- simplified regression tests for communication protocols
  • Loading branch information...
1 parent f04bd09 commit f9689ea0055e1d7a2a8459b8ee380283767f0ac8 @matthieumorel matthieumorel committed Nov 12, 2012
Showing with 1,158 additions and 1,200 deletions.
  1. +6 −4 build.gradle
  2. BIN lib/reflectasm-1.07-shaded.jar
  3. +4 −11 subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
  4. +24 −0 subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java
  5. +32 −0 subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
  6. +3 −0 subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
  7. +19 −10 subprojects/s4-benchmarks/README.md
  8. +38 −7 subprojects/s4-benchmarks/bench-cluster.sh
  9. +1 −1 subprojects/s4-benchmarks/config/injector.config
  10. +0 −1 subprojects/s4-benchmarks/config/node.config
  11. +2 −76 subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
  12. +14 −9 subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
  13. +4 −66 subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
  14. +30 −0 subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
  15. +19 −0 subprojects/s4-benchmarks/startInjector.sh
  16. +5 −9 subprojects/s4-benchmarks/startNode.sh
  17. +1 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
  18. +34 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
  19. +13 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
  20. +185 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
  21. +1 −2 subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
  22. +0 −1 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
  23. +15 −28 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
  24. +12 −8 subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
  25. +8 −1 subprojects/s4-comm/src/main/resources/default.s4.comm.properties
  26. +0 −162 subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
  27. +0 −34 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
  28. +0 −34 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
  29. +49 −0 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
  30. +0 −68 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
  31. +2 −1 subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
  32. +2 −1 subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
  33. +0 −44 subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
  34. +0 −27 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
  35. +0 −27 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
  36. +48 −0 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
  37. +0 −79 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
  38. +0 −194 subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
  39. +0 −116 subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
  40. +3 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
  41. +38 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
  42. +27 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
  43. +22 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
  44. +27 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
  45. +7 −0 subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
  46. +1 −0 subprojects/s4-core/s4-core.gradle
  47. +11 −2 subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
  48. +17 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
  49. +2 −3 subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
  50. +30 −38 subprojects/s4-core/src/main/java/org/apache/s4/core/{Receiver.java → ReceiverImpl.java}
  51. +31 −7 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
  52. +66 −26 subprojects/s4-core/src/main/java/org/apache/s4/core/{Sender.java → SenderImpl.java}
  53. +116 −88 subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
  54. +2 −1 subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
  55. +20 −0 .../s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
  56. +34 −0 ...rojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
  57. +37 −0 ...-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
  58. +12 −0 ...rojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
  59. +13 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
  60. +29 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
  61. +15 −7 subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
  62. +1 −0 subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
  63. +0 −1 subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
  64. +21 −2 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
  65. +5 −4 subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
View
@@ -58,10 +58,11 @@ project.ext["libraries"] = [
guice: 'com.google.inject:guice:3.0',
aop_alliance: 'aopalliance:aopalliance:1.0',
guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
- kryo: 'com.esotericsoftware.kryo:kryo:2.17',
+ kryo: 'com.esotericsoftware.kryo:kryo:2.20',
minlog: 'com.googlecode:minlog:1.2',
- reflectasm: 'com.esotericsoftware.reflectasm:reflectasm:1.07',
- netty: 'org.jboss.netty:netty:3.2.5.Final',
+ // NOTE shaded jar is not resolved correctly, we include it in /lib directory
+ reflectasm: 'com.esotericsoftware.reflectasm:reflectasm:1.07-shaded',
+ netty: 'io.netty:netty:3.5.9.Final',
mockito_core: 'org.mockito:mockito-core:1.9.0',
commons_config: 'commons-configuration:commons-configuration:1.6',
commons_codec: 'commons-codec:commons-codec:1.4',
@@ -145,9 +146,10 @@ subprojects {
compile(libraries.reflectasm)
runtime(libraries.objenesis)
runtime(libraries.kryo)
+ runtime(libraries.reflectasm)
runtime(libraries.netty)
runtime(libraries.asm)
- runtime(libraries.javax_inject)
+ compile(libraries.javax_inject)
runtime(libraries.commons_codec)
}
Binary file not shown.
@@ -18,25 +18,18 @@
package org.apache.s4.base;
-import java.nio.ByteBuffer;
-
/**
*
- * Get a byte array received by a lower level layer.
+ * Defines the communication level entry point for serialized events. The implementation is expected to open a server
+ * socket on the node's listening port, receive messages through this channel, and delegate to the application layer
+ * through the {@link Receiver} interface.
*
*/
public interface Listener {
/**
- * Perform blocking receive on the appropriate communication channel
- *
- * @return <ul>
- * <li>byte[] message returned by the channel</li>
- * <li>null if the associated blocking thread is interrupted</li>
- * </ul>
+ * Returns the id of the partition currently assigned to this node.
*/
- ByteBuffer recv();
-
public int getPartitionId();
void close();
@@ -0,0 +1,24 @@
+package org.apache.s4.base;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Defines the entry point from the communication layer to the application layer.
+ *
+ * Events received as raw bytes through the {@link Listener} implementation use the {@link Receiver#receive(ByteBuffer)}
+ * method so that events can be deserialized (conversion from byte[] to Event objects) and enqueued for processing.
+ *
+ */
+public interface Receiver {
+
+ /**
+ * Handle a serialized message, i.e. deserialize the message and pass it to event processors.
+ */
+ void receive(ByteBuffer message);
+
+ /**
+ * Returns the partition id currently assigned to this node.
+ */
+ int getPartitionId();
+
+}
@@ -0,0 +1,32 @@
+package org.apache.s4.base;
+
+/**
+ * Defines the exit point from the event processing layer to the communication layer. The implementation is responsible
+ * for serializing events and passing serialized data to the communication layer.
+ *
+ */
+public interface Sender {
+
+ /**
+ * This method attempts to send an event to a remote partition. If the destination is local, the method does not
+ * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
+ *
+ * @param hashKey
+ * the string used to map the value of a key to a specific partition.
+ * @param event
+ * the event to be delivered to a Processing Element instance.
+ * @return true if the event was sent because the destination is <b>not</b> local.
+ *
+ */
+ boolean checkAndSendIfNotLocal(String hashKey, Event event);
+
+ /**
+ * Send an event to all the remote partitions in the cluster. The caller is expected to also put the event in a
+ * local queue.
+ *
+ * @param event
+ * the event to be delivered to Processing Element instances.
+ */
+ void sendToAllRemotePartitions(Event event);
+
+}
@@ -18,5 +18,8 @@
/**
* Defines some of the basic elements of the S4 platform.
+ *
+ *
+ *
*/
package org.apache.s4.base;
@@ -9,19 +9,19 @@ That said, let's look at what the benchmarking framework does and how to use it.
## Measurements
-The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly, there is no deployment step. This allows to skip the packaging and deployment steps and allows to easily add profiling parameters, but requires a source distribution and a shared file system.
+The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly; there is no deployment step. This makes it possible to skip the packaging and deployment steps and to easily add profiling parameters, but it requires a source distribution and a shared file system.
-The simplest application does nothing but count incoming keyed messages, on a single stream, but other simple application can be easily added. For instance, with multiple streams, and communicating PEs.
+The simplest application does nothing but count incoming keyed messages, but other simple applications can easily be added, in particular ones involving multiple communicating PEs. There are 2 input streams available: `inputStream` and `inputStream2`.
-The injector sends basic keyed messages. The outputstream of the injector uses a keyfinder to partition the events across the application nodes.
+The injector sends basic keyed messages to a given named stream. The outputstream of the injector uses a keyfinder to partition the events across the application nodes.
We get metrics from the probes across the codebase, in particular:
- the rate of events sent per second (in the injector)
- the rate of events received per second (in the app nodes)
-Metrics from the platform code are computed with weighted moving averages.
+Metrics from the platform code are computed with weighted moving averages. It is recommended to let the application run for a few minutes and observe the metrics from the last minute.
-Profiling options can easily be added to the injector or app nodes in order to identify hotspots.
+Profiling options (e.g. YourKit) can easily be added to the injector or app nodes in order to identify hotspots.
## Parameters
@@ -38,9 +38,9 @@ Input parameters are:
Example configuration files are available in `/config` and you can configure:
- the number of keys
-- the number of warmup iterations
- the number of test iterations
- the number of parallel injection threads
+- the number of threads for the sender stage
- etc…
By default in this example the size of a message is 188 bytes.
@@ -53,15 +53,24 @@ Running 2 S4 nodes on the local machine:
For a distributed setup, you should modify the host names in the above command line, and specify the correct Zookeeper connection string in `node.config`.
+Here is an example for driving the test on a cluster:
+`incubator-s4/subprojects/s4-benchmarks/bench-cluster.sh "processingHost1 processingHost2 processingHost3" "injectorConfigStream1.cfg injectorConfigStream2.cfg" node.cfg 2 "injectorHost1 injectorHost2 injectorHost3 injectorHost4"` (the `2` controls the number of injectors per stream per injector host)
+
## Results
-When the benchmark finishes, results are available in `measurements/injectors` for the injection rates and in `measurements/node[0-n]` for other statistics.
+When the benchmark finishes (and even during the execution), results are available in `measurements/injectors` for the injection rates and in `measurements/node[0-n]` for other statistics.
+
+Results are also available from the console output for each of the nodes.
Most statistics files come from the probes of the platform and some of them use weighted moving averages. These are good for long running applications. For the benchmarks we also show instant rates, which are available in `injection-rate.csv` and `simplePE1.csv` files.
## Notes
-In the current design of S4, messages sent to output streams are not queued by S4 and directly passed to the communication layer.
-
-This implies that if the communication layer is not able to send those messages at least as fast as they are generated, the buffer of pending messages will increase rapidly. This may lead to memory problems in the injector. Solving the problem requires tuning the number of parallel injection threads.
+There are a lot of knobs for optimally configuring the stages, and the optimal settings will also depend upon:
+- the hardware
+- the network
+- the operating system (scheduling, networking)
+- the JVM implementation and tuning parameters
+- the application
+- the skewness and diversity of the data (there is a maximum of 1 event executing in a given PE instance (i.e. keyed))
@@ -2,16 +2,21 @@
HOSTS=$1
-INJECTOR_CONFIG=$2
+INJECTOR_CONFIGS=$2 # 1 injector injects to 1 stream. Use more injector configs for injecting to more streams
NODE_CONFIG=$3
-NB_INJECTORS=$4
+NB_INJECTORS_PER_NODE=$4
+INJECTOR_NODES=$5
BENCH_ROOTDIR=`pwd`
echo "hosts = $HOSTS"
echo "injector config file = $INJECTOR_CONFIG"
echo "node config file = $NODE_CONFIG"
echo "bench root dir = $BENCH_ROOTDIR"
+#########################################################
+#### cleanup files and processes, and build platform
+#########################################################
+
killall -9 java
cd $BENCH_ROOTDIR
@@ -25,8 +30,18 @@ NB_NODES=0
for host in $HOSTS
do
((NB_NODES++))
+ ssh $host "killall -9 java"
+done
+
+NB_INJECTORS=0
+for injectorNode in $INJECTOR_NODES ; do
+ for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+ NB_INJECTORS=$(($NB_INJECTORS + $NB_INJECTORS_PER_NODE))
+ done
+ ssh $injectorNode "killall -9 java"
done
+# must run from where ZooKeeper server is running (as specified in injector config file)
(cd $BENCH_ROOTDIR/../../ && ./s4 zkServer -clusters=c=testCluster1:flp=12000:nbTasks=$NB_INJECTORS,c=testCluster2:flp=13000:nbTasks=$NB_NODES &)
@@ -40,22 +55,38 @@ mkdir $BENCH
echo "nb nodes = $NB_NODES\n" > $BENCH/benchConf.txt
echo "hosts = $HOSTS" >> $BENCH/benchConf.txt
echo "injector config ">> $BENCH/benchConf.txt
-cat $INJECTOR_CONFIG >> $BENCH/benchConf.txt
+for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+ cat $INJECTOR_CONFIG >> $BENCH/benchConf.txt
+done
+
+#########################################################
+#### start S4 nodes
+#########################################################
+
+i=0
for host in $HOSTS
do
+ ((i++))
if [ $host == "localhost" ] || [ $host == "127.0.0.1" ] ; then
$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG "localhost" > $BENCH_DIR/output_$i.log 2>$BENCH_DIR/s4err_$i.err < /dev/null &
else
- ssh $host "$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG $host > $BENCH_DIR/output_$host.log 2>$BENCH_DIR/s4err_$host .err < /dev/null &"
+ ssh $host "$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG $host > $BENCH_DIR/output_$host-$i.log 2>$BENCH_DIR/s4err_$host-$i.err < /dev/null &"
fi
done
sleep 15
-for ((i = 1; i <= $NB_INJECTORS; i++)); do
- java -Xmx200m -Xms200m -verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG" &
-done
+PROFILING_OPTS=""
+#########################################################
+#### start injectors
+#########################################################
+
+for INJECTOR_NODE in $INJECTOR_NODES ; do
+ for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+ ssh $INJECTOR_NODE "cd $BENCH_ROOTDIR ; $BENCH_ROOTDIR/startInjector.sh $NB_INJECTORS_PER_NODE $INJECTOR_CONFIG $ZK_SERVER > $BENCH_DIR/out.injector_$INJECTOR_NODE.log 2>$BENCH_DIR/err.injector_$INJECTOR_NODE.log < /dev/null &"
+ done
+done
@@ -1,3 +1,3 @@
-c=testCluster1
-appClass=org.apache.s4.benchmark.simpleApp1.Injector
--p=s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=2,s4.benchmark.warmupIterations=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.warmupSleepInterval=0,s4.benchmark.injector.parallelism=2
+-p=s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.injector.parallelism=2
@@ -1,4 +1,3 @@
-c=testCluster2
-appClass=org.apache.s4.benchmark.simpleApp1.SimpleApp
--p=s4.adapter.output.stream=inputStream
-zk=127.0.0.1:2181
Oops, something went wrong.

0 comments on commit f9689ea

Please sign in to comment.