Permalink
Browse files

- initial multi-taskExecutor support

- fix non hibernate gorm events handling
- set version to M6.1
  • Loading branch information...
1 parent d284cd1 commit 7faafc9d44da476f480c4ee1287675d3b317d157 @smaldini smaldini committed Oct 11, 2012
@@ -24,7 +24,7 @@ import org.grails.plugin.platform.events.registry.DefaultEventsRegistry
class PlatformCoreGrailsPlugin {
// the plugin version
- def version = "1.0.M7-SNAPSHOT"
+ def version = "1.0.M6.1"
// the version or versions of Grails the plugin is designed for
def grailsVersion = "1.3 > *"
@@ -156,8 +156,7 @@ Grails Plugin Platform Core APIs
grailsEventsRegistry(DefaultEventsRegistry)
grailsEventsPublisher(DefaultEventsPublisher) {
grailsEventsRegistry = ref('grailsEventsRegistry')
- taskExecutor = ref('grailsTopicExecutor')
- if(manager.hasGrailsPlugin('hibernate'))
+ if(getBeanDefinition('persistenceInterceptor'))
persistenceInterceptor = ref("persistenceInterceptor")
catchFlushExceptions = config.events.catchFlushExceptions
}
View
@@ -1,5 +1,5 @@
#Grails Metadata file
-#Mon Jul 16 16:23:03 BST 2012
-app.grails.version=2.1.0
+#Wed Sep 26 21:18:21 BST 2012
+app.grails.version=2.2.0.RC1
app.name=PlatformCore
plugins.rest-client-builder=1.0.2
@@ -144,10 +144,10 @@ class EventsImpl implements Events {
log.debug "Sending event of namespace [$namespace] and topic [$topic] with data [${data}] and params [${params}]"
}
def reply
- callback = callback ?: params?.get(EventsPublisher.ON_REPLY)
+ callback = callback ?: params?.get(EventsPublisher.ON_REPLY) as Closure
if (params?.containsKey(EventsPublisher.FORK) && !params.remove(EventsPublisher.FORK)) {
reply = grailsEventsPublisher.event(eventMessage)
- reply.onError = params?.get(EventsPublisher.ON_ERROR)
+ reply.onError = params?.get(EventsPublisher.ON_ERROR) as Closure
reply.throwError()
callback?.call(reply)
} else
@@ -159,7 +159,7 @@ class EventsImpl implements Events {
}
EventMessage buildEvent(String pluginName, String namespace, String topic, data, Map params) {
- boolean gormSession = params?.containsKey(EventsPublisher.GORM) ? params.remove(EventsPublisher.GORM) : true
+ boolean gormSession = params?.containsKey(EventsPublisher.GORM) ? params.remove(EventsPublisher.GORM) as boolean : true
namespace = params?.remove(EventsPublisher.NAMESPACE) ?: namespace
checkNamespace pluginName, namespace
@@ -205,7 +205,7 @@ class EventsImpl implements Events {
null
}
- void registerListeners(Collection<Class<?>> serviceClasses) {
+ void registerListeners(Collection<Class> serviceClasses) {
// grailsEventsDispatcher.scanClassForMappings(serviceClass)
def bean
eachListener(serviceClasses) {String namespace, boolean hasInlineNamespace,
@@ -260,12 +260,12 @@ class EventsImpl implements Events {
}
void loadDSL(Class dslClass) {
- Script dslInstance = dslClass.newInstance()
+ Script dslInstance = dslClass.newInstance() as Script
dslInstance.binding.setVariable("grailsApplication", grailsApplication)
dslInstance.binding.setVariable("ctx", grailsApplication.mainContext)
dslInstance.binding.setVariable("config", grailsApplication.config)
dslInstance.run()
- def dsl = dslInstance.binding.getVariable('events')
+ def dsl = dslInstance.binding.getVariable('events') as Closure
if (dsl) {
registerEvents(dsl)
} else {
@@ -332,11 +332,11 @@ class EventsImpl implements Events {
if (filter) {
if (Closure.isAssignableFrom(filter.getClass())) {
- definition.filterClosure = filter
+ definition.filterClosure = filter as Closure
definition.filterEventMessage = EventMessage.isAssignableFrom(Closure.cast(filter).parameterTypes[0])
}
if (Class.isAssignableFrom(filter.getClass())) {
- definition.filterClass = filter
+ definition.filterClass = filter as Class
}
}
@@ -35,4 +35,5 @@ interface GormTopicSupport {
void processCancel(Object evt, Object returnValue)
String convertTopic(Object evt)
+ Object extractEntity(Object source)
}
@@ -20,8 +20,6 @@ package org.grails.plugin.platform.events.dispatcher;
//import org.grails.datastore.mapping.engine.event.AbstractPersistenceEvent;
-import org.springframework.context.ApplicationEvent
-
/**
* @author Stephane Maldini <smaldini@vmware.com>
* @version 1.0
@@ -40,4 +38,8 @@ class GormTopicSupport1X implements GormTopicSupport{
String convertTopic(Object evt) {
return null //To change body of implemented methods use File | Settings | File Templates.
}
+
+ Object extractEntity(Object source) {
+ return null //To change body of implemented methods use File | Settings | File Templates.
+ }
}
@@ -60,4 +60,9 @@ class GormTopicSupport2X implements GormTopicSupport {
null
}
+
+ Object extractEntity(Object source) {
+ //workaround for document db and hibernate gorm events
+ source.entityObject ?: source?.entityAccess?.entity
+ }
}
@@ -23,10 +23,13 @@
import org.grails.plugin.platform.events.EventMessage;
import org.grails.plugin.platform.events.EventReply;
import org.grails.plugin.platform.events.registry.DefaultEventsRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.AsyncTaskExecutor;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* @author Stephane Maldini <smaldini@vmware.com>
@@ -39,10 +42,17 @@
*/
public class DefaultEventsPublisher implements EventsPublisher {
+
+ private static final String EXECUTOR = "executor";
+ private static final String DEFAULT_EXECUTOR = "grailsTopicExecutor";
+ private static final String QUEUE_EXECUTOR = "grailsP2PExecutor";
private final static Logger log = Logger.getLogger(DefaultEventsPublisher.class);
private DefaultEventsRegistry grailsEventsRegistry;
- protected AsyncTaskExecutor taskExecutor;
+
+ @Autowired
+ protected Map<String, AsyncTaskExecutor> taskExecutors;
+
private PersistenceContextInterceptor persistenceInterceptor;
private boolean catchFlushExceptions = false;
@@ -51,10 +61,6 @@ public void setCatchFlushExceptions(boolean catchFlushExceptions) {
this.catchFlushExceptions = catchFlushExceptions;
}
- public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
- this.taskExecutor = taskExecutor;
- }
-
public void setPersistenceInterceptor(PersistenceContextInterceptor persistenceInterceptor) {
this.persistenceInterceptor = persistenceInterceptor;
}
@@ -71,6 +77,10 @@ public EventReply event(EventMessage event) {
}
public EventReply eventAsync(final EventMessage event, final Map<String, Object> params) {
+ AsyncTaskExecutor taskExecutor = params != null && params.containsKey(EXECUTOR) ?
+ taskExecutors.get( params.get(EXECUTOR) ) :
+ taskExecutors.get(DEFAULT_EXECUTOR);
+
Future<DefaultEventsRegistry.InvokeResult> invokeResult =
taskExecutor.submit(new Callback(event));
@@ -17,7 +17,6 @@
*/
package org.grails.plugin.platform.events.publisher;
-import groovy.lang.Closure;
import org.grails.plugin.platform.events.EventMessage;
import org.grails.plugin.platform.events.EventReply;
@@ -43,5 +42,5 @@
public static final String HEADERS = "headers";
public EventReply event(final EventMessage event);
- public EventReply eventAsync(final EventMessage event, Map<String,Object> params);
+ public EventReply eventAsync(final EventMessage event, Map<String, Object> params);
}
@@ -18,16 +18,12 @@
package org.grails.plugin.platform.events.publisher;
import org.apache.log4j.Logger;
-import org.grails.plugin.platform.events.EventMessage;
import org.grails.plugin.platform.events.EventReply;
import org.grails.plugin.platform.events.Events;
import org.grails.plugin.platform.events.dispatcher.GormTopicSupport;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
-import org.springframework.util.ReflectionUtils;
-import java.io.Serializable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -68,10 +64,7 @@ public void onApplicationEvent(ApplicationEvent applicationEvent) {
params.put(EventsPublisher.FORK, false);
EventReply reply = grailsEvents.event(GormTopicSupport.GORM_SOURCE, topic,
- ReflectionUtils.invokeMethod(
- ReflectionUtils.findMethod(applicationEvent.getClass(), "getEntityObject"),
- applicationEvent
- ), params);
+ gormTopicSupport.extractEntity(applicationEvent), params);
try {
gormTopicSupport.processCancel(applicationEvent, reply != null ? reply.getValues() : null);
} catch (Throwable e) {

0 comments on commit 7faafc9

Please sign in to comment.