Skip to content

Commit

Permalink
added support for DeadEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
benni committed Jan 18, 2013
1 parent d08470c commit fca9c60
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 26 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Of course you can always clone the repository and build from source


<h3>1.1.1</h3> <h3>1.1.1</h3>


+ Added support for DeadEvent
+ Introduced new property to @Listener annotation that allows to activate/deactivate any message handler + Introduced new property to @Listener annotation that allows to activate/deactivate any message handler
+ Full support of proxies created by cglib + Full support of proxies created by cglib
+ Message handler inheritance changed! See wiki page about handler definition for more details. + Message handler inheritance changed! See wiki page about handler definition for more details.
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/net/engio/mbassy/AbstractMessageBus.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public abstract class AbstractMessageBus<T, P extends IMessageBus.IPostCommand>
private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>(); private final List<Thread> dispatchers = new CopyOnWriteArrayList<Thread>();


// all pending messages scheduled for asynchronous dispatch are queued here // all pending messages scheduled for asynchronous dispatch are queued here
private final BlockingQueue<MessagePublication<T>> pendingMessages; private final BlockingQueue<MessagePublication> pendingMessages;


// this factory is used to create specialized subscriptions based on the given message handler configuration // this factory is used to create specialized subscriptions based on the given message handler configuration
// it can be customized by implementing the getSubscriptionFactory() method // it can be customized by implementing the getSubscriptionFactory() method
Expand All @@ -56,7 +56,7 @@ public AbstractMessageBus(BusConfiguration configuration) {
this.executor = configuration.getExecutor(); this.executor = configuration.getExecutor();
subscriptionFactory = configuration.getSubscriptionFactory(); subscriptionFactory = configuration.getSubscriptionFactory();
this.metadataReader = configuration.getMetadataReader(); this.metadataReader = configuration.getMetadataReader();
pendingMessages = new LinkedBlockingQueue<MessagePublication<T>>(configuration.getMaximumNumberOfPendingMessages()); pendingMessages = new LinkedBlockingQueue<MessagePublication>(configuration.getMaximumNumberOfPendingMessages());
initDispatcherThreads(configuration.getNumberOfMessageDispatchers()); initDispatcherThreads(configuration.getNumberOfMessageDispatchers());
addErrorHandler(new IPublicationErrorHandler.ConsoleLogger()); addErrorHandler(new IPublicationErrorHandler.ConsoleLogger());
} }
Expand Down Expand Up @@ -151,7 +151,7 @@ public void addErrorHandler(IPublicationErrorHandler handler) {
} }


