Skip to content

Commit

Permalink
[DROOLS-1103] dispose agenda and make it no longer usable on KieSessi…
Browse files Browse the repository at this point in the history
…on dispose
  • Loading branch information
mariofusco committed Jul 29, 2016
1 parent 1774629 commit fe4d08a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 16 deletions.
Expand Up @@ -775,7 +775,7 @@ public String toString() {
}
}

@Test
@Test(timeout = 10000)
public void testConcurrentFireAndDispose() throws InterruptedException {
// DROOLS-1103
String drl = "rule R no-loop timer( int: 1s )\n" +
Expand Down Expand Up @@ -816,4 +816,44 @@ public void run() {
fail( "java.util.concurrent.RejectedExecutionException should not happen" );
}
}

@Test(timeout = 10000)
public void testFireUntilHaltAndDispose() throws InterruptedException {
// DROOLS-1103
String drl = "rule R no-loop timer( int: 1s )\n" +
"when\n" +
" String()\n" +
"then\n" +
"end";

KieHelper helper = new KieHelper();
helper.addContent( drl, ResourceType.DRL );
KieBase kbase = helper.build( EventProcessingOption.STREAM );
KieSessionConfiguration ksconf = KieServices.Factory.get().newKieSessionConfiguration();
ksconf.setOption( TimedRuleExectionOption.YES );
final KieSession ksession = kbase.newKieSession(ksconf, null);

new Thread() {
@Override
public void run() {
ksession.fireUntilHalt();
}
}.start();

try {
Thread.sleep(100);
} catch (InterruptedException e) {
// do nothing
}

ksession.insert("xxx");

try {
Thread.sleep(100);
} catch (InterruptedException e) {
// do nothing
}

ksession.dispose();
}
}
37 changes: 22 additions & 15 deletions drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java
Expand Up @@ -133,27 +133,30 @@ public class DefaultAgenda

private volatile List<PropagationContext> expirationContexts = new ArrayList<PropagationContext>();

public enum ExecutionState { // fireAllRule | fireUntilHalt | executeTask <-- required action
INACTIVE( false ), // fire | fire | exec
FIRING_ALL_RULES( true ), // do nothing | wait + fire | enqueue
FIRING_UNTIL_HALT( true ), // do nothing | do nothing | enqueue
HALTING( false ), // wait + fire | wait + fire | enqueue
EXECUTING_TASK( false ), // wait + fire | wait + fire | wait + exec
DEACTIVATED( false ), // wait + fire | wait + fire | wait + exec
DISPOSED( false ); // no further action is allowed
public enum ExecutionState { // fireAllRule | fireUntilHalt | executeTask <-- required action
INACTIVE( false, true ), // fire | fire | exec
FIRING_ALL_RULES( true, true ), // do nothing | wait + fire | enqueue
FIRING_UNTIL_HALT( true, true ), // do nothing | do nothing | enqueue
HALTING( false, true ), // wait + fire | wait + fire | enqueue
EXECUTING_TASK( false, true ), // wait + fire | wait + fire | wait + exec
DEACTIVATED( false, true ), // wait + fire | wait + fire | wait + exec
DISPOSING( false, false ), // no further action is allowed
DISPOSED( false, false ); // no further action is allowed

private final boolean firing;
private final boolean alive;

ExecutionState( boolean firing ) {
ExecutionState( boolean firing, boolean alive ) {
this.firing = firing;
this.alive = alive;
}

public boolean isFiring() {
return firing;
}

public boolean isAlive() {
return this != ExecutionState.DISPOSED;
return alive;
}
}

Expand Down Expand Up @@ -1436,7 +1439,10 @@ public boolean isFiring() {
public void executeTask( ExecutableEntry executable ) {
synchronized (stateMachineLock) {
// state is never changed outside of a sync block, so this is safe.
if (isFiring()) {
if (!currentState.isAlive()) {
return;
}
if (currentState.isFiring()) {
executable.enqueue();
return;
} else if (currentState != ExecutionState.EXECUTING_TASK) {
Expand Down Expand Up @@ -1499,14 +1505,14 @@ private void immediateHalt() {
synchronized (stateMachineLock) {
if (currentState != ExecutionState.INACTIVE) {
setCurrentState( ExecutionState.INACTIVE );
stateMachineLock.notify();
stateMachineLock.notifyAll();
workingMemory.notifyEngineInactive();
}
}
}

private void setCurrentState(ExecutionState state) {
if (!currentState.isAlive()) {
if (currentState == ExecutionState.DISPOSED) {
return;
}
if ( log.isDebugEnabled() ) {
Expand All @@ -1517,11 +1523,12 @@ private void setCurrentState(ExecutionState state) {

public boolean dispose() {
synchronized (stateMachineLock) {
if (currentState == ExecutionState.DISPOSED) {
if (!currentState.isAlive()) {
return false;
}
if (currentState.isFiring()) {
setCurrentState( ExecutionState.HALTING );
setCurrentState( ExecutionState.DISPOSING );
workingMemory.notifyWaitOnRest();
}
waitAndEnterExecutionState( ExecutionState.DISPOSED );
return true;
Expand Down

0 comments on commit fe4d08a

Please sign in to comment.