diff --git a/README.md b/README.md
index 60a31dc5..4e596864 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,11 @@ MBassador is designed to be extensible with custom implementations of various co
Usage
-Handler definition (in any bean):
+### Handler definition
+
+Message handlers are defined via annotations added to instance methods. The simplest definition is to just use `@Handler` without changing any parameters or adding of any other annotations.
+> NOTE: MBassador uses WEAK REFERENCES by default. If you do not hold references to your listeners somewhere else they will be garbage collected! This can be changed by adding `@Listener(references=References.Strong)` to the defining class
+
// every message of type TestMessage or any subtype will be delivered
// to this handler
@@ -123,11 +127,21 @@ Handler definition (in any bean):
}
-Creation of message bus and registration of listeners:
+### Message bus creation
- // create as many instances as necessary
- // bind it to any upper bound
+ // Use a default constructor for convenience and create as many instances as you like
MBassador bus = new MBassador();
+ MBassador bus2 = new MBassador();
+
+ // Use feature driven configuration to have more control over the configuration details
+ MBassador globalBus = new MBassador(new BusConfiguration()
+ .addFeature(Feature.SyncPubSub.Default())
+ .addFeature(Feature.AsynchronousHandlerInvocation.Default())
+ .addFeature(Feature.AsynchronousMessageDispatch.Default())
+ .setProperty(Properties.Common.Id, "global bus")
+ .setProperty(Properties.Handler.PublicationError, new IPublicationErrorHandler{...}));
+
+### Listener subscription
ListeningBean listener = new ListeningBean();
// the listener will be registered using a weak-reference if not configured otherwise with @Listener
bus.subscribe(listener);
@@ -135,13 +149,18 @@ Creation of message bus and registration of listeners:
bus.subscribe(new ClassWithoutAnyDefinedHandlers());
-Message publication:
+### Message publication
TestMessage message = new TestMessage();
TestMessage subMessage = new SubTestMessage();
+Messages can be published asynchronously in another thread (fire and forget):
+
bus.publishAsync(message); //returns immediately, publication will continue asynchronously
bus.post(message).asynchronously(); // same as above
+
+Message can be published synchronously in the same thread:
+
bus.publish(subMessage); // will return after each handler has been invoked
bus.post(subMessage).now(); // same as above
@@ -161,15 +180,19 @@ You can also download binary release and javadoc from the [maven central reposit
There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus works. Code samples can also be found in the various test cases. Please read about the terminology used in this project to avoid confusion and misunderstanding.
Release Notes
-
[1.2.1](milestones/1.2.1)
- + Not yet released!
- + API-Changes:
- + Removed deprecated method BusConfiguration.SyncAsync() -> use MBassador default constructor instead
- + Deleted interface ISyncMessageBus since it was merely an aggregation of existing interfaces -> replace with GenericMessagePublicationSupport
-
1.2.0
- + Added support for conditional handlers using Java EL. Thanks to Bernd Rosstauscher
- for the initial implementation.
+### [1.2.1](milestones/1.2.1)
+ + Not yet released!
+ + Centralized handling of common (and arbitrary) properties (see BusConfiguration#setProperty and net.engio.mbassy.bus.common.Properties)
+ + Each bus now has a configurable id and respective #toString() implementation (useful for debugging)
+ + Each bus now has a default logger (System.out) for publication errors (exception in handlers) which can be replaced with BusConfiguration#setProperty
+ + __API-Changes:__
+ + Interface `IMessageFilter` now receives the SubscriptionContext as second parameter. This gives access to the bus runtime within filter logic (useful for error propagation). -> Change your filters signature. You can access the `MessageHandler` object directly from the context.
+ + Removed deprecated method BusConfiguration.SyncAsync() -> Use default constructor or feature based configuration instead
+ + Deleted interface ISyncMessageBus since it was merely an aggregation of existing interfaces -> Replace with GenericMessagePublicationSupport
+
+### 1.2.0
+ + Added support for conditional handlers using Java EL. Thanks to Bernd Rosstauscher for the initial implementation.
+ BREAKING CHANGES in BusConfiguration
+ Complete redesign of configuration setup using Features instead of simple get/set parameters. This will allow
to flexibly combine features and still be able to exclude those not available in certain environments,for example, threading and reflection in GWT (this will be part of future releases)
@@ -178,7 +201,7 @@ There is ongoing effort to extend documentation and provide code samples and det
with its corresponding interface which will be used for all types of message bus implementations
-
1.1.10
+### 1.1.10
+ Fixed broken sort order of prioritized handlers (see #58)
+ Addressed issue #63 by making the constructor of `MessageHandler` use a map of properties and by replacing dependencies to
all MBassador specific annotations with Java primitives and simple interfaces
@@ -188,11 +211,11 @@ There is ongoing effort to extend documentation and provide code samples and det
asynchronous FIFO (asynchronous message publications guaranteed to be delivered in the order they occurred)
+ Renamed runtime property of `BusRuntime` "handler.async-service" to "handler.async.executor"
-
+### 1.1.8
+ Internal refactorings and code improvements
+ Fixed #44 #45 #47
@@ -200,7 +223,7 @@ There is ongoing effort to extend documentation and provide code samples and det
version 1.1.8 is not available from the central repository
-
1.1.7
+### 1.1.7
+ Console Logger not added to message bus instances by default -> use addErrorHandler(IPublicationErrorHandler.ConsoleLogger)
+ Fixed race conditions in net.engio.mbassy.subscription.Subscription and of WeakConcurrentSet.contains()
@@ -209,7 +232,7 @@ There is ongoing effort to extend documentation and provide code samples and det
+ Improved test-infrastructure and increased test-coverage
+ Thanks for your feedback!
-
1.1.6
+### 1.1.6
+ Added support for choosing between strong and weak references using the new @Listener annotation. @Listener can be
added to any class that defines message handlers and allows to configure which reference type is used
@@ -221,7 +244,7 @@ There is ongoing effort to extend documentation and provide code samples and det
+ Created a message bus implementation that does not use threading to support use in non-multi-threaded environments like GWT,
see ISyncMessageBus
-
1.1.3
+### 1.1.3
+ Added support for FilteredMessage event
+ Renamed @Listener to @Handler and DeadEvent to DeadMessage to increase alignment with the established terminology.
@@ -230,7 +253,7 @@ There is ongoing effort to extend documentation and provide code samples and det
+ Introduced message publication factories as configurable components to make MBassador more extensible/customizable
+ Added more documentation and unit tests
-
1.1.1
+### 1.1.1
+ Added support for DeadMessage event
+ Introduced new property to @Handler annotation that allows to activate/deactivate any message handler
@@ -240,7 +263,7 @@ There is ongoing effort to extend documentation and provide code samples and det
more precisely indicate their meaning
+ Added more unit tests
-
1.1.0
+### 1.1.0
First stable release!
@@ -248,20 +271,20 @@ First stable release!
+ More exhaustive unit tests
+ Installation from the central repository
-
1.0.6.RC
+### 1.0.6.RC
+ Fixed behaviour with capacity bound blocking queue such that there now are two methods to schedule a message
asynchronously. One will block until capacity becomes available, the other will timeout after a specified amount of
time.
+ Additional unit tests
-
1.0.5.RC
+### 1.0.5.RC
+ Added MessageEnvelope and @Enveloped annotation to configure handlers that might receive arbitrary message type
+ Added handler configuration property to @Handler annotation to move from message filtering to more specific implementation
of this feature
-
1.0.4.RC
+### 1.0.4.RC
+ Introduced BusConfiguration as a central class to encapsulate configurational aspects
diff --git a/src/docs/TODO.md b/src/docs/TODO.md
index 44941972..85ddb7fe 100644
--- a/src/docs/TODO.md
+++ b/src/docs/TODO.md
@@ -2,13 +2,15 @@
Asyncbus.shutdown() -> no test coverage
EnvelopedMessageDispatcher -> not tested at all
-#Refactorings
+#Refactorings
++ split up IMessagePublication into two separate interfaces (internal and external)
++ create MessagePublicationFactory
#Improvements
Prio 1: Validation of handlers
ERROR:Handler with mismatching parameter types
- ERROR:Interfaces + rejectSubtypes
+ ERROR:Interfaces/Abstract + rejectSubtypes
WARN:@Synchronized only for some handlers of a given listener
Prio 2: Lifecycle Callbacks = Implement in MessagePublication (BeforeStart,AfterCompletion)
diff --git a/src/docs/wiki-listener-def.md b/src/docs/wiki-listener-def.md
index ea2fc722..10c68c5a 100644
--- a/src/docs/wiki-listener-def.md
+++ b/src/docs/wiki-listener-def.md
@@ -32,37 +32,37 @@ filters, delivery modes etc.
-
rejectSubtypes
-
The primary message type consumed by a message handler is determined by the type of
- its parameter.Polymorphism does allow any sub type of that message type to be delivered
- to the handler as well, which is the default behaviour of any message handler.
- The handler can be configured to not receiving any sub types by specifying thus using this
- property.
-
-
false
+
rejectSubtypes
+
The primary message type consumed by a message handler is determined by the type of
+ its parameter.Polymorphism does allow any sub type of that message type to be delivered
+ to the handler as well, which is the default behaviour of any message handler.
+ The handler can be configured to not receiving any sub types by specifying thus using this
+ property.
+
+
false
-
-
enabled
-
A handler can be explicitly disabled to not take part in message delivery.
-
-
true
+
+
enabled
+
A handler can be explicitly disabled to not take part in message delivery.
+
+
true
-
strongReferencess
-
Whether the bus should use storng references to the listeners instead of weak references
-
-
false
+
strongReferencess
+
Whether the bus should use storng references to the listeners instead of weak references
+
+
false
-
invocation
-
Specify a custom implementation for the handler invocation. By default, a generic implementation
- that uses reflection will be used. Note: A custom implementation will not be faster than the generic one
- since there are heavy optimizations by the JVM using JIT-Compiler and more.
-
-
false
-
+
invocation
+
Specify a custom implementation for the handler invocation. By default, a generic implementation
+ that uses reflection will be used. Note: A custom implementation will not be faster than the generic one
+ since there are heavy optimizations by the JVM using JIT-Compiler and more.
+
-To avoid confusion and increase precision of the available documentation a common vocabulary of the most relevant concepts is necessary.
-Specifically, the terms "event" and "message" have their own definition within the context of the message bus system and as such require
+To avoid confusion and increase precision of the available documentation a common vocabulary of the most relevant concepts is necessary. Specifically, the terms "event" and "message" have their own definition within the context of the message bus system and as such require
some clarification.
Message
-A message is an object used for communication between multiple other objects.Other libraries established the term "event" which essentially
-refers to the same idea (an event occurs at some point in the system and is published to other components such that they might react to it).
-MBassador uses the term message instead of event since the object sent through it does not necessarily represent an event. It might merely represent
-data to be processed, e.g. stored or transformed.
+A message is an object used for communication between a sender and a set of receivers. Other libraries established the term "event" which essentially refers to the same idea (an event occurs at some point in the system and is published to other components such that they might react to it).
+MBassador uses the term `message` instead of `event` since the object sent over the wire does not necessarily represent an event. It might merely represent data to be processed, e.g. stored or transformed.
A message can be any object, no restrictions or assumptions are made. A message can be sent by any object that has access to the bus
-and is delivered to all registered listeners that consume the type of message.
+and is delivered to all registered handlers consuming that type of message.
Message handler
-A message handler is a method that defines exactly one parameter (the message) and is marked with @Handler. A handler has a message type
-that is implicitly defined in the method signature (the parameter type). A message handler will be invoked for each message that has a compatible
-type.
+A message handler is a method that defines exactly one parameter (the message or a message envelope) and is marked with @Handler. A handler has a message type that is implicitly defined in the method signature (the parameter type). A message handler will be invoked for each message that has a compatible type.
Message listener
-An object that defines one or more message handlers and that has been subscribed at the message bus is referred to as (message) listener.
+A class defining one or more message handlers and that has been subscribed at the message bus is referred to as (message) listener.
Subscription
Subscription is the process of adding a listener to the message bus, such that it might receive messages. It is used interchangeably with the
diff --git a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java
index a18febc3..143e2124 100644
--- a/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java
+++ b/src/main/java/net/engio/mbassy/bus/AbstractPubSubSupport.java
@@ -1,6 +1,7 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.DeadMessage;
+import net.engio.mbassy.bus.common.Properties;
import net.engio.mbassy.bus.common.PubSubSupport;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
@@ -9,10 +10,9 @@
import net.engio.mbassy.subscription.Subscription;
import net.engio.mbassy.subscription.SubscriptionManager;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
+
+import static net.engio.mbassy.bus.common.Properties.Handler.PublicationError;
/**
* The base class for all message bus implementations.
@@ -33,8 +33,15 @@ public abstract class AbstractPubSubSupport implements PubSubSupport {
public AbstractPubSubSupport(IBusConfiguration configuration) {
- this.runtime = new BusRuntime(this);
- this.runtime.add(BusRuntime.Properties.ErrorHandlers, getRegisteredErrorHandlers());
+ if(!configuration.hasProperty(Properties.Handler.PublicationError)){
+ System.out.println("WARN: No error handler configured to handle exceptions during publication.\n" +
+ "Error handlers can be added to any instance of AbstractPubSubSupport or via BusConfiguration. \n" +
+ "Falling back to console logger.");
+ }
+ this.errorHandlers.add(configuration.getProperty(Properties.Handler.PublicationError, new IPublicationErrorHandler.ConsoleLogger()));
+ this.runtime = new BusRuntime(this)
+ .add(PublicationError, getRegisteredErrorHandlers())
+ .add(Properties.Common.Id, UUID.randomUUID().toString());
// configure the pub sub feature
Feature.SyncPubSub pubSubFeature = configuration.getFeature(Feature.SyncPubSub.class);
this.subscriptionManager = pubSubFeature.getSubscriptionManagerProvider()
@@ -97,4 +104,8 @@ public void handlePublicationError(PublicationError error) {
}
}
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{ " + runtime.get(Properties.Common.Id) + "}";
+ }
}
diff --git a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
index 96e1d956..e8c99dce 100644
--- a/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
+++ b/src/main/java/net/engio/mbassy/bus/AbstractSyncAsyncMessageBus.java
@@ -1,6 +1,8 @@
package net.engio.mbassy.bus;
import net.engio.mbassy.bus.common.IMessageBus;
+import net.engio.mbassy.bus.common.Properties;
+import net.engio.mbassy.bus.config.ConfigurationError;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.config.IBusConfiguration;
import net.engio.mbassy.bus.error.PublicationError;
@@ -9,15 +11,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The base class for all message bus implementations with support for asynchronous message dispatch
*
- * @param
- * @param
+ * @param The type of message this bus consumes
+ * @param
The publication commands this bus supports depend on P
*/
public abstract class AbstractSyncAsyncMessageBus
extends AbstractPubSubSupport implements IMessageBus {
@@ -36,14 +37,20 @@ protected AbstractSyncAsyncMessageBus(IBusConfiguration configuration) {
// configure asynchronous message dispatch
Feature.AsynchronousMessageDispatch asyncDispatch = configuration.getFeature(Feature.AsynchronousMessageDispatch.class);
+ if(asyncDispatch == null){
+ configuration.handleError(ConfigurationError.Missing(Feature.AsynchronousMessageDispatch.class));
+ }
pendingMessages = asyncDispatch.getPendingMessages();
dispatchers = new ArrayList(asyncDispatch.getNumberOfMessageDispatchers());
initDispatcherThreads(asyncDispatch);
// configure asynchronous handler invocation
Feature.AsynchronousHandlerInvocation asyncInvocation = configuration.getFeature(Feature.AsynchronousHandlerInvocation.class);
+ if(asyncInvocation == null){
+ configuration.handleError(ConfigurationError.Missing(Feature.AsynchronousHandlerInvocation.class));
+ }
this.executor = asyncInvocation.getExecutor();
- getRuntime().add(BusRuntime.Properties.AsynchronousHandlerExecutor, executor);
+ getRuntime().add(Properties.Handler.AsynchronousHandlerExecutor, executor);
}
@@ -117,9 +124,4 @@ public boolean hasPendingMessages() {
return pendingMessages.size() > 0;
}
- @Override
- public Executor getExecutor() {
- return executor;
- }
-
}
diff --git a/src/main/java/net/engio/mbassy/bus/BusFactory.java b/src/main/java/net/engio/mbassy/bus/BusFactory.java
index 683a84cd..bd014644 100644
--- a/src/main/java/net/engio/mbassy/bus/BusFactory.java
+++ b/src/main/java/net/engio/mbassy/bus/BusFactory.java
@@ -16,8 +16,6 @@ public class BusFactory {
* Create a message bus supporting only synchronous message publication.
* All message publications will run in the calling thread, no bus internal
* multi-threading will occur.
- *
- * @return
*/
public static SyncMessageBus SynchronousOnly(){
BusConfiguration syncPubSubCfg = new BusConfiguration();
@@ -26,12 +24,9 @@ public static SyncMessageBus SynchronousOnly(){
}
/**
- * Create a message bus supporting synchronous and asynchronous message publication.
+ * Create a message bus with support for synchronous and asynchronous message publication.
* Asynchronous message publication will be handled by a single thread such that FIFO
* order of message processing is guaranteed.
- *
- *
- * @return
*/
public static IMessageBus AsynchronousSequentialFIFO(){
BusConfiguration asyncFIFOConfig = new BusConfiguration();
diff --git a/src/main/java/net/engio/mbassy/bus/BusRuntime.java b/src/main/java/net/engio/mbassy/bus/BusRuntime.java
index 047edb09..17c2f43a 100644
--- a/src/main/java/net/engio/mbassy/bus/BusRuntime.java
+++ b/src/main/java/net/engio/mbassy/bus/BusRuntime.java
@@ -18,13 +18,6 @@
*/
public class BusRuntime {
- public static class Properties{
-
- public static final String ErrorHandlers = "error.handlers";
- public static final String AsynchronousHandlerExecutor = "handler.async.executor";
-
- }
-
private PubSubSupport provider;
private Map properties = new HashMap();
diff --git a/src/main/java/net/engio/mbassy/bus/MBassador.java b/src/main/java/net/engio/mbassy/bus/MBassador.java
index ec249f42..aa140a7b 100644
--- a/src/main/java/net/engio/mbassy/bus/MBassador.java
+++ b/src/main/java/net/engio/mbassy/bus/MBassador.java
@@ -12,15 +12,16 @@
public class MBassador extends AbstractSyncAsyncMessageBus> implements IMessageBus> {
+
public MBassador(IBusConfiguration configuration) {
super(configuration);
}
public MBassador(){
- super(new BusConfiguration()
- .addFeature(Feature.SyncPubSub.Default())
- .addFeature(Feature.AsynchronousHandlerInvocation.Default())
- .addFeature(Feature.AsynchronousMessageDispatch.Default()));
+ this(new BusConfiguration()
+ .addFeature(Feature.SyncPubSub.Default())
+ .addFeature(Feature.AsynchronousHandlerInvocation.Default())
+ .addFeature(Feature.AsynchronousMessageDispatch.Default()));
}
@@ -47,7 +48,7 @@ public void publish(T message) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
- .setPublishedObject(message));
+ .setPublishedMessage(message));
}
}
diff --git a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
index 5cd4892f..9d56cf16 100644
--- a/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
+++ b/src/main/java/net/engio/mbassy/bus/SyncMessageBus.java
@@ -28,7 +28,7 @@ public void publish(T message) {
handlePublicationError(new PublicationError()
.setMessage("Error during publication of message")
.setCause(e)
- .setPublishedObject(message));
+ .setPublishedMessage(message));
}
}
diff --git a/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java b/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java
index d55a8e15..e074cd3b 100644
--- a/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java
+++ b/src/main/java/net/engio/mbassy/bus/common/GenericMessagePublicationSupport.java
@@ -4,9 +4,10 @@
/**
* This interface is meant to be implemented by different bus implementations to offer a consistent way
- * to plugin different flavors of message publication.
+ * to plugin different methods of message publication.
*
- * The parametrization of the IPostCommand influences which publication flavours are available.
+ * The parametrization of the IPostCommand influences which publication methods (asynchronous, synchronous or
+ * conditional etc.) are available.
*
*/
public interface GenericMessagePublicationSupport extends PubSubSupport, ErrorHandlingSupport{
@@ -15,10 +16,10 @@ public interface GenericMessagePublicationSupport
@Override
P post(T message);
- /**
- * Get the executor service that is used for asynchronous message publications.
- * The executor is passed to the message bus at creation time.
- *
- * Note: The executor can be obtained from the run time. See
- * @return
- */
- @Deprecated
- Executor getExecutor();
-
/**
* Check whether any asynchronous message publications are pending to be processed
*
diff --git a/src/main/java/net/engio/mbassy/bus/common/Properties.java b/src/main/java/net/engio/mbassy/bus/common/Properties.java
new file mode 100644
index 00000000..e152d8fa
--- /dev/null
+++ b/src/main/java/net/engio/mbassy/bus/common/Properties.java
@@ -0,0 +1,25 @@
+package net.engio.mbassy.bus.common;
+
+/**
+ * A collection of properties commonly used by different parts of the library.
+ *
+ * @author bennidi
+ * Date: 22.02.15
+ */
+public final class Properties {
+
+ public static final class Handler {
+
+ public static final String PublicationError = "bus.handlers.error";
+ public static final String AsynchronousHandlerExecutor = "bus.handlers.async-executor";
+ }
+
+ public static final class Common {
+
+ public static final String Id = "bus.id";
+ }
+
+
+
+
+}
diff --git a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java
index 561ad1fe..93523cef 100644
--- a/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java
+++ b/src/main/java/net/engio/mbassy/bus/config/BusConfiguration.java
@@ -1,6 +1,8 @@
package net.engio.mbassy.bus.config;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
/**
@@ -8,26 +10,51 @@
*/
public class BusConfiguration implements IBusConfiguration {
- // the registered features
- private Map, Feature> features = new HashMap, Feature>();
+ // the registered properties
+ private final Map