// this method enqueues a message delivery request // this method enqueues a message delivery request
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request){ protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request){
try { try {
pendingMessages.put(request); pendingMessages.put(request);
return request.markScheduled(); return request.markScheduled();
Expand All @@ -161,7 +161,7 @@ protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublicatio
} }


// this method enqueues a message delivery request // this method enqueues a message delivery request
protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublication<T> request, long timeout, TimeUnit unit){ protected MessagePublication addAsynchronousDeliveryRequest(MessagePublication request, long timeout, TimeUnit unit){
try { try {
return pendingMessages.offer(request, timeout, unit) return pendingMessages.offer(request, timeout, unit)
? request.markScheduled() ? request.markScheduled()
Expand All @@ -172,6 +172,7 @@ protected MessagePublication<T> addAsynchronousDeliveryRequest(MessagePublicatio
} }


// obtain the set of subscriptions for the given message type // obtain the set of subscriptions for the given message type
// Note: never returns null!
protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) { protected Collection<Subscription> getSubscriptionsByMessageType(Class messageType) {
Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc); Set<Subscription> subscriptions = new TreeSet<Subscription>(Subscription.SubscriptionByPriorityDesc);


Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/engio/mbassy/IMessageBus.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public static interface IPostCommand<T>{
* *
* @return A message publication that can be used to access information about the state of * @return A message publication that can be used to access information about the state of
*/ */
public MessagePublication<T> asynchronously(); public MessagePublication asynchronously();




/** /**
Expand All @@ -149,7 +149,7 @@ public static interface IPostCommand<T>{
* *
* @return A message publication that wraps up the publication request * @return A message publication that wraps up the publication request
*/ */
public MessagePublication<T> asynchronously(long timeout, TimeUnit unit); public MessagePublication asynchronously(long timeout, TimeUnit unit);


} }


Expand Down
44 changes: 33 additions & 11 deletions src/main/java/net/engio/mbassy/MBassador.java
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.engio.mbassy; package net.engio.mbassy;


import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.Subscription;


import java.util.Collection; import java.util.Collection;
Expand All @@ -13,17 +14,26 @@ public MBassador(BusConfiguration configuration) {
} }




public MessagePublication<T> publishAsync(T message) { public MessagePublication publishAsync(T message) {
return addAsynchronousDeliveryRequest(MessagePublication.Create( return addAsynchronousDeliveryRequest(createMessagePublication(message));
getSubscriptionsByMessageType(message.getClass()), message));
} }


public MessagePublication<T> publishAsync(T message, long timeout, TimeUnit unit) { public MessagePublication publishAsync(T message, long timeout, TimeUnit unit) {
return addAsynchronousDeliveryRequest(MessagePublication.Create( return addAsynchronousDeliveryRequest(createMessagePublication(message), timeout, unit);
getSubscriptionsByMessageType(message.getClass()), message), timeout, unit); }

private MessagePublication createMessagePublication(T message) {
Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null || subscriptions.isEmpty()) {
// Dead Event
subscriptions = getSubscriptionsByMessageType(DeadEvent.class);
return MessagePublication.Create(subscriptions, new DeadEvent(message));
}
else return MessagePublication.Create(subscriptions, message);
} }





