Skip to content

Commit

Permalink
[BZ-1185691] fix concurrent exection on a StatelessKieSession
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Jan 28, 2015
1 parent 2d86a32 commit 2bd608b
Showing 1 changed file with 23 additions and 32 deletions.
Expand Up @@ -86,13 +86,10 @@ public class StatelessKnowledgeSessionImpl extends AbstractRuntime
private AgendaEventSupport agendaEventSupport = new AgendaEventSupport();
private RuleRuntimeEventSupport ruleRuntimeEventSupport = new RuleRuntimeEventSupport();
private ProcessEventSupport processEventSupport = new ProcessEventSupport();
private boolean initialized;

private KieSessionConfiguration conf;
private Environment environment;

private transient StatefulKnowledgeSession ksession;

private WorkingMemoryFactory wmFactory;

public StatelessKnowledgeSessionImpl() {
Expand Down Expand Up @@ -121,30 +118,26 @@ public KnowledgeAgent getKnowledgeAgent() {
}

public StatefulKnowledgeSession newWorkingMemory() {
if (ksession != null && ((StatefulKnowledgeSessionImpl)ksession).isAlive()) {
return ksession;
}
if (this.kagent != null) {
// if we have an agent always get the rulebase from there
this.kBase = (InternalKnowledgeBase) this.kagent.getKnowledgeBase();
}
this.kBase.readLock();
try {
ksession = (StatefulKnowledgeSession) wmFactory.createWorkingMemory(this.kBase.nextWorkingMemoryCounter(), this.kBase,
(SessionConfiguration) this.conf, this.environment);

StatefulKnowledgeSession ksession = (StatefulKnowledgeSession) wmFactory.createWorkingMemory(this.kBase.nextWorkingMemoryCounter(),
this.kBase,
(SessionConfiguration) this.conf,
this.environment);
StatefulKnowledgeSessionImpl ksessionImpl = (StatefulKnowledgeSessionImpl) ksession;

// we don't pass the mapped listener wrappers to the session constructor anymore,
// because they would be ignored anyway, since the wm already contains those listeners

((Globals) ksessionImpl.getGlobalResolver()).setDelegate(this.sessionGlobals);
if (!initialized) {
// copy over the default generated listeners that are used for internal stuff once
registerSystemListeners(ksessionImpl);
registerCustomListeners();
initialized = true;
}

// copy over the default generated listeners that are used for internal stuff once
registerSystemListeners(ksessionImpl);
registerCustomListeners();

ksessionImpl.setAgendaEventSupport( this.agendaEventSupport );
ksessionImpl.setRuleRuntimeEventSupport(this.ruleRuntimeEventSupport);
Expand Down Expand Up @@ -260,7 +253,7 @@ public void addEventListener(ProcessEventListener listener) {
this.cachedProcessEventListener = new HashSet<ProcessEventListener>();
}
this.cachedProcessEventListener.add(listener);
this.processEventSupport.addEventListener( listener );
this.processEventSupport.addEventListener(listener);
}

public Collection<ProcessEventListener> getProcessEventListeners() {
Expand All @@ -276,8 +269,8 @@ public void removeEventListener(ProcessEventListener listener) {

public void setGlobal(String identifier,
Object value) {
this.sessionGlobals.setGlobal( identifier,
value );
this.sessionGlobals.setGlobal(identifier,
value);
}

public Globals getGlobals() {
Expand All @@ -287,12 +280,12 @@ public Globals getGlobals() {
@Override
public void registerChannel(String name,
Channel channel) {
this.channels.put( name, channel );
this.channels.put(name, channel);
}

@Override
public void unregisterChannel(String name) {
this.channels.remove( name );
this.channels.remove(name);
}

@Override
Expand All @@ -302,11 +295,11 @@ public Map<String, Channel> getChannels() {

@Override
public KieBase getKieBase() {
return newWorkingMemory().getKieBase();
return getKnowledgeBase();
}

public <T> T execute(Command<T> command) {
newWorkingMemory();
StatefulKnowledgeSession ksession = newWorkingMemory();

FixedKnowledgeCommandContext context = new FixedKnowledgeCommandContext( new ContextImpl( "ksession",
null ),
Expand Down Expand Up @@ -342,35 +335,35 @@ public <T> T execute(Command<T> command) {
}
} finally {
((StatefulKnowledgeSessionImpl) ksession).endBatchExecution();
dispose();
dispose(ksession);
}
}

public void execute(Object object) {
newWorkingMemory();
StatefulKnowledgeSession ksession = newWorkingMemory();
try {
ksession.insert( object );
ksession.fireAllRules();
} finally {
dispose();
dispose(ksession);
}
}

public void execute(Iterable objects) {
newWorkingMemory();
StatefulKnowledgeSession ksession = newWorkingMemory();
try {
for ( Object object : objects ) {
ksession.insert( object );
}
ksession.fireAllRules();
} finally {
dispose();
dispose(ksession);
}
}

public List executeWithResults(Iterable objects, ObjectFilter filter) {
List list = new ArrayList();
newWorkingMemory();
StatefulKnowledgeSession ksession = newWorkingMemory();
try {
for ( Object object : objects ) {
ksession.insert( object );
Expand All @@ -380,7 +373,7 @@ public List executeWithResults(Iterable objects, ObjectFilter filter) {
list.add(((InternalFactHandle) fh).getObject());
}
} finally {
dispose();
dispose(ksession);
}
return list;
}
Expand All @@ -389,7 +382,7 @@ public Environment getEnvironment() {
return environment;
}

protected void dispose( ) {
protected void dispose(StatefulKnowledgeSession ksession) {
StatefulKnowledgeSessionImpl wm = ((StatefulKnowledgeSessionImpl) ksession);

for ( AgendaEventListener listener : wm.getAgendaEventSupport().getEventListeners() ) {
Expand All @@ -404,9 +397,7 @@ protected void dispose( ) {
this.processEventSupport.removeEventListener( listener );
}
}
initialized = false;
ksession.dispose();
ksession = null;
}

private static class AgendaEventListenerPlaceholder implements AgendaEventListener {
Expand Down

0 comments on commit 2bd608b

Please sign in to comment.