Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add empty sub-nodes to an agent's structure #16

Merged
merged 2 commits into from

2 participants

@sotty
Owner

No description provided.

@esteban-aliverti esteban-aliverti merged commit 125b0db into droolsjbpm:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 1, 2012
  1. @sotty

    MAS: Helper classes

    sotty authored
  2. @sotty
This page is out of date. Refresh to see the latest.
View
15 drools-mas-core/src/main/java/org/drools/mas/core/DroolsAgentConfiguration.java
@@ -25,6 +25,7 @@
private String changeset;
private String responseInformer;
private List<SubSessionDescriptor> subSessions = new ArrayList<SubSessionDescriptor>();
+ private List<String> subNodes = new ArrayList<String>();
private String springContextFilePath;
private String defaultSubsessionChangeSet;
@@ -103,8 +104,18 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}
-
-
+
+ public List<String> getSubNodes() {
+ return subNodes;
+ }
+
+ public void setSubNodes( List<String> subNodes ) {
+ this.subNodes = subNodes;
+ }
+
+ public void addSubNode( String node ) {
+ this.subNodes.add( node );
+ }
public static class SubSessionDescriptor implements Serializable {
View
15 drools-mas-core/src/main/java/org/drools/mas/core/DroolsAgentFactory.java
@@ -15,6 +15,7 @@
*/
package org.drools.mas.core;
+import org.drools.mas.util.helper.NodeLocator;
import org.drools.runtime.StatefulKnowledgeSession;
@@ -103,6 +104,8 @@ public DroolsAgent spawn( DroolsAgentConfiguration config ) {
}
mind.setGlobal( "grid", grid );
+ mind.insert( new NodeLocator(config.getMindNodeLocation(), true ) );
+
for ( DroolsAgentConfiguration.SubSessionDescriptor descr : config.getSubSessions() ) {
if ( logger.isDebugEnabled() ) {
logger.debug( " ### Creating Agent Sub-Session: " + descr.getSessionId() + "- CS: " + descr.getChangeset() + " - on node: " + descr.getNodeId() );
@@ -122,8 +125,20 @@ public DroolsAgent spawn( DroolsAgentConfiguration config ) {
mindSet.insert( new SessionLocator( config.getMindNodeLocation(), config.getAgentId(), true, false ) );
mindSet.insert( new SessionLocator( descr.getNodeId(), descr.getSessionId(), false, true ) );
mind.insert( new SessionLocator( descr.getNodeId(), descr.getSessionId(), false, true ) );
+ mind.insert( new NodeLocator( descr.getNodeId(), true ) );
}
+
+ if ( config.getSubNodes().size() > 0 ) {
+ for ( String node : config.getSubNodes() ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " ### Creating Additional Node: " + node );
+ }
+ SessionManager.createNode( node, grid, config.getPort(), true );
+ mind.insert( new NodeLocator( node, false ) );
+ }
+ }
+
mind.insert( aid );
//Insert configuration as a fact inside the mind session
mind.insert( config );
View
172 drools-mas-core/src/main/java/org/drools/mas/core/SessionManager.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
@@ -59,7 +60,7 @@
public static SessionManager create( DroolsAgentConfiguration conf, DroolsAgentConfiguration.SubSessionDescriptor subDescr, Grid grid, boolean forceRemote ) {
return create( null, conf, subDescr, grid, forceRemote );
}
-
+
public static SessionManager create( String sessionId, DroolsAgentConfiguration conf, DroolsAgentConfiguration.SubSessionDescriptor subDescr, Grid grid, boolean forceRemote ) {
String id;
String changeset;
@@ -82,55 +83,63 @@ public static SessionManager create( String sessionId, DroolsAgentConfiguration
int port = conf.getPort();
try {
- GridNode node = grid.getGridNode( nodeId );
- if ( node == null ) {
- // the node is not "local"
+ GridNode node = createNode( nodeId, grid, port, forceRemote );
+ String cs = changeset != null ?
+ changeset :
+ ( conf.getDefaultSubsessionChangeSet() != null ?
+ conf.getDefaultSubsessionChangeSet() :
+ DEFAULT_CHANGESET
+ );
+
+ return new SessionManager( id, buildKnowledgeBase( cs, node ), node );
+ } catch ( SAXException ex ) {
+ ex.printStackTrace();
+ } catch ( IOException ioe ) {
+ ioe.printStackTrace();
+ } catch ( IllegalStateException ise ) {
+ logger.error( " FATAL : Could not create a Knowledge Base " );
+ ise.printStackTrace();
+ }
+ return null;
+ }
+
+ public static GridNode createNode( String nodeId, Grid grid, int port, boolean forceRemote ) {
+ GridNode node = grid.getGridNode( nodeId );
+ if ( node == null ) {
+ // the node is not "local"
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " ### Session Manager: Looking for Remote Node: " + nodeId );
+ }
+ GridServiceDescription<GridNode> n1Gsd = grid.get( WhitePages.class ).lookup( nodeId );
+ if ( n1Gsd != null ) {
if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: Looking for Remote Node: " + nodeId );
+ logger.debug( " ### Session Manager: Remote Node Descriptor Found: " + n1Gsd );
}
- GridServiceDescription<GridNode> n1Gsd = grid.get( WhitePages.class ).lookup( nodeId );
- if ( n1Gsd != null ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: Remote Node Descriptor Found: " + n1Gsd );
- }
- if ( grid.getId().equals( n1Gsd.getOwnerGridId() ) && GridNode.class.equals( n1Gsd.getServiceInterface() ) ) {
- // the node is theoretically owned by this grid, but is not among the local nodes - it's usually due to a stale WP record, claim it by right!
- node = grid.claimGridNode( nodeId );
- grid.get( SocketService.class ).addService( nodeId, port, node );
- } else if ( n1Gsd.getAddresses().size() > 0 ) {
- Address socketAddress = n1Gsd.getAddresses().get("socket");
- if ( socketAddress.getObject() instanceof InetSocketAddress ) {
- // node exists and is alive
- GridConnection<GridNode> conn = grid.get( ConnectionFactoryService.class ).createConnection( n1Gsd );
- if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: Opened connection to node: " + conn );
- }
- // node exists and is hopefully alive
- node = conn.connect();
- if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: Got in touch with the node: " + node + ", now look at service " + grid.get( SocketService.class ) );
- logger.debug( n1Gsd.toString() + " // " + n1Gsd.getAddresses() );
- }
+ if ( grid.getId().equals( n1Gsd.getOwnerGridId() ) && GridNode.class.equals( n1Gsd.getServiceInterface() ) ) {
+ // the node is theoretically owned by this grid, but is not among the local nodes - it's usually due to a stale WP record, claim it by right!
+ node = grid.claimGridNode( nodeId );
+ grid.get( SocketService.class ).addService( nodeId, port, node );
+ } else if ( n1Gsd.getAddresses().size() > 0 ) {
+ Address socketAddress = n1Gsd.getAddresses().get("socket");
+ if ( socketAddress.getObject() instanceof InetSocketAddress ) {
+ // node exists and is alive
+ GridConnection<GridNode> conn = grid.get( ConnectionFactoryService.class ).createConnection( n1Gsd );
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " ### Session Manager: Opened connection to node: " + conn );
}
- }
-
- if ( node == null ) {
- // should indicate that someone else left an orphan row in the WP
- node = grid.claimGridNode( nodeId );
- grid.get( SocketService.class ).addService( nodeId, port, node );
-
- if ( forceRemote && ! node.isRemote() ) {
- node = grid.asRemoteNode( node );
+ // node exists and is hopefully alive
+ node = conn.connect();
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " ### Session Manager: Got in touch with the node: " + node + ", now look at service " + grid.get( SocketService.class ) );
+ logger.debug( n1Gsd.toString() + " // " + n1Gsd.getAddresses() );
}
}
+ }
-
- } else {
- if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: Creating a new Local Node" );
- }
- node = createLocalNode( grid, nodeId );
+ if ( node == null ) {
+ // should indicate that someone else left an orphan row in the WP
+ node = grid.claimGridNode( nodeId );
grid.get( SocketService.class ).addService( nodeId, port, node );
if ( forceRemote && ! node.isRemote() ) {
@@ -138,32 +147,37 @@ public static SessionManager create( String sessionId, DroolsAgentConfiguration
}
}
+
} else {
if ( logger.isDebugEnabled() ) {
- logger.debug( " ### Session Manager: I have already found a local node ! " );
+ logger.debug( " ### Session Manager: Creating a new Local Node" );
+ }
+ node = createLocalNode( grid, nodeId );
+ grid.get( SocketService.class ).addService( nodeId, port, node );
+
+ if ( forceRemote && ! node.isRemote() ) {
+ node = grid.asRemoteNode( node );
}
}
- return new SessionManager( id, buildKnowledgeBase(
- changeset != null ?
- changeset :
- ( conf.getDefaultSubsessionChangeSet() != null ?
- conf.getDefaultSubsessionChangeSet() :
- DEFAULT_CHANGESET
- ),
- node ), node );
- } catch ( SAXException ex ) {
- ex.printStackTrace();
- } catch ( IOException ioe ) {
- ioe.printStackTrace();
- } catch ( IllegalStateException ise ) {
- logger.error( " FATAL : Could not create a Knowledge Base " );
- ise.printStackTrace();
+ } else {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " ### Session Manager: I have already found a local node ! " );
+ }
}
- return null;
+
+ node = applyLocality( grid, node, forceRemote );
+ return node;
}
+ private static GridNode applyLocality( Grid grid, GridNode node, boolean forceRemote ) {
+ if ( forceRemote ) {
+ return node.isRemote() ? node : grid.asRemoteNode( node );
+ } else {
+ return node;
+ }
+ }
protected SessionManager( String id, KnowledgeBase kbase, GridNode node ) {
super();
@@ -176,7 +190,7 @@ protected SessionManager( String id, KnowledgeBase kbase, GridNode node ) {
KnowledgeSessionConfiguration conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setProperty( ClockTypeOption.PROPERTY_NAME, ClockType.REALTIME_CLOCK.toExternalForm() );
-
+
// this.kSession = kAgent.getKnowledgeBase().newStatefulKnowledgeSession(conf, null);
this.kSession = kbase.newStatefulKnowledgeSession( conf, null );
@@ -185,6 +199,7 @@ protected SessionManager( String id, KnowledgeBase kbase, GridNode node ) {
logger.info( " ### SessionManager : Registering session " + id + " in node: " + node.getId() );
}
node.set( id, this.kSession );
+
}
private static KnowledgeBase buildKnowledgeBase( String changeset, GridNode node ) throws IOException, SAXException, IllegalStateException {
@@ -201,17 +216,18 @@ private static KnowledgeBase buildKnowledgeBase( String changeset, GridNode node
} catch ( Throwable t ) {
throw new IllegalStateException( "Could not init the KnowledgeBuilder : " + t.getMessage() );
}
+
SemanticModules semanticModules = new SemanticModules();
semanticModules.addSemanticModule( new ChangeSetSemanticModule() );
XmlChangeSetReader reader = new XmlChangeSetReader( semanticModules );
//InputStream inputStream = new ClassPathResource(changeset, SessionManager.class).getInputStream();
reader.setClassLoader( SessionManager.class.getClassLoader(),
- null );
+ null );
ChangeSet cs = reader.read( SessionManager.class.getClassLoader().getResourceAsStream( changeset ) );
Collection<Resource> resourcesAdded = cs.getResourcesAdded();
for( Resource res: resourcesAdded ){
-
+
String file = ( (InternalResource) res ).getURL().getFile();
if ( ! file.contains( "file:" ) ) {
file = "file:" + file;
@@ -222,17 +238,17 @@ private static KnowledgeBase buildKnowledgeBase( String changeset, GridNode node
logger.info( "Resource: " + res + " file: " + file );
InputStream inputStream = new UrlResource( file ).getInputStream();
byte[] bytes = IOUtils.toByteArray( inputStream );
-
+
kbuilder.add( new ByteArrayResource( bytes ), ( (InternalResource) res ).getResourceType() );
}
-
-
-
+
+
+
KnowledgeBuilderErrors errors = kbuilder.getErrors();
if ( errors != null && errors.size() > 0 ) {
for ( KnowledgeBuilderError error : errors ) {
logger.error( "### Session Manager: Error: " + error.getMessage() );
-
+ logger.error( "### >>> " + error.getResource() + " @ " + Arrays.toString( error.getLines() ) );
}
throw new IllegalStateException( " ### Session Manager: There were errors during the knowledge compilation ^^^^ !" );
}
@@ -263,13 +279,13 @@ public static void addResource( Grid grid, String nodeId, String sessionId, Stri
if(logger.isDebugEnabled()){
logger.debug(" ### Session Manager: Add Resource -> nodeId: "+nodeId +" - sessionId: "+sessionId +" - id: "+id+" - res: "+((InternalResource)res).getURL().toString() +" - type: "+((InternalResource)res).getResourceType().getName());
}
- String changeSetString = "<change-set xmlns='http://drools.org/drools-5.0/change-set'>"
+ String changeSetString = "<change-set xmlns='http://drools.org/drools-5.0/change-set'>"
+ "<add>"
+ "<resource type=\""+((InternalResource)res).getResourceType().getName()+"\" source=\""+((InternalResource)res).getURL().toString()+"\" />"
+ "</add>"
+ "</change-set>"
+ "";
-
+
Resource changeSetRes = new ByteArrayResource( changeSetString.getBytes() );
((InternalResource) changeSetRes).setResourceType( ResourceType.CHANGE_SET );
//resources.put(id, res);
@@ -312,9 +328,9 @@ public static void addResource( Grid grid, String nodeId, String sessionId, Stri
// }
// }
-
-
+
+
private static GridNode createLocalNode( Grid grid, String nodeName ) {
if ( logger.isDebugEnabled() ) {
@@ -323,11 +339,11 @@ private static GridNode createLocalNode( Grid grid, String nodeName ) {
GridNode localNode = grid.createGridNode( nodeName );
return localNode;
}
-
- public static void addResource( WorkingMemory ksession, ResourceDescriptor rd){
+
+ public static void addResource( WorkingMemory ksession, ResourceDescriptor rd ){
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newUrlResource(rd.getResourceURL()), rd.getType());
-
+
if (kbuilder.hasErrors()){
Iterator<KnowledgeBuilderError> iterator = kbuilder.getErrors().iterator();
while (iterator.hasNext()) {
@@ -335,14 +351,14 @@ public static void addResource( WorkingMemory ksession, ResourceDescriptor rd){
logger.debug( " ### Session Manager: Error compiling resource '"+rd.getResourceURL()+"': " + knowledgeBuilderError.getMessage() );
}
}
-
+
org.drools.rule.Package[] packages = new org.drools.rule.Package[kbuilder.getKnowledgePackages().size()];
int i=0;
for (KnowledgePackage knowledgePackage : kbuilder.getKnowledgePackages()) {
packages[i++] = ((KnowledgePackageImp)knowledgePackage).pkg;
}
-
+
ksession.getRuleBase().addPackages(packages);
-
+
}
}
View
11 drools-mas-core/src/main/resources/org/drools/mas/acl_content_based_routing.drl
@@ -29,6 +29,7 @@ import java.util.Map;
import org.drools.mas.util.LoggerHelper;
import org.drools.mas.core.DroolsAgentConfiguration;
import org.drools.mas.util.helper.SessionLocator;
+import org.drools.mas.util.helper.NodeLocator;
import org.drools.grid.helper.GridHelper;
import org.drools.mas.util.helper.NoHandlerFault;
@@ -65,9 +66,17 @@ then
insert( new Destination( $mid, $target ) );
end
+rule "Implicit nodes"
+when
+ SessionLocator( $isMind : mind, $nid : nodeId )
+ not NodeLocator( mindNode == $isMind, nodeId == $nid )
+then
+ insert( new NodeLocator( $nid, $isMind ) );
+end
+
query subLocation( String $target )
- SessionLocator( mind == false, $target := nodeId )
+ NodeLocator( mindNode == false, $target := nodeId )
end
rule "Generate Session"
View
1  drools-mas-core/src/test/java/org/drools/mas/core/tests/SpringAgentTest.java
@@ -184,6 +184,7 @@ public void helloAgentsSmithsUpAndDown() {
@Test
+ @Ignore
public void testNodeSessionSharedHosting() {
ApplicationContext context;
DroolsAgent a1;
View
34 drools-mas-util/src/main/java/org/drools/mas/util/EndPointHelper.java
@@ -0,0 +1,34 @@
+package org.drools.mas.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Properties;
+
+public class EndPointHelper {
+
+ public static final String DEF_SRC = "/META-INF/service.endpoint.properties";
+
+ public static String getEndPoint( String key ) {
+ Properties props = new Properties();
+ try {
+ InputStream in = EndPointHelper.class.getResourceAsStream( DEF_SRC );
+ if ( in != null ) {
+ props.load( in );
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return (String) props.get( key );
+ }
+
+ public static URL getEndPointURL( String key ) {
+ try {
+ return new URL( getEndPoint( key ) );
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}
View
74 drools-mas-util/src/main/java/org/drools/mas/util/helper/NodeLocator.java
@@ -0,0 +1,74 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.drools.mas.util.helper;
+
+import org.drools.definition.type.Position;
+
+import java.io.Serializable;
+
+/**
+ *
+ * @author salaboy
+ */
+public class NodeLocator implements Serializable {
+
+ @Position(0)
+ private String nodeId;
+
+ @Position(1)
+ private boolean mindNode;
+
+
+ public NodeLocator( String nodeId, boolean mindNode ) {
+ this.nodeId = nodeId;
+ this.mindNode = mindNode;
+ }
+
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public boolean isMindNode() {
+ return mindNode;
+ }
+
+ public void setMindNode(boolean mindNode) {
+ this.mindNode = mindNode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ NodeLocator that = (NodeLocator) o;
+
+ if (mindNode != that.mindNode) return false;
+ if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = nodeId != null ? nodeId.hashCode() : 0;
+ result = 31 * result + (mindNode ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "NodeLocator{" +
+ "nodeId='" + nodeId + '\'' +
+ ", mindNode=" + mindNode +
+ '}';
+ }
+}
+
Something went wrong with that request. Please try again.