This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Add tuning documentation and update style
matthieumorel committed Feb 24, 2013
1 parent 755ed6b commit abc6a77
Showing 15 changed files with 403 additions and 114 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -7,7 +7,6 @@
target/
build/
bin/
doc/
tmp/
pepe/

3 changes: 3 additions & 0 deletions website/Rules
@@ -27,6 +27,9 @@ compile '/doc/*' do
# don’t filter binary items
else
filter :kramdown
filter :colorize_syntax,
:default_colorizer => :pygmentize,
:pygmentize => { :linenos => 'inline', :options => { :startinline => 'True' } }
layout 'default'
end
end
1 change: 1 addition & 0 deletions website/compass/config.rb
@@ -3,6 +3,7 @@
css_dir = "output/style"
sass_dir = "content/style"
images_dir = "output/images"
javascripts_dir = "assets/javascripts"

# when using SCSS:
sass_options = {
123 changes: 95 additions & 28 deletions website/content/doc/0.6.0/event_dispatch.md
@@ -7,21 +7,26 @@ Events are dispatched according to their key.
The key is identified in an `Event` through a `KeyFinder`.

Dispatch can be configured for:

* dispatching events to partitions (_outgoing dispatch_)
* dispatching external events within a partition (_incoming dispatch_)

# Outgoing dispatch

A stream can be defined with a `KeyFinder`, as follows:

~~~
#!java
Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
    @Override
    public List<String> get(final TopicEvent arg0) {
        return ImmutableList.of(arg0.getTopic());
    }
}, topicCountAndReportPE);
~~~

When an event is sent to the "TopicSeen" stream, its key is identified through the `KeyFinder` implementation and hashed, and the event is dispatched to the matching partition.

@@ -30,48 +35,110 @@ The same logic applies when defining _output streams_.

If we use an AdapterApp subclass, the `remoteStreamKeyFinder` should be defined in the `onInit()` method, _before_ calling `super.onInit()`:

~~~
#!java
@Override
protected void onInit() {
    ...
    remoteStreamKeyFinder = new KeyFinder<Event>() {
        @Override
        public List<String> get(Event event) {
            return ImmutableList.of(event.get("theKeyField"));
        }
    };
    super.onInit();
    ...
}
~~~

If we use a standard App, we use the `createOutputStream(String name, KeyFinder<Event> keyFinder)` method.
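
For example, a minimal sketch of such an output stream (the stream name and the `user` field are placeholders for this illustration):

~~~
#!java
createOutputStream("userStats", new KeyFinder<Event>() {
    @Override
    public List<String> get(Event event) {
        // dispatch outgoing events by the hypothetical "user" field
        return ImmutableList.of(event.get("user"));
    }
});
~~~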


> If no `KeyFinder` is defined for the output streams, events are sent to the partitions of the connected cluster in a round-robin fashion.

# Incoming dispatch from external events

When receiving events from a remote application, we _must_ define how external events are dispatched internally: to which PEs, and based on which keys. For that purpose, we simply define an _input stream_ with the corresponding `KeyFinder`:

createInputStream("names", new KeyFinder<Event>() {
~~~
#!java
@Override
public List<String> get(Event event) {
return Arrays.asList(new String[] { event.get("name") });
}
}, helloPE);
createInputStream("names", new KeyFinder<Event>() {
@Override
public List<String> get(Event event) {
return Arrays.asList(new String[] { event.get("name") });
}
}, helloPE);
~~~

In this case, a name is extracted from each event, the PE instance with this key is retrieved or created, and the event is sent to that instance.


Alternatively, we can use a unique PE instance for processing events in a given node. For that we simply define the input stream without a KeyFinder, _and_ use a singleton PE:

~~~
#!java
HelloPE helloPE = createPE(HelloPE.class);
helloPE.setSingleton(true);
createInputStream("names", helloPE);
~~~

In this case, all events will be dispatched to the only HelloPE instance in this partition, regardless of the content of the event.


# Internals and tuning

S4 follows a staged event-driven architecture and uses a pipeline of executors to process messages.

## executors
An executor is an object that executes tasks. It usually keeps a bounded queue of task items and schedules their execution through a pool of threads.

When processing queues are full, executors may adopt various behaviours; in S4, in particular:
* **blocking**: the current thread simply waits until the queue is not full
* **shedding**: the current event is dropped

**Throttling**, i.e. placing an upper bound on the maximum processing rate, is a convenient way to avoid sending too many messages too fast.

S4 provides various default implementations of these behaviours and you can also define your own custom executors as appropriate.
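
As a rough illustration only (these are not the actual S4 executor implementations), the blocking and shedding behaviours can be sketched with standard `java.util.concurrent` primitives, and throttling with a rate limiter such as Guava's `RateLimiter`:

~~~
#!java
import java.util.concurrent.*;

import com.google.common.util.concurrent.RateLimiter;

// Hypothetical sketch, not S4 code: three policies for full queues / rate control.
public class ExecutorPolicySketch {

    // Blocking: the submitting thread waits until the bounded queue has room.
    static ThreadPoolExecutor blocking(int threads, int queueSize) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                        try {
                            executor.getQueue().put(task); // wait for free space in the queue
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
    }

    // Shedding: the task (event) is silently dropped when the queue is full.
    static ThreadPoolExecutor shedding(int threads, int queueSize) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize),
                new ThreadPoolExecutor.DiscardPolicy());
    }

    // Throttling: cap the submission rate; here the submitting thread waits for a permit
    // (a dropping variant would discard the event instead of waiting).
    static Executor throttling(final Executor delegate, double maxEventsPerSecond) {
        final RateLimiter limiter = RateLimiter.create(maxEventsPerSecond);
        return new Executor() {
            @Override
            public void execute(Runnable command) {
                limiter.acquire();
                delegate.execute(command);
            }
        };
    }
}
~~~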

## workflow

The following picture illustrates the pipeline of executors.

![image](/images/doc/0.6.0/executors.png)

### When a node receives a message:

1. data is received on a socket and chunked into a message, in the form of an array of bytes
1. the message is passed to a deserializer executor
* this executor is loaded with the application, and therefore has access to application classes, so that application specific messages can be deserialized
* by default it uses 1 thread and **blocks** if the processing queue is full
1. the event (deserialized message) is dispatched to a stream executor
* the stream executor is selected according to the stream information contained in the event
* by default it **blocks** if the processing queue is full
1. the event is processed in the PE instance that matches the key of the event

### When a PE emits a message:

1. an event is passed to a referenced stream
1. if the target cluster is remote, the event is passed to a remote sender executor
1. otherwise,
* if the target partition is the current one, the event is directly passed to the corresponding stream executor (see step 3 above)
* otherwise, the event is passed to a local sender executor
1. remote or local sender executors serialize the event into an array of bytes
* remote sender executors are **blocking** by default, if their processing queue is full
* local sender executors are **throttling** by default, with a configurable maximum rate. If events arrive at a higher rate, they are **dropped**.


## configuration parameters

* blocking executors can lead to deadlocks, depending on the application graph
* back pressure can be taken into account when using TCP: if downstream systems saturate, messages cannot be sent downstream, and sending queues fill up. With knowledge of the application, it is possible to add mechanisms to react appropriately
* executors can be replaced by other implementations in custom modules, by overriding the appropriate bindings (see `DefaultCommModule` and `DefaultCodeModule`)
* the maximum number of threads to use to process a given stream can be specified **in the application**, using the `setParallelism()` method of `Stream` (see the sketch after this list)
* default parameters are specified in `default.s4.comm.properties`
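
For example, a minimal sketch inside an application's `onInit()`, reusing the names from the dispatch examples above (the key finder and PE are placeholders, and we assume `setParallelism()` takes the maximum number of threads):

~~~
#!java
Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", keyFinder, topicCountAndReportPE);
topicSeenStream.setParallelism(4); // use at most 4 threads to process events on this stream
~~~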


24 changes: 14 additions & 10 deletions website/content/doc/0.6.0/fault_tolerance.md
@@ -4,7 +4,7 @@ title: Fault tolerance
Stream processing applications are typically long running applications, and they may accumulate state over extended periods of time.


When running a distributed system over a long period of time, expect:


- failures
@@ -138,21 +138,25 @@ We provide a default module (FileSystemBackendCheckpointingModule) that uses a f

##### Customizing the checkpointing backend

It is quite straightforward to implement backends for other kinds of storage (key-value stores, data grids, caches, RDBMS). Writing a checkpointing backend consists of implementing a simple interface (`StateStorage`) that matches your infrastructure or system.

Using an alternative backend is as simple as providing a new module to the S4 node. Here is an example of a module using a 'Cool' backend implementation:

~~~
#!java
public class CoolBackendCheckpointingModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(StateStorage.class).to(CoolStateStorage.class);
        bind(CheckpointingFramework.class).to(SafeKeeper.class);
    }
}
~~~

##### Overriding checkpointing and recovery operations

By default, S4 keeps all non-transient fields as part of the state and uses [kryo](http://code.google.com/p/kryo) to serialize and deserialize checkpoints, but it is possible to use a different mechanism by overriding the `checkpoint()`, `serializeState()` and `restoreState()` methods of the `ProcessingElement` class, as sketched below.
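
For illustration, a minimal sketch of a PE that replaces the default kryo serialization with its own encoding (the class name and payload format are made up for this example, and the exact method signatures may differ slightly):

~~~
#!java
public class CounterPE extends ProcessingElement {

    protected long count;

    @Override
    protected byte[] serializeState() {
        // custom encoding instead of the default kryo serialization
        return Long.toString(count).getBytes();
    }

    @Override
    protected void restoreState(byte[] state) {
        count = Long.parseLong(new String(state));
    }

    @Override
    protected void onCreate() {
    }

    @Override
    protected void onRemove() {
    }
}
~~~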


PEs are eligible for checkpointing when their state is 'dirty'. The dirty flag is checked through the `isDirty()` method and cleared by calling the `clearDirty()` method. In some cases, depending on the application code, only some events may actually change the state of the PE. You should override these methods in order to avoid unnecessary checkpointing operations, as in the sketch below.
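
Building on the hypothetical `CounterPE` above, a sketch where only events that actually change the counter mark the PE as dirty (the `dirty` field and the `onEvent()` handler are our own additions for this illustration):

~~~
#!java
public class SelectiveCounterPE extends CounterPE {

    private boolean dirty;

    public void onEvent(Event event) {
        if (event.get("increment") != null) {
            // only events that actually modify the state set the flag
            count++;
            dirty = true;
        }
    }

    @Override
    protected boolean isDirty() {
        return dirty;
    }

    @Override
    protected void clearDirty() {
        dirty = false;
    }
}
~~~
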
32 changes: 14 additions & 18 deletions website/content/doc/0.6.0/overview.md
@@ -13,40 +13,40 @@ S4 0.5 focused on providing a functionally complete refactoring.

S4 0.6 builds on this basis and brings plenty of exciting features, in particular:

* **performance improvements**: stream throughput improved by 1000% (~200k messages / s / stream)
* improved [configurability](S4:Configuration - 0.6.0), for both the S4 platform and deployed applications
* **elasticity** and fine partition tuning, through an integration with Apache Helix


# What are the cool features?

**Flexible deployment**:

* By default, keys are spread homogeneously over the cluster: this helps balance the load, especially for fine-grained partitioning
* S4 also provides fine control over the partitioning (with Apache Helix)
* Features automatic rebalancing

**Modular design**:

* both the platform and the applications are built by dependency injection, and configured through independent modules.
* makes it **easy to customize** the system according to specific requirements
* pluggable event serving policies: **load shedding, throttling, blocking**

**Dynamic and loose coupling of S4 applications**:

* through a pub-sub mechanism
* makes it easy to:
* assemble subsystems into larger systems
* reuse applications
* separate pre-processing
* provision, control and update subsystems independently


**[Fault tolerant](fault_tolerance)**


* **Fail-over** mechanism for high availability
* **Checkpointing and recovery** mechanism for minimizing state loss

**Pure Java**: statically typed, easy to understand, to refactor, and to extend

@@ -68,19 +68,15 @@ S4 0.6 builds on this basis and brings plenty of exciting features, in particular
**Applications**



* Users develop applications and deploy them on S4 clusters
* Applications are built from:
    * **Processing elements** (PEs)
    * **Streams** that interconnect PEs
* PEs communicate asynchronously by sending **events** on streams.
* Events are dispatched to nodes according to their key

**External streams** are a special kind of stream that:



* send events outside of the application
* receive events from external sources
* can interconnect and assemble applications into larger systems.
