Integration with Helix
The goal is to provide better partition management, fault tolerance, and automatic rebalancing during cluster expansion.
Limitations in S4
- Tasks are always partitioned based on the number of nodes
- If a stream is already partitioned outside of S4, or with a different partitioning factor, it must be re-partitioned, which adds an extra hop. When multiple streams that are already partitioned outside of S4 need to be joined in S4, both streams must be re-hashed.
- Adding new nodes to an S4 cluster changes the number of partitions, which causes a lot of data shuffling. For example, if there are 4 nodes and you add another node, nearly all the keys need to be remapped, whereas ideally only 20% of the data should move.
- Fault tolerance is achieved with standby nodes that become active when a node fails. This makes inefficient use of hardware resources.
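To put numbers on the shuffling problem, the hypothetical Java sketch below (not S4 code) assumes the simplest scheme, where the number of partitions equals the number of nodes and a key belongs to node key % nodeCount. It counts how many integer keys change owner when a 4-node cluster grows to 5 nodes.

```java
// Hypothetical illustration, not S4 code: the partition count equals the
// node count, and a key is owned by node (key % nodeCount).
final class ModuloRemap {

    // Node index that owns a non-negative integer key.
    static int nodeFor(int key, int nodeCount) {
        return key % nodeCount;
    }

    // Counts the keys in 0..keyCount-1 whose owner changes when the
    // cluster grows from oldNodes to newNodes.
    static int movedKeys(int keyCount, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (nodeFor(key, oldNodes) != nodeFor(key, newNodes)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        int keys = 20000;
        int moved = movedKeys(keys, 4, 5);
        // Prints: 16000 of 20000 keys moved (80%); the ideal is 4000 (20%)
        System.out.println(moved + " of " + keys + " keys moved ("
                + (100 * moved / keys) + "%); the ideal is "
                + (keys / 5) + " (20%)");
    }
}
```

In this model only keys whose remainder modulo 20 is 0 to 3 keep their owner, which is why about 80% of the keys move.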
Advantages of integrating with Apache Helix:
- Task processing can be partitioned differently for each stream, which provides better load balancing.
- Consistent hashing is used to map partitions to nodes.
- When new nodes are added, partitions can be migrated to them without changing the number of partitions, which minimizes data movement.
- All nodes in the cluster are active, and when a node fails, its load is redistributed among the remaining active nodes.
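As a rough sketch of mapping a fixed set of partitions to nodes with consistent hashing, the hypothetical class below places each node on a hash ring several times (virtual nodes) and assigns each partition to the next node clockwise. It is not Helix's implementation; the class name, the hash function (String.hashCode mixed with the MurmurHash3 32-bit finalizer) and the 100 virtual nodes per node are assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent-hashing sketch, not Helix code: the number of
// partitions is fixed; only the partition-to-node mapping changes.
final class PartitionRing {

    private final TreeMap<Integer, String> ring = new TreeMap<Integer, String>();
    private final int virtualNodesPerNode;

    PartitionRing(int virtualNodesPerNode) {
        this.virtualNodesPerNode = virtualNodesPerNode;
    }

    // Spreads String.hashCode() bits with the MurmurHash3 32-bit finalizer.
    static int hash(String s) {
        int h = s.hashCode();
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Places several tokens (virtual nodes) for the node on the ring.
    // Existing tokens are never overwritten, so earlier nodes keep their positions.
    void addNode(String node) {
        for (int i = 0; i < virtualNodesPerNode; i++) {
            int token = hash(node + "#" + i);
            if (!ring.containsKey(token)) {
                ring.put(token, node);
            }
        }
    }

    // Owner = first token at or after the partition's hash, wrapping around.
    // Assumes at least one node has been added.
    String ownerOf(int partition) {
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash("partition-" + partition));
        return (e != null) ? e.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        int partitions = 6; // fixed, like -p=6
        PartitionRing ring = new PartitionRing(100);
        ring.addNode("localhost_12000");
        ring.addNode("localhost_12001");
        String[] before = new String[partitions];
        for (int p = 0; p < partitions; p++) {
            before[p] = ring.ownerOf(p);
        }
        ring.addNode("localhost_12002");
        for (int p = 0; p < partitions; p++) {
            System.out.println("partition " + p + ": " + before[p] + " -> " + ring.ownerOf(p));
        }
    }
}
```

Because existing ring positions never change, adding a node can only move partitions onto the new node; no partition moves between the old nodes. With only a handful of partitions the spread can still be uneven, which is one reason real systems add further balancing logic.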
This integration is still a prototype.
Check out the Helix code and install the Helix jars into the local Maven repository (use either ./build or mvn):
git clone git://github.com/apache/incubator-helix.git
cd incubator-helix
./build (or: mvn clean install -Dmaven.test.exec.skip=true)
Check out the S4 code with the Helix integration, then build it and install the tools:
git clone git://github.com/kishoreg/incubator-s4.git
cd incubator-s4
./gradlew install
./gradlew s4-tools:installApp
Create the cluster. -nbNodes is the number of S4 nodes that will be run. This creates two nodes, localhost_12000 and localhost_12001, numbered from the port given by -flp.
./s4 newCluster -c=cluster1 -nbNodes=2 -flp=12000
Create a task that processes events from the stream "names". The -id can be anything but must be unique within the cluster; for now, the id and the stream name (-s) must be the same. -p is the number of partitions, so in this case 6 partitions are distributed among the two nodes. -r is the number of replicas (standbys) for each partition. Note that when a node fails, its load is redistributed among the remaining nodes, so even though it is theoretically possible to have as many standbys as there are nodes, performance would be very poor. In general, choose this value based on the headroom available in the cluster.
./s4 createTask -c=cluster1 -id=names -t=consumer -p=6 -r=1 -s=names
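The hypothetical sketch below illustrates what -p=6 -r=1 means with two nodes: each event key hashes to one of six partitions, and each partition has a leader plus one standby on a different node. The class, the round-robin placement and the key hash are assumptions for illustration; the actual assignment is computed by Helix, and S4 uses its own key hashing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical round-robin placement, for illustration only.
final class TaskPlacement {

    // Maps an event key to one of `partitions` partitions.
    static int partitionFor(String key, int partitions) {
        return (key.hashCode() & 0x7fffffff) % partitions;
    }

    // For each partition, returns the nodes holding it: the leader first,
    // followed by `replicas` standbys, each on a different node.
    static List<List<String>> place(List<String> nodes, int partitions, int replicas) {
        if (replicas >= nodes.size()) {
            throw new IllegalArgumentException("replicas must be smaller than the number of nodes");
        }
        List<List<String>> placement = new ArrayList<List<String>>();
        for (int p = 0; p < partitions; p++) {
            List<String> holders = new ArrayList<String>();
            for (int r = 0; r <= replicas; r++) {
                holders.add(nodes.get((p + r) % nodes.size()));
            }
            placement.add(holders);
        }
        return placement;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("localhost_12000", "localhost_12001");
        List<List<String>> placement = place(nodes, 6, 1); // -p=6 -r=1
        for (int p = 0; p < placement.size(); p++) {
            List<String> holders = placement.get(p);
            System.out.println("partition " + p + ": leader " + holders.get(0)
                    + ", standby " + holders.subList(1, holders.size()));
        }
        System.out.println("key \"alice\" -> partition " + partitionFor("alice", 6));
    }
}
```

With two nodes, each node leads three partitions and is the standby for the other three, so if one node fails the survivor takes over all six. This is why the number of standbys should be chosen with the cluster's headroom in mind.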
Generate a HelloWorld App
./s4 newApp myApp -parentDir=/tmp
cd /tmp/myApp
./s4 s4r -a=hello.HelloApp -b=/tmp/myApp/build.gradle myApp
Deploy the app by providing the s4r archive. You can optionally provide the list of nodes where the app should be deployed.
./s4 deployApp -c=cluster1 -s4r=/tmp/myApp/build/libs/myApp.s4r -appName=myApp -zk=localhost:2181
Start the two S4 nodes in two separate windows. Note that the node id must now be specified at startup. This allows a node to re-associate with the same partitions when it is restarted.
./s4 node -c=cluster1 -zk=localhost:2181 -id=localhost_12000
./s4 node -c=cluster1 -zk=localhost:2181 -id=localhost_12001
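Why a stable node id matters can be sketched as follows: the cluster remembers which partitions belong to which node id, so a node that restarts with the same id finds its partitions again, while an unrecognized id looks like a brand-new node. This hypothetical class keeps that state in an in-memory map; Helix itself keeps its cluster state in ZooKeeper.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the assignment state kept by the cluster manager.
final class StickyAssignment {

    private final Map<String, List<Integer>> partitionsByNodeId =
            new HashMap<String, List<Integer>>();

    void assign(String nodeId, List<Integer> partitions) {
        partitionsByNodeId.put(nodeId, partitions);
    }

    // A node that starts with a known id gets back the partitions recorded
    // for it; an unknown id is treated as a new node with no partitions.
    List<Integer> onNodeStart(String nodeId) {
        List<Integer> partitions = partitionsByNodeId.get(nodeId);
        return (partitions != null) ? partitions : Collections.<Integer>emptyList();
    }

    public static void main(String[] args) {
        StickyAssignment state = new StickyAssignment();
        state.assign("localhost_12000", Arrays.asList(0, 2, 4));
        state.assign("localhost_12001", Arrays.asList(1, 3, 5));
        // Restarting with the same -id reclaims the same partitions.
        System.out.println(state.onNodeStart("localhost_12000")); // [0, 2, 4]
        // A different id is not recognized.
        System.out.println(state.onNodeStart("localhost_99999")); // []
    }
}
```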
Send some events to the names stream. Notice that the partitions are divided between the two nodes and that each event is routed to the appropriate node.
./s4 genericAdapter -c=cluster1 -s=names
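Routing an event can be pictured as two lookups: key to partition, then partition to the first live node that holds it. The hypothetical router below (the class, its methods and the key hash are assumptions, not the S4 API) also shows how a standby takes over when the leader goes down.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical router, for illustration only.
final class EventRouter {

    private final List<List<String>> holdersByPartition; // leader first, then standbys
    private final Set<String> liveNodes;

    EventRouter(List<List<String>> holdersByPartition, Set<String> liveNodes) {
        this.holdersByPartition = holdersByPartition;
        this.liveNodes = new HashSet<String>(liveNodes);
    }

    void markDown(String node) {
        liveNodes.remove(node);
    }

    // Picks the partition from the key, then the first live holder of that
    // partition: the leader if it is up, otherwise a standby.
    String route(String key) {
        int partition = (key.hashCode() & 0x7fffffff) % holdersByPartition.size();
        for (String node : holdersByPartition.get(partition)) {
            if (liveNodes.contains(node)) {
                return node;
            }
        }
        throw new IllegalStateException("no live node for partition " + partition);
    }

    public static void main(String[] args) {
        List<List<String>> holders = Arrays.asList(
                Arrays.asList("localhost_12000", "localhost_12001"),
                Arrays.asList("localhost_12001", "localhost_12000"));
        EventRouter router = new EventRouter(holders,
                new HashSet<String>(Arrays.asList("localhost_12000", "localhost_12001")));
        System.out.println("alice -> " + router.route("alice"));
        router.markDown("localhost_12000");
        System.out.println("alice -> " + router.route("alice") + " (after failure)");
    }
}
```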
Run the status tool to view the cluster state. It shows which nodes are up, which apps are deployed, and task metadata such as the stream each task processes, the number of partitions, and the leader node for each partition.
./s4 status -c=cluster1
Add a new node (-nbNodes=1), deploy the app to it, and redistribute the task among all nodes:
./s4 addNodes -c=cluster1 -nbNodes=1 -flp=12002
./s4 deployApp -c=cluster1 -s4r=/tmp/myApp/build/libs/myApp.s4r -appName=myApp -zk=localhost:2181
./s4 rebalanceTask -c=cluster1 -id=names
The partitions are redistributed among the 3 nodes. Run the status tool again; it should show the new node and the partition status.
./s4 status -c=cluster1
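The rebalancing step can be illustrated with a hypothetical minimal-movement rebalancer: partitions stay where they are as long as their node is within its fair share, and only the surplus is handed to the least-loaded nodes. This is a sketch under those assumptions, not the algorithm Helix uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical minimal-movement rebalancer, for illustration only.
final class Rebalancer {

    // current: partition -> node; nodes: every node after the expansion.
    // Keeps as many partitions in place as the per-node quotas allow.
    static Map<Integer, String> rebalance(Map<Integer, String> current, List<String> nodes) {
        int base = current.size() / nodes.size();
        int extra = current.size() % nodes.size(); // nodes allowed base + 1
        Map<String, Integer> load = new LinkedHashMap<String, Integer>();
        for (String node : nodes) {
            load.put(node, 0);
        }
        Map<Integer, String> result = new TreeMap<Integer, String>();
        List<Integer> orphans = new ArrayList<Integer>();
        int extrasUsed = 0;

        // Pass 1: keep a partition on its owner while the owner is under quota.
        for (int partition : new TreeSet<Integer>(current.keySet())) {
            String owner = current.get(partition);
            Integer l = load.get(owner);
            if (l != null && (l < base || (l == base && extrasUsed < extra))) {
                if (l == base) {
                    extrasUsed++;
                }
                load.put(owner, l + 1);
                result.put(partition, owner);
            } else {
                orphans.add(partition);
            }
        }
        // Pass 2: give each displaced partition to the least-loaded node.
        for (int partition : orphans) {
            String target = null;
            for (Map.Entry<String, Integer> e : load.entrySet()) {
                if (target == null || e.getValue() < load.get(target)) {
                    target = e.getKey();
                }
            }
            load.put(target, load.get(target) + 1);
            result.put(partition, target);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> current = new TreeMap<Integer, String>();
        for (int p = 0; p < 6; p++) {
            current.put(p, p % 2 == 0 ? "localhost_12000" : "localhost_12001");
        }
        Map<Integer, String> next = rebalance(current,
                Arrays.asList("localhost_12000", "localhost_12001", "localhost_12002"));
        int moved = 0;
        for (int p = 0; p < 6; p++) {
            if (!current.get(p).equals(next.get(p))) {
                moved++;
            }
        }
        System.out.println(next);
        System.out.println(moved + " of 6 partitions moved"); // 2 of 6 partitions moved
    }
}
```

Going from 2 to 3 nodes with 6 partitions, only 2 partitions move, compared with most of the keys under the modulo scheme described in the limitations above.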
S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.
S4 0.5.0 is a complete refactoring of the previous version of S4. It is grounded in the same concepts (partitioning inspired by map-reduce, actor-like distribution model), but with the following objectives:
- cleaner and simpler API
- robust configuration through statically defined modules
- cleaner architecture
- robust codebase
- easier development and testing of S4 apps, and easier use of the platform
We added the following core features:
- TCP-based communications
- state recovery through a flexible checkpointing mechanism
- inter-cluster/app communications through a pub-sub model
- dynamic application deployment
- toolset for easily starting S4 nodes, testing, packaging, deploying and monitoring S4 apps
For the latest information about S4, please visit our website at:
and our wiki, at:
Currently the wiki contains the most relevant and up-to-date documentation.
Source code is available here: https://git-wip-us.apache.org/repos/asf?p=incubator-s4.git
- JDK 6 or higher
- *nix or Mac OS X (you can still build the project and develop S4 applications on Microsoft Windows; the only limitation is that the "s4" script has not been ported to Windows yet)
How to build
This only applies if you check out from the source repository or download a released source package.
We use Gradle (http://gradle.org) as the build system.
- From the root directory of the S4 project:
./gradlew install
This will build the packages and install the artifacts in the local Maven repository.
- Then, build the tools:
./gradlew s4-tools:installApp
This will build the tools so that you can drive the platform through the "s4" command.
If you have a source package:
- root directory contains build and utility scripts, as well as informative files
- config/ directory contains configuration files for source code formatting
- doc/ directory contains the javadoc
- gradle/ directory contains libraries used by the gradle build tool
- lib/ directory contains some other gradle libraries
- subproject/ directory contains the platform subprojects: base, comm, core, tools, as well as example (example is not fully ported to 0.5.0)
- test-apps/ directory contains some examples (some of them, very trivial, are used in regression tests)
If you have a binary package:
- root directory contains the s4 utility script as well as informative files
- doc/ directory contains the javadoc
- lib/ directory contains:
  - the platform libraries (prefixed with "s4")
  - the dependencies
- bin/ directory contains some scripts that are used by the s4 script
- gradle/ directory contains libraries used for building S4 projects