Skip to content
Browse files

replying

  • Loading branch information...
1 parent 7e35a2d commit 1adfdd1115512e930cd9519ca2d1bdf4efc8e5b0 @smaldini smaldini committed
View
204 src/docs/guide/events/replying.gdoc
@@ -1,146 +1,66 @@
-### Events API : Spring Integration
+Usually, an event is _fired and forgot_. In some cases, you may expect an answer to transform your messaging architecture
+into a controlled flow. For instance, a negative reply can be used in GORM events to cancel database writing for the current subjet.
+Another usual example is the aggregation of multiple workers products.
-Plugin platforms brings an event API to allow decoupled communication within your application and beyond.
-If you want to use more features like routing mechanisms or external message brokers, you could also rely on specifics plugins which implement
-the platform events API such as Spring Integration (si-events, this plugin).
+h3. Simple reply
-It configures routes, channel, and gateway for you. It allows grails dev to integrate their events with external systems through Spring Integration adapters.
+Replying is a simple matter of returning an object from the listener method :
-#### Declaring an event listener
-A listener is an handler attached to a topic name, ie 'userLogged' or 'bookArchived'.
-This handler can receive and return values as we will see in the 'Sending Events' paragraph.
-
-There are 3 ways to declare listeners :
-
-##### Services artefacts :
-
-You can surround candidate methods with @Listener(String topic. If you don't define explicitly any topic, then the method name will be used :
-
-```groovy
+{code}
class SomeService{
- @grails.events.Listener('userLogged')
- def myMethod(User user){
- //do something with user
- }
-
- @grails.events.Listener
- def mailSent(User user){ //use 'mailSent' as topic name
- //do something with user
- }
-}
-```
-
-##### Inline closures :
-
-Inside services, domains and controllers artefacts, you can call "String addListener(String topic, Closure closure)".
-This method returns a listener Id which is under the following format "topic:ClassName#method@hashCode":
-
-```groovy
-class SomeController{
-
- def auth(){
- String listenerId = addListener('userLogged'){User user->
- //do something with user
- }
- }
-}
-```
-
-
-##### Custom objects :
-
-You can also declare runtime listeners with any object and method using "String addListener(String topic, Object bean, Method/* or String */ method)".
-As previously mentionned, this method returns a listener Id which is under the following format "topic:ClassName#method@hashCode":
-
-```groovy
-class SomeController{
-
- def someService
-
- def auth(){
- addListener('userLogged', someService, 'myMethod')
- }
-}
-```
-
-#### Sending events
-
-You have 2 ways of sending events : asynchronously or syncronously. Both methods returns an EventReply object.
-EventReply implements Future<Object> and provides 3 usefuls methods :
-* List<Object> getValues()
-Return as many values as listeners has replied.
-* Object getValue()
-Return the first element of getValues().
-* int size()
-Return the replies count.
-
-
-##### Sync events
-Syncronous events can be sent from domains, services and controllers artefacts by using "EventReply event(String topic, Object data)" :
-
-```groovy
-class SomeService{
- @Listener('logout')
- def method(User user){
+ @Listener
+ def logout(User user){
Date disconnectDate = new Date()
- //do something very long with user
+ //do something with user
return disconnectDate
}
}
+{code}
+
+Whenever a listener return non null object, the caller can access it through the EventReply enveloppe returned immediatly
+after calling event/eventAsync. Getting the value is a blocking operation called *value* :
+{code}
class SomeController{
def logout(){
- def reply = syncEvent('logout', session.user)
- render reply.value //display disconnectDate
- }
-}
-```
+ def reply = eventAsync topic:"logout", data:session.user
-##### Async events
-Asyncronous events can be sent from domains, services and controllers artefacts by using "EventReply eventAsync(String topic, Object data)" :
+ render reply.value //wait and display value
-```groovy
-class SomeService{
- @Listener('logout')
- def method(User user){
- Date disconnectDate = new Date()
-
- //do something with user
+ def reply2 = eventAsync topic:"logout", data:session.user
- return disconnectDate
- }
+ render reply.get(30, TimeUnit.SECONDS) //EventReply object is a Future implementation, hence providing the same facilities such as timeout
+ }
}
+{code}
+
+h3. Multiple replies
+
+Multiple listeners can return values for the same topic/namespace. In this case, EventReply will wait for all handlers before returning any value.
+Remember that a valid result is a non null value, hence why even if 3 handlers have reacted but only 2 did return something, then you will only
+see 2 values in the *EventReply.values*.
+{code}
class SomeController{
def logout(){
- def reply = eventAsync('logout', session.user)
- render reply.value //block the thread until event response and display disconnectDate
+ def reply = event topic:"sendMails", data:session.user
+
+ render reply.value //wait for all listeners and then display the first value from the aggregated results
+ render reply.values //display all results as List
}
}
-```
+{code}
-##### Waiting replies
+h3. Waiting replies
In domains, services and controllers artefacts you can wait for events using "EventReply[] waitFor(EventReply... eventReplies)".
-This method is rather useless in a sync scenario. It accepts as many events replies you want and returns the same array
-for functiunal programming style. :
-
-```groovy
-class SomeService{
- @Listener('logout')
- def method(User user){
- Date disconnectDate = new Date()
-
- //do something with user
-
- return disconnectDate
- }
-}
+It accepts as many events replies you want and returns the same array for functiunal programming style. This method is rather useless in a sync scenario.
+{code}
class SomeController{
def logout(){
@@ -151,55 +71,11 @@ class SomeController{
waitFor(reply,reply2,reply3).each{EventReply reply->
render reply.value +'</br>'
}
- }
-}
-```
-
-
-#### GORM events (Grails 2 only)
-You can listen all the grails 2 gorm events using the same topic name than domain method handler described in grails documentation.
-The listener argument has to be typed to specify the domain that the handler listens for. If the gorm event can be cancelled like with beforeInsert or beforeValidation,
-the handler can return a false boolean to discard the event :
-
-```groovy
-class SomeService{
- @Listener('beforeInsert')
- def myMethod(User user){
- //do something with user
- false //cancel the current insert
- }
-}
-```
-
-#### Removing listeners
-
-To remove listeners, just use "int removeListeners(String listenerIdPattern)". The argument allows you to filter listeners by using the listener id pattern.
-For instance you can use both "topic" and "topic:ClassName#method".
-The method returns the number of deleted listeners :
-
-```groovy
-class SomeController{
-
- def logout(){
- println removeListeners('userLogged') //remove all listeners for topic 'userLogged'
- println removeListeners('statistics:org.sample.StatisticsService') // remove all listeners for topic 'statistics' and class 'StatisticsService'
+ //same with 20 seconds timeout on each reply
+ waitFor(20, TimeUnit.SECONDS, reply,reply2,reply3).each{EventReply reply->
+ render reply.value +'</br>'
+ }
}
}
-```
-
-#### Counting listeners
-
-To count listeners, just use "int countListeners(String listenerIdPattern)". The argument allows you to filter listeners by using the listener id pattern.
-For instance you can use both "topic" and "topic:ClassName#method".
-The method returns the number of listeners :
-
-
-```groovy
-class SomeController{
-
- def logout(){
- println countListeners('userLogged') //count all listeners for topic 'userLogged'
- println countListeners('statistics:org.sample.StatisticsService') // count all listeners for topic 'statistics' and class 'StatisticsService'
- }
-}
+{code}
View
12 src/groovy/org/grails/plugin/platform/events/Events.groovy
@@ -17,6 +17,10 @@
*/
package org.grails.plugin.platform.events
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeoutException
+
interface Events {
EventReply event(String namespace, String topic)
EventReply event(String namespace, String topic, data)
@@ -27,11 +31,9 @@ interface Events {
void eventAsyncWithCallback(String namespace, String topic, Closure callback)
void eventAsyncWithCallback(String namespace, String topic, data, Closure callback)
void eventAsyncWithCallback(String namespace, String topic, data, Closure callback, Map params)
-
- int removeListeners(String callbackId)
- int countListeners(String callbackId)
// We have to use a list here as [] and ... were failing to compile for some WTF reason - MP
- Object[] waitFor(List<EventReply> replies)
-
+ Object[] waitFor(EventReply[] replies) throws ExecutionException, InterruptedException, TimeoutException
+ Object[] waitFor(long l, TimeUnit timeUnit, EventReply[] replies) throws ExecutionException, InterruptedException, TimeoutException
+
}
View
28 src/groovy/org/grails/plugin/platform/events/EventsImpl.groovy
@@ -31,6 +31,9 @@ import org.grails.plugin.platform.util.PluginUtils
import org.springframework.context.ApplicationContext
import java.lang.reflect.Method
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeoutException
class EventsImpl implements Events {
@@ -82,36 +85,31 @@ class EventsImpl implements Events {
on { String topic, Closure callback ->
self.grailsEventsRegistry.on(APP_NAMESPACE, topic, callback)
}
- copyFrom(self.grailsEventsPublisher, 'waitFor')
+ copyFrom(self, 'waitFor')
copyFrom(self.grailsEventsRegistry, ['on', 'removeListeners', 'countListeners'])
}
}
- // We have to use a list here as [] and ... were failing to compile for some WTF reason - MP
- Object[] waitFor(List<EventReply> replies) {
- grailsEventsPublisher.waitFor(replies)
+ Object[] waitFor(long l, TimeUnit timeUnit, EventReply... replies) throws ExecutionException, InterruptedException, TimeoutException {
+ for (reply in replies) {
+ reply?.get(l, timeUnit)
+ }
+ replies
}
- String on(String topic, Closure callback) {
- grailsEventsRegistry.on(namespace, topic, callback)
+ Object[] waitFor(EventReply... replies) throws ExecutionException, InterruptedException, TimeoutException {
+ waitFor(-1l, TimeUnit.NANOSECONDS, replies)
}
- int removeListeners(String callbackId) {
- grailsEventsRegistry.removeListeners(callbackId)
+ EventReply event(String namespace, String topic, data) {
+ event(namespace, topic, data, null)
}
- int countListeners(String callbackId) {
- grailsEventsRegistry.countListeners(callbackId)
- }
EventReply event(String namespace, String topic) {
event(namespace, topic, null, null)
}
- EventReply event(String namespace, String topic, data) {
- event(namespace, topic, data, null)
- }
-
EventReply event(String namespace, String topic, data, Map params) {
if (log.debugEnabled) {
log.debug "Sending event of namespace [$namespace] and topic [$topic] with data [${data}] and params [${params}]"
View
11 src/java/org/grails/plugin/platform/events/publisher/DefaultEventsPublisher.java
@@ -25,9 +25,7 @@
import org.grails.plugin.platform.events.registry.DefaultEventsRegistry;
import org.springframework.core.task.AsyncTaskExecutor;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
/**
* @author Stephane Maldini <smaldini@doc4web.com>
@@ -86,13 +84,6 @@ public void run() {
});
}
- public EventReply[] waitFor(EventReply... replies) throws ExecutionException, InterruptedException {
- for (EventReply reply : replies) {
- if (reply != null) reply.get();
- }
- return replies;
- }
-
//INTERNAL
private class Callback implements Callable<DefaultEventsRegistry.InvokeResult> {
View
5 src/java/org/grails/plugin/platform/events/publisher/EventsPublisher.java
@@ -21,8 +21,6 @@
import org.grails.plugin.platform.events.EventMessage;
import org.grails.plugin.platform.events.EventReply;
-import java.util.concurrent.ExecutionException;
-
/**
* @author Stephane Maldini <smaldini@doc4web.com>
* @version 1.0
@@ -34,10 +32,7 @@
*/
public interface EventsPublisher {
- public final static String GORM_EVENT_SOURCE = "gorm";
-
public EventReply event(final EventMessage event);
public EventReply eventAsync(final EventMessage event);
public void eventAsync(final EventMessage event, Closure onComplete);
- public Object[] waitFor(EventReply... replies) throws ExecutionException, InterruptedException;
}

0 comments on commit 1adfdd1

Please sign in to comment.
Something went wrong with that request. Please try again.