Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Improve agent disposal #17

Merged
merged 2 commits into from

2 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
19 drools-mas-core/src/main/java/org/drools/mas/core/DroolsAgent.java
@@ -120,15 +120,22 @@ public void dispose() {
GridServiceDescription<GridNode> nGsd = null;
if ( node == null ) {
nGsd = grid.get( WhitePages.class ).lookup( sessionLoc.getNodeId() );
+ if ( nGsd != null ) {
GridConnection<GridNode> conn = grid.get( ConnectionFactoryService.class ).createConnection( nGsd );
- node = conn.connect();
-
- StatefulKnowledgeSession ksession = node.get( sessionLoc.getSessionId(), StatefulKnowledgeSession.class );
- ksession.dispose();
- node.dispose();
-
+ node = conn.connect();
+ if ( logger.isDebugEnabled() ) {
+ logger.debug(" ### \t Session " + sessionLoc + " found in " + node.getId() + " >> local = " + node.isRemote() + " ( proxy = " + node.isRemote() + " ) " );
+ }
+
+ StatefulKnowledgeSession ksession = node.get( sessionLoc.getSessionId(), StatefulKnowledgeSession.class );
+ ksession.dispose();
+ }
} else {
// it's a local node ( triple check! ), so we just shut the KS down. The node will be disposed later
+ if ( logger.isDebugEnabled() ) {
+ logger.debug(" ### \t Session " + sessionLoc + " found in " + node.getId() + " >> local = " + node.isRemote() + " ( proxy = " + node.isRemote() + " ) " );
+ }
+
StatefulKnowledgeSession ksession = node.get( sessionLoc.getSessionId(), StatefulKnowledgeSession.class );
ksession.dispose();
}
View
15 drools-mas-core/src/main/java/org/drools/mas/core/DroolsAgentFactory.java
@@ -15,6 +15,7 @@
*/
package org.drools.mas.core;
+import org.drools.grid.remote.StatefulKnowledgeSessionRemoteClient;
import org.drools.mas.util.helper.NodeLocator;
import org.drools.runtime.StatefulKnowledgeSession;
@@ -113,12 +114,14 @@ public DroolsAgent spawn( DroolsAgentConfiguration config ) {
SessionManager sm = SessionManager.create( config, descr, grid, true );
StatefulKnowledgeSession mindSet = sm.getStatefulKnowledgeSession();
-// try{
-// mindSet.setGlobal( "grid", grid );
-// } catch (Exception e){
-// //maybe 'grid' is not even defined in subsession
-// logger.debug("Global 'grid' not set on session '"+descr.getSessionId()+"' due to "+e.getMessage());
-// }
+ if ( ! ( mindSet instanceof StatefulKnowledgeSessionRemoteClient) ) {
+ try{
+ mindSet.setGlobal( "grid", grid );
+ } catch (Exception e){
+ //maybe 'grid' is not even defined in subsession
+ logger.debug("Global 'grid' not set on session '"+descr.getSessionId()+"' due to "+e.getMessage());
+ }
+ }
mindSet.fireAllRules();
View
72 drools-mas-core/src/main/java/org/drools/mas/core/SessionManager.java
@@ -16,6 +16,10 @@
package org.drools.mas.core;
import org.drools.agent.KnowledgeAgent;
+import org.drools.agent.KnowledgeAgentConfiguration;
+import org.drools.agent.KnowledgeAgentFactory;
+import org.drools.agent.conf.NewInstanceOption;
+import org.drools.agent.conf.UseKnowledgeBaseClassloaderOption;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.ResourceType;
import org.drools.conf.EventProcessingOption;
@@ -141,10 +145,6 @@ public static GridNode createNode( String nodeId, Grid grid, int port, boolean f
// should indicate that someone else left an orphan row in the WP
node = grid.claimGridNode( nodeId );
grid.get( SocketService.class ).addService( nodeId, port, node );
-
- if ( forceRemote && ! node.isRemote() ) {
- node = grid.asRemoteNode( node );
- }
}
@@ -154,10 +154,6 @@ public static GridNode createNode( String nodeId, Grid grid, int port, boolean f
}
node = createLocalNode( grid, nodeId );
grid.get( SocketService.class ).addService( nodeId, port, node );
-
- if ( forceRemote && ! node.isRemote() ) {
- node = grid.asRemoteNode( node );
- }
}
} else {
@@ -193,6 +189,9 @@ protected SessionManager( String id, KnowledgeBase kbase, GridNode node ) {
// this.kSession = kAgent.getKnowledgeBase().newStatefulKnowledgeSession(conf, null);
this.kSession = kbase.newStatefulKnowledgeSession( conf, null );
+ if ( ! node.isRemote() ) {
+ addKnowledgeAgent( id, kbase, node );
+ }
//this.kSession.insert(new SessionLocator(node.getId(), id));
if ( logger.isInfoEnabled() ) {
@@ -202,6 +201,51 @@ protected SessionManager( String id, KnowledgeBase kbase, GridNode node ) {
}
+ private void addKnowledgeAgent(String id, KnowledgeBase kbase, GridNode node) {
+ KnowledgeAgentConfiguration kaConfig = KnowledgeAgentFactory.newKnowledgeAgentConfiguration();
+ kaConfig.setProperty( NewInstanceOption.PROPERTY_NAME, "false" );
+ kaConfig.setProperty( UseKnowledgeBaseClassloaderOption.PROPERTY_NAME, "true" );
+ KnowledgeAgent kagent = KnowledgeAgentFactory.newKnowledgeAgent( id, kbase, kaConfig );
+ SystemEventListener systemEventListener = new SystemEventListener() {
+
+ public void info(String string) {
+ System.out.println("INFO: "+string);
+ }
+
+ public void info(String string, Object o) {
+ System.out.println("INFO: "+string +", "+o);
+ }
+
+ public void warning(String string) {
+ System.out.println("WARN: "+string );
+ }
+
+ public void warning(String string, Object o) {
+ System.out.println("WARN: "+string +", "+o);
+ }
+
+ public void exception(String string, Throwable thrwbl) {
+ System.out.println("EXCEPTION: "+string +", "+thrwbl);
+ }
+
+ public void exception(Throwable thrwbl) {
+ System.out.println("EXCEPTION: "+thrwbl);
+ }
+
+ public void debug(String string) {
+ System.out.println("DEBUG: "+string );
+ }
+
+ public void debug(String string, Object o) {
+ System.out.println("DEBUG: "+string +", "+o);
+ }
+ };
+
+ kagent.setSystemEventListener( systemEventListener );
+
+ node.set( id + "_kAgent", kagent );
+ }
+
private static KnowledgeBase buildKnowledgeBase( String changeset, GridNode node ) throws IOException, SAXException, IllegalStateException {
if ( logger.isDebugEnabled() ) {
logger.debug( " ### SessionManager : CREATING kbase with CS: " + changeset );
@@ -247,7 +291,7 @@ private static KnowledgeBase buildKnowledgeBase( String changeset, GridNode node
KnowledgeBuilderErrors errors = kbuilder.getErrors();
if ( errors != null && errors.size() > 0 ) {
for ( KnowledgeBuilderError error : errors ) {
- logger.error( "### Session Manager: Error: " + error.getMessage() );
+ logger.error( "### Session Manager: Error: " + error );
logger.error( "### >>> " + error.getResource() + " @ " + Arrays.toString( error.getLines() ) );
}
throw new IllegalStateException( " ### Session Manager: There were errors during the knowledge compilation ^^^^ !" );
@@ -289,7 +333,15 @@ public static void addResource( Grid grid, String nodeId, String sessionId, Stri
Resource changeSetRes = new ByteArrayResource( changeSetString.getBytes() );
((InternalResource) changeSetRes).setResourceType( ResourceType.CHANGE_SET );
//resources.put(id, res);
- KnowledgeAgent kAgent = GridHelper.getKnowledgeAgentRemoteClient( grid, nodeId, sessionId );
+
+ KnowledgeAgent kAgent;
+ GridNode node = grid.getGridNode( nodeId );
+ if ( node == null || node.isRemote() ) {
+ kAgent = GridHelper.getKnowledgeAgentRemoteClient( grid, nodeId, sessionId );
+ } else {
+ kAgent = node.get( sessionId + "_kAgent", KnowledgeAgent.class );
+ }
+
kAgent.applyChangeSet( changeSetRes );
} catch (IOException ex) {
logger.error( " ### SessionManager: " + ex);
Something went wrong with that request. Please try again.