Flusswerk makes it easy to create multi-threaded workers for read/transform/write chains (aka ETL jobs). Workflows are coordinated via RabbitMQ, which makes it easy to build chains of independent workflow jobs (each a separate Java application).
Maven:

```xml
<dependency>
  <groupId>com.github.dbmdz.flusswerk</groupId>
  <artifactId>framework</artifactId>
  <version>5.0.0</version>
</dependency>
```

Gradle:

```groovy
dependencies {
  compile group: 'com.github.dbmdz.flusswerk', name: 'flusswerk', version: '5.0.0'
}
```
To get started, clone or copy the Flusswerk Example application.
- Flusswerk connections are now shown in the RabbitMQ management UI
- Structured logging now automatically contains the fields `duration` and `duration_ms`, which report the time your app spent processing a certain message in seconds or milliseconds.
- The deprecated `tracing_id` has been removed.
- The deprecated `Envelope.timestamp` has been removed in favor of `Envelope.created`, which is now of type `Instant`.
- The constructor of `FlusswerkApplication` now takes an `Optional<Engine>` as argument (call as `super(Optional.of(engine));`).
- Options for centralized locking using Redis have been removed.
A typical Flusswerk application has four parts:

- Messages
- the data processing flow (usually a `Reader`, a `Transformer` and a `Writer`)
- some Spring Boot glue code
- a Spring Boot `application.yml` to configure all the Flusswerk things
Usually it is also useful to define your own data model classes, although that is not strictly required.
Other optional parts are:

- custom metrics collection
- custom logging formats (aka `ProcessReport`)
Message classes are for sending and receiving data from RabbitMQ. All Message classes extend `Message`, which automatically forwards tracing ids from incoming to outgoing messages (if you set the tracing id by hand, it will not be overwritten).
```java
class IndexMessage extends Message {
  private final String documentId;

  public IndexMessage(String documentId) {
    this.documentId = requireNonNull(documentId);
  }

  public String getId() { ... }

  public boolean equals(Object other) { ... }

  public int hashCode() { ... }
}
```
Register the type for the incoming message, so it gets automatically deserialized:
```java
@Bean
public IncomingMessageType incomingMessageType() {
  return new IncomingMessageType(IndexMessage.class);
}
```
All configuration magic happens in Spring's `application.yml`.
A minimal configuration might look like:
```yaml
# Default spring profile is local
spring:
  application:
    name: flusswerk-example
flusswerk:
  routing:
    incoming:
      - search.index
    outgoing:
      default: search.publish
```
This defaults to connecting to RabbitMQ at `localhost:5672` with username and
password `guest`, five threads, and retrying a message five times. The only
outgoing route defined is `default`, which Flusswerk uses to automatically send
messages. For most applications these are sensible defaults that work out of
the box for local testing.
The connection information can be overwritten for different environments using Spring Boot profiles:
```yaml
---
spring:
  profiles: production
flusswerk:
  rabbitmq:
    hosts:
      - rabbitmq.stg
    username: secret
    password: secret
```
The Flusswerk configuration has the following sections.

`processing` - control of processing:

| property | default | description |
|---|---|---|
| `threads` | 5 | Number of threads to use for parallel processing |
`rabbitmq` - connection to RabbitMQ:

| property | default | description |
|---|---|---|
| `hosts` | `localhost` | list of hosts to connect to |
| `username` | `guest` | RabbitMQ username |
| `password` | `guest` | RabbitMQ password |
`routing` - messages in and out:

| property | default | description |
|---|---|---|
| `incoming` | – | list of queues to read from, in order |
| `outgoing` | – | routes to send messages to (format `name: topic`) |
| `exchange` | `flusswerk_default` | default exchange for all queues |
| `deadLetterExchange` | `<exchange>` + `.retry` | default dead letter exchange for all queues |
| `exchanges` | – | queue: exchange name to override default exchanges |
| `deadLetterExchanges` | `<exchange>` + `.retry` | queue: exchange name to override default dead letter exchanges |
| `failurePolicies` | default | how to handle messages with processing errors |
`routing.failurePolicies` - how to handle messages with processing errors:

| property | default | description |
|---|---|---|
| `retries` | 5 | how many times to retry |
| `retryRoutingKey` | – | where to send messages to retry later (dead lettering) |
| `failedRoutingKey` | – | where to send messages that should not be processed again |
| `backoff` | – | how long to wait until retrying a message |
The queue that should get a custom failure policy is used as a YAML key here. Please note that the key is processed not only by YAML but also by Spring, and therefore needs to be quoted and wrapped in square brackets; otherwise Spring Boot will silently ignore it:

```yaml
flusswerk:
  routing:
    failurePolicies:
      "[search.index]":
        retries: 3
        backoff: '1s'
```
`monitoring` - Prometheus settings:

| property | default | description |
|---|---|---|
| `prefix` | `flusswerk` | prefix for Prometheus metrics |
To set up your data processing flow, define a Spring bean of type `FlowSpec`:

```java
@Bean
public FlowSpec flowSpec(Reader reader, Transformer transformer, Writer writer) {
  return FlowBuilder.flow(IndexMessage.class, Document.class, IndexDocument.class)
      .reader(reader)
      .transformer(transformer)
      .writerSendingMessage(writer)
      .build();
}
```
The `Reader`, `Transformer` and `Writer` implement the `Function` interface:

| class | type | purpose |
|---|---|---|
| `Reader` | `Function<IndexMessage, Document>` | loads a document from storage |
| `Transformer` | `Function<Document, IndexDocument>` | uses the `Document` to build up the data structure needed for indexing |
| `Writer` | `Function<IndexDocument, Message>` | indexes the data and returns a message for the next workflow step |
All classes that do data processing (`Reader`, `Transformer`, `Writer`, ...) should be stateless, for two reasons:

First, it makes your code thread-safe and parallel processing easy without you having to think about it. Just keep it stateless and fly!

Second, it makes testing a breeze: you throw in data and check the data that comes out. Things can go wrong? Just check that your code throws the right exceptions. Wherever you need to interact with external services, mock the behaviour and you're good to go (the Flusswerk tests make heavy use of Mockito, by the way).
If you absolutely have to introduce state, make sure your code is thread-safe.
Wherever sensible, make your data classes immutable: set everything via the
constructor and avoid setters. Implement `equals()` and `hashCode()`. This
usually leads to more readable code and makes writing tests much easier. This
applies to Message classes and to the classes that contain data.
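For illustration, a hypothetical immutable `Document` class in plain Java, with constructor-only state and value semantics:

```java
import java.util.Objects;

public final class Document {
  private final String id;
  private final String text;

  // All state is set via the constructor; there are no setters
  public Document(String id, String text) {
    this.id = Objects.requireNonNull(id);
    this.text = Objects.requireNonNull(text);
  }

  public String getId() { return id; }
  public String getText() { return text; }

  @Override
  public boolean equals(Object other) {
    if (this == other) return true;
    if (!(other instanceof Document)) return false;
    Document that = (Document) other;
    return id.equals(that.id) && text.equals(that.text);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, text);
  }
}
```

On Java 16+, `record Document(String id, String text) {}` provides the same semantics in a single line.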
Does your particular data processing need to build up its data over time, so it can't be immutable? Reconsider whether that is really the best way, but don't worry too much.
For manual interaction with RabbitMQ there is a Spring component of the same name, `RabbitMQ`:

| method | description |
|---|---|
| `ack(Message)` | acknowledges a Message received from a queue |
| `queue(String)` | returns the `Queue` instance to interact with a queue of the given name |
| `topic(String)` | returns the `Topic` instance for the given name to send messages to |
| `route(String)` | returns the `Topic` instance for the given route from `application.yml` |
Any data processing can go wrong. Flusswerk supports two error handling modes:

- Stop processing a message completely. This behaviour is triggered by a `StopProcessingException`.
- Retry processing a message later. This behaviour is triggered by a `RetryProcessingException` or any other `RuntimeException`.
The default retry behaviour is to wait 30 seconds between retries and to try up to 5 times. If processing a message still keeps failing, it is then treated as if a `StopProcessingException` had been thrown and is routed to a failed queue.
For more fine-grained control, see the configuration parameters for `flusswerk.routing.failurePolicies`.
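As an illustration only (this is not Flusswerk's actual implementation, and the 30-second wait between retries is omitted), the default retry policy can be sketched in plain Java:

```java
public class RetrySketch {
  static final int MAX_RETRIES = 5;

  // Simulates the default policy: retry up to MAX_RETRIES times,
  // then give up and route the message to the failed queue.
  // failuresBeforeSuccess lets us simulate a flaky processing step.
  public static String process(int failuresBeforeSuccess) {
    int attempts = 0;
    while (attempts <= MAX_RETRIES) {
      try {
        if (attempts < failuresBeforeSuccess) {
          throw new RuntimeException("processing failed");
        }
        return "acknowledged"; // processing succeeded
      } catch (RuntimeException e) {
        attempts++;
        // In Flusswerk the message goes back to a retry queue and waits
        // (30 seconds by default) before being redelivered.
      }
    }
    // Treated as if a StopProcessingException had been thrown
    return "failed";
  }
}
```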
Processing can be skipped for a message by throwing a `SkipProcessingException` at any point in your code. Log messages and metrics will then have `status=skip` to indicate that processing was skipped.
Every Flusswerk application provides base metrics via a Prometheus endpoint:

| metric | description |
|---|---|
| `flusswerk.messages` | total number of processed messages since application start |
| `flusswerk.messages.seconds` | total amount of time spent processing these messages |
To include custom metrics, get counters via `MeterFactory`. A bean of type `FlowMetrics` can also consume execution information for single flows (it is best to extend `BaseMetrics` for that).
The Prometheus endpoint is available at `/actuator/prometheus`.
To customize log messages, provide a bean of type `ProcessReport`.