Duramen is a persistent event bus implementation for Java. It integrates easily with the Spring Framework and by default uses a file-backed database. It guarantees that events will be dispatched.
- Add the Duramen dependency:
com.devskiller.duramen:duramen:1.1.2
- Use the @EnableDuramen annotation to import Duramen into your project:
@Configuration
@ComponentScan
@EnableDuramen
public class FooConfiguration {
}
- Implement a custom event class. Remember that this class must have a default (no-argument) constructor, which may be private:
public class FooEvent implements Event {
    private String message;
    // getters and setters
}
- To produce events, implement a producer component:
import io.codearte.duramen.EventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FooEventProducer {
    private final EventBus eventBus;

    @Autowired
    public FooEventProducer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * You will invoke this method somewhere in your code
     */
    public void produceEvent() {
        FooEvent event = new FooEvent();
        event.setMessage("Test message");
        eventBus.publish(event);
    }
}
- To receive events, implement a consumer. The generic type of EventHandler determines which events a particular consumer processes:
import io.codearte.duramen.handler.EventHandler;
import org.springframework.stereotype.Component;

@Component
public class FooEventConsumer implements EventHandler<FooEvent> {
    @Override
    public void onEvent(FooEvent event) {
        System.out.println("Received message: " + event.getMessage());
    }
}
All Spring beans implementing the EventHandler interface are automatically registered as handlers. It's also possible to register an EventHandler manually by invoking the eventBus.register(qualifiedEventClassName, eventHandler) method.
- If you want events to be retried after an error occurs, implement RetryableEvent instead of Event.
- If you want to use transaction-aware events, annotate your Event classes with @ProcessAfterCommit. As the name suggests, such events are processed after the transaction commits.
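The default-constructor requirement from the event class step above can be illustrated with plain reflection. This is a minimal sketch of how a generic deserializer typically creates an empty object before filling in its fields; it is not Duramen's actual code, and SampleEvent, NoDefaultEvent and EventInstantiator are hypothetical names used only for illustration.

```java
import java.lang.reflect.Constructor;

// Hypothetical event class with a private no-argument constructor (not part of Duramen).
class SampleEvent {
    private String message;

    // Private, so application code cannot call it, but reflection still can.
    private SampleEvent() {
    }

    SampleEvent(String message) {
        this.message = message;
    }

    String getMessage() {
        return message;
    }

    void setMessage(String message) {
        this.message = message;
    }
}

// Hypothetical event class that lacks a no-argument constructor.
class NoDefaultEvent {
    private final String message;

    NoDefaultEvent(String message) {
        this.message = message;
    }

    String getMessage() {
        return message;
    }
}

// Hypothetical helper showing reflective instantiation.
class EventInstantiator {
    // Creates an empty instance through the no-argument constructor, private or not.
    static <T> T newEmptyInstance(Class<T> type) {
        try {
            Constructor<T> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true); // permits calling a private constructor
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException ends up here when the class has no no-argument constructor.
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }
}
```

Calling EventInstantiator.newEmptyInstance(SampleEvent.class) succeeds even though the constructor is private, while the same call with NoDefaultEvent.class fails with an IllegalStateException.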
In tests we usually don't want to persist events. To achieve this, configure a custom bean:
import io.codearte.duramen.datastore.InMemory;

@Bean
public Datastore inMemoryDatastore() {
    return new InMemory();
}
When an EventHandler bean throws an exception while processing an event, the exception is logged together with the event data serialized to JSON.
You can specify a custom ExceptionHandler by creating a bean that implements the io.codearte.duramen.handler.ExceptionHandler interface.
Duramen provides 2 Datastore implementations.
FileData is the default implementation, backed by Chronicle Map. It stores events in a binary file (by default duramen.data). To use this implementation you don't have to do anything, as long as you accept the default values (see "Specifying messages limits"). To change the defaults, create your own bean:
import io.codearte.duramen.datastore.FileData;

@Bean
public Datastore fileDatastore() {
    return new FileData("/tmp/myfile.data", /*entries*/ 10, /*entrySize*/ 8192);
}
The InMemory datastore is described in the "Testing" section.
As you can see, Duramen requires no configuration. However, several options can be customized if you want.
To do that, provide a DuramenConfiguration bean:
@Bean
public DuramenConfiguration duramenConfiguration() {
    return DuramenConfiguration.builder().retryDelayInSeconds(30).build();
}
By default, the maximum message size is 4096 bytes. You can change this value by defining a bean:
DuramenConfiguration.builder().maxMessageSize(8192).build()
The message count limit is 1024 events in the queue. You can change this value by defining a bean:
DuramenConfiguration.builder().maxMessageCount(2048).build()
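To make the two limits concrete, here is a minimal sketch of a queue that enforces a per-message byte limit and a message count limit. BoundedMessageQueue is a hypothetical class, not Duramen's datastore. In particular, rejecting a message by throwing an exception is an assumption, since this document does not say how Duramen reacts when a limit is exceeded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical bounded queue illustrating the two limits (not Duramen's implementation).
class BoundedMessageQueue {
    private final int maxMessageSize;  // maximum bytes per serialized event
    private final int maxMessageCount; // maximum number of events waiting in the queue
    private final Queue<byte[]> queue = new ArrayDeque<>();

    BoundedMessageQueue(int maxMessageSize, int maxMessageCount) {
        this.maxMessageSize = maxMessageSize;
        this.maxMessageCount = maxMessageCount;
    }

    // Assumption: a violated limit is reported by throwing an exception.
    void enqueue(String serializedEvent) {
        byte[] payload = serializedEvent.getBytes(StandardCharsets.UTF_8);
        if (payload.length > maxMessageSize) {
            throw new IllegalArgumentException(
                    "Message of " + payload.length + " bytes exceeds limit of " + maxMessageSize);
        }
        if (queue.size() >= maxMessageCount) {
            throw new IllegalStateException("Queue already holds " + maxMessageCount + " messages");
        }
        queue.add(payload);
    }

    int size() {
        return queue.size();
    }

    // Upper bound on the payload bytes that can be queued at the same time.
    long maxPayloadBytes() {
        return (long) maxMessageSize * maxMessageCount;
    }
}
```

With the default values quoted above, new BoundedMessageQueue(4096, 1024).maxPayloadBytes() is 4096 * 1024 = 4194304 bytes, i.e. 4 MiB of queued payload at most. This is only the payload arithmetic; the real size of the backing file may differ.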
By default, Duramen retries all RetryableEvent events 3 times after any exception, with a 5-second delay between attempts. You can customize these settings through DuramenConfiguration:
DuramenConfiguration.builder().retryDelayInSeconds(5).retryCount(10)
.retryableExceptions(SocketTimeoutException.class).build()
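The retry settings above (retry count, delay and the list of retryable exceptions) can be sketched as a small standalone policy class. This is not Duramen's implementation: RetryPolicy and ThrowingAction are hypothetical names. The sketch assumes that "3 times" means three retries after the initial attempt, and that an empty exception list means every exception is retryable.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical action type that may throw checked exceptions.
interface ThrowingAction {
    void run() throws Exception;
}

// Hypothetical retry policy mirroring retryCount, retryDelayInSeconds and retryableExceptions.
class RetryPolicy {
    private final int retryCount;
    private final long retryDelayMillis;
    private final List<Class<? extends Throwable>> retryableExceptions;

    @SafeVarargs
    RetryPolicy(int retryCount, long retryDelayMillis, Class<? extends Throwable>... retryableExceptions) {
        this.retryCount = retryCount;
        this.retryDelayMillis = retryDelayMillis;
        this.retryableExceptions = Arrays.asList(retryableExceptions);
    }

    // Assumption: an empty list means that any exception triggers a retry.
    boolean isRetryable(Throwable failure) {
        if (retryableExceptions.isEmpty()) {
            return true;
        }
        for (Class<? extends Throwable> type : retryableExceptions) {
            if (type.isInstance(failure)) {
                return true;
            }
        }
        return false;
    }

    // Runs the action and returns the number of attempts it took to succeed.
    int execute(ThrowingAction action) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                boolean retriesLeft = attempt <= retryCount; // one initial attempt plus retryCount retries
                if (!retriesLeft || !isRetryable(e)) {
                    throw new IllegalStateException("Giving up after " + attempt + " attempt(s)", e);
                }
                pause();
            }
        }
    }

    private void pause() {
        try {
            Thread.sleep(retryDelayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Under these assumptions, new RetryPolicy(10, 5000, SocketTimeoutException.class) corresponds to the configuration shown above: a handler failing with SocketTimeoutException is retried up to 10 times at 5-second intervals, and any other exception ends processing after the first attempt.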
By default, Duramen uses daemon threads, but this can easily be changed by declaring:
DuramenConfiguration.builder().useDaemonThreads(false).build()
The number of threads processing events (by default only one thread is used) can also be increased:
DuramenConfiguration.builder().maxProcessingThreads(2).build()
Finally, you can supply your own ExecutorService for processing events:
@Bean
public ExecutorService duramenExecutorService() {
    return Executors.newCachedThreadPool();
}
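As an illustration of what the thread settings mean, the sketch below builds a fixed-size pool whose threads are optionally daemon threads, using only java.util.concurrent. EventExecutors and the "event-worker-" thread name are hypothetical. How Duramen creates its own threads is not described in this document.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for building an event-processing pool (not part of Duramen).
class EventExecutors {
    // Creates a pool with a fixed number of worker threads.
    // Daemon threads do not keep the JVM alive once all non-daemon threads have finished.
    static ExecutorService fixedPool(int threads, boolean daemon) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "event-worker-" + counter.incrementAndGet());
            thread.setDaemon(daemon);
            return thread;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }
}
```

An executor built this way, for example EventExecutors.fixedPool(2, true), could be returned from a bean method like duramenExecutorService() above. With non-daemon threads, remember to shut the executor down, otherwise the JVM will not exit.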
Performance tests were executed using JMH on Linux 4.12.5 with an Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz.
Datastore | Event type* | Events / second |
---|---|---|
FileData | Simple | 943 431 |
FileData | Complex | 326 803 |
InMemory | Simple | 1 494 560 |
InMemory | Complex | 384 291 |
- A Simple event contains one short String field
- A Complex event contains a 512-character String field, one BigDecimal field and a 6-element ArrayList of Integers