/** /**
* Synchronously publish a message to all registered listeners (this includes listeners defined for super types) * Synchronously publish a message to all registered listeners (this includes listeners defined for super types)
* The call blocks until every messageHandler has processed the message. * The call blocks until every messageHandler has processed the message.
Expand All @@ -32,13 +42,25 @@ public MessagePublication<T> publishAsync(T message, long timeout, TimeUnit unit
*/ */
public void publish(T message) { public void publish(T message) {
try { try {
MessagePublication publication = createMessagePublication(message);
publication.execute();

/*
final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass()); final Collection<Subscription> subscriptions = getSubscriptionsByMessageType(message.getClass());
if (subscriptions == null) { if (subscriptions == null || subscriptions.isEmpty()) {
return; // TODO: Dead Event? // publish a DeadEvent since no subscriptions could be found
} final Collection<Subscription> deadEventSubscriptions = getSubscriptionsByMessageType(DeadEvent.class);
for (Subscription subscription : subscriptions) { if (deadEventSubscriptions != null && !deadEventSubscriptions.isEmpty()) {
subscription.publish(message); for (Subscription subscription : deadEventSubscriptions) {
subscription.publish(new DeadEvent(message));
}
}
} }
else{
for (Subscription subscription : subscriptions) {
subscription.publish(message);
}
}*/
} catch (Throwable e) { } catch (Throwable e) {
handlePublicationError(new PublicationError() handlePublicationError(new PublicationError()
.setMessage("Error during publication of message") .setMessage("Error during publication of message")
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/net/engio/mbassy/MessagePublication.java
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.engio.mbassy; package net.engio.mbassy;


import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.subscription.Subscription; import net.engio.mbassy.subscription.Subscription;


import java.util.Collection; import java.util.Collection;
Expand All @@ -12,19 +13,19 @@
* @author bennidi * @author bennidi
* Date: 11/16/12 * Date: 11/16/12
*/ */
public class MessagePublication<T> { public class MessagePublication {


public static <T> MessagePublication<T> Create(Collection<Subscription> subscriptions, T message){ public static MessagePublication Create(Collection<Subscription> subscriptions, Object message){
return new MessagePublication<T>(subscriptions, message, State.Initial); return new MessagePublication(subscriptions, message, State.Initial);
} }


private Collection<Subscription> subscriptions; private Collection<Subscription> subscriptions;


private T message; private Object message;


private State state = State.Scheduled; private State state = State.Scheduled;


private MessagePublication(Collection<Subscription> subscriptions, T message, State initialState) { private MessagePublication(Collection<Subscription> subscriptions, Object message, State initialState) {
this.subscriptions = subscriptions; this.subscriptions = subscriptions;
this.message = message; this.message = message;
this.state = initialState; this.state = initialState;
Expand Down Expand Up @@ -54,18 +55,22 @@ public boolean isScheduled() {
return state.equals(State.Scheduled); return state.equals(State.Scheduled);
} }


public MessagePublication<T> markScheduled(){ public MessagePublication markScheduled(){
if(!state.equals(State.Initial)) if(!state.equals(State.Initial))
return this; return this;
state = State.Scheduled; state = State.Scheduled;
return this; return this;
} }


public MessagePublication<T> setError(){ public MessagePublication setError(){
state = State.Error; state = State.Error;
return this; return this;
} }


public boolean isDeadEvent(){
return DeadEvent.class.isAssignableFrom(message.getClass());
}

private enum State{ private enum State{
Initial,Scheduled,Running,Finished,Error; Initial,Scheduled,Running,Finished,Error;
} }
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/engio/mbassy/SyncAsyncPostCommand.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void now() {
} }


@Override @Override
public MessagePublication<T> asynchronously() { public MessagePublication asynchronously() {
return mBassador.publishAsync(message); return mBassador.publishAsync(message);
} }


Expand Down
21 changes: 21 additions & 0 deletions src/main/java/net/engio/mbassy/common/DeadEvent.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.engio.mbassy.common;

/**
* The DeadEvent is delivered to all subscribed handlers (if any) whenever no message
* handlers could be found for a given message publication.
*
* @author bennidi
* Date: 1/18/13
*/
public class DeadEvent {

private Object event;

public DeadEvent(Object event) {
this.event = event;
}

public Object getEvent() {
return event;
}
}
3 changes: 2 additions & 1 deletion src/test/java/net/engio/mbassy/AllTests.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
FilterTest.class, FilterTest.class,
MetadataReaderTest.class, MetadataReaderTest.class,
ListenerSubscriptionTest.class, ListenerSubscriptionTest.class,
MethodDispatchTest.class MethodDispatchTest.class,
DeadEventTest.class
}) })
public class AllTests { public class AllTests {
} }
48 changes: 48 additions & 0 deletions src/test/java/net/engio/mbassy/DeadEventTest.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,48 @@
package net.engio.mbassy;

import net.engio.mbassy.common.ConcurrentSet;
import net.engio.mbassy.common.DeadEvent;
import net.engio.mbassy.common.UnitTest;
import net.engio.mbassy.listener.Listener;
import org.junit.Test;

/**
* Verify correct behaviour in case of empty message publications
*
* @author bennidi
* Date: 1/18/13
*/
public class DeadEventTest extends UnitTest{


@Test
public void testDeadEvent(){
MBassador bus = new MBassador(BusConfiguration.Default());
DeadEventHandler deadEventHandler = new DeadEventHandler();
bus.subscribe(deadEventHandler);
assertEquals(0, deadEventHandler.getDeadEventCount());
bus.post(new Object()).now();
assertEquals(1, deadEventHandler.getDeadEventCount());
bus.post(323).now();
assertEquals(2, deadEventHandler.getDeadEventCount());
bus.publish("fkdfdk");
assertEquals(3, deadEventHandler.getDeadEventCount());
}

public class DeadEventHandler{

private ConcurrentSet deadEvents = new ConcurrentSet();

@Listener
public void handle(DeadEvent event){
deadEvents.add(event);
}


public int getDeadEventCount(){
return deadEvents.size();
}

}

}

0 comments on commit fca9c60

Please sign in to comment.