Skip to content

Commit

Permalink
[DROOLS-7584] process completely all facts inserted before halt (apac…
Browse files Browse the repository at this point in the history
…he#5576)

* [DROOLS-7584] process completely all facts inserted before halt

* fix imports order

* fix testModifyWithFromSudoku to meet halt() (kiegroup#22)

---------

Co-authored-by: Toshiya Kobayashi <toshiyakobayashi@gmail.com>
  • Loading branch information
mariofusco and tkobayas committed Nov 7, 2023
1 parent 85fc7e3 commit 8a2b9db
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ public InternalMatch createAgendaItem(RuleTerminalNodeLeftTuple rtnLeftTuple, in

@Override
public void cancelActivation(InternalMatch internalMatch) {
InternalMatch item = internalMatch;

if ( internalMatch.isQueued() ) {
if (internalMatch.getActivationGroupNode() != null ) {
internalMatch.getActivationGroupNode().getActivationGroup().removeActivation(internalMatch);
Expand All @@ -194,11 +192,11 @@ public void cancelActivation(InternalMatch internalMatch) {
getAgendaEventSupport().fireActivationCancelled(internalMatch, reteEvaluator, MatchCancelledCause.WME_MODIFY);
}

if (item.getRuleAgendaItem() != null) {
item.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent( this.reteEvaluator, this, item, ON_DELETE_MATCH_CONSEQUENCE_NAME );
if (internalMatch.getRuleAgendaItem() != null) {
internalMatch.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent( this.reteEvaluator, this, internalMatch, ON_DELETE_MATCH_CONSEQUENCE_NAME );
}

reteEvaluator.getRuleEventSupport().onDeleteMatch( item );
reteEvaluator.getRuleEventSupport().onDeleteMatch(internalMatch);
}

@Override
Expand Down Expand Up @@ -284,10 +282,10 @@ public void addPropagation(PropagationEntry propagationEntry) {

@Override
public int fireAllRules(AgendaFilter agendaFilter, int fireLimit) {
return fireLoop( agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES );
return fireLoop( agendaFilter, fireLimit );
}

private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restHandler) {
private int fireLoop(AgendaFilter agendaFilter, int fireLimit) {
firing = true;
int fireCount = 0;
PropagationEntry head = propagationList.takeAll();
Expand Down Expand Up @@ -320,7 +318,7 @@ private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restH

if ( returnedFireCount == 0 && head == null && ( group == null || ( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
// if true, the engine is now considered potentially at rest
head = restHandler.handleRest( this );
head = handleRest();
}
}

Expand Down Expand Up @@ -349,44 +347,11 @@ private void doRetract( PropagationContext ectx ) {
}
}

interface RestHandler {
RestHandler FIRE_ALL_RULES = new RestHandler.FireAllRulesRestHandler();
RestHandler FIRE_UNTIL_HALT = new RestHandler.FireUntilHaltRestHandler();

PropagationEntry handleRest(ActivationsManagerImpl agenda);

class FireAllRulesRestHandler implements RestHandler {
@Override
public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
PropagationEntry head = agenda.propagationList.takeAll();
if (head == null) {
agenda.firing = false;
}
return head;
}
}

class FireUntilHaltRestHandler implements RestHandler {
@Override
public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
PropagationEntry head;
// this must use the same sync target as takeAllPropagations, to ensure this entire block is atomic, up to the point of wait
synchronized (agenda.propagationList) {
head = agenda.propagationList.takeAll();

// if halt() has called, the thread should not be put into a wait state
// instead this is just a safe way to make sure the queue is flushed before exiting the loop
if (head == null) {
agenda.propagationList.waitOnRest();
head = agenda.propagationList.takeAll();
if (head == null) {
agenda.firing = false;
}
}
}

return head;
}
private PropagationEntry handleRest() {
PropagationEntry head = propagationList.takeAll();
if (head == null) {
firing = false;
}
return head;
}
}
30 changes: 14 additions & 16 deletions drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ public int fire(ActivationsManager activationsManager, AgendaFilter filter, int
return fire(activationsManager.getReteEvaluator(), activationsManager, filter, fireCount, fireLimit);
}

private int fire( ReteEvaluator reteEvaluator,
ActivationsManager activationsManager,
AgendaFilter filter,
int fireCount,
int fireLimit) {
private int fire( ReteEvaluator reteEvaluator, ActivationsManager activationsManager, AgendaFilter filter, int fireCount, int fireLimit) {
int localFireCount = 0;

if (!tupleList.isEmpty()) {
Expand Down Expand Up @@ -167,8 +163,10 @@ private int fire( ReteEvaluator reteEvaluator,
}

if (!ruleIsAllMatches) { // if firing rule is @All don't give way to other rules
if ( haltRuleFiring( fireCount, fireLimit, localFireCount, activationsManager ) ) {
break; // another rule has high priority and is on the agenda, so evaluate it first
if ( firingHalted(activationsManager) ||
fireLimitReached(fireCount, fireLimit, localFireCount) ||
ruleWithHigherSalienceActivated(activationsManager) ) {
break;
}
if (!reteEvaluator.isSequential()) {
evaluateNetworkIfDirty( activationsManager );
Expand Down Expand Up @@ -261,22 +259,22 @@ private boolean cancelAndContinue(ReteEvaluator reteEvaluator,
return filter != null && !filter.accept((InternalMatch) leftTuple);
}

private boolean haltRuleFiring(int fireCount,
int fireLimit,
int localFireCount,
ActivationsManager activationsManager) {
if (!activationsManager.isFiring() || (fireLimit >= 0 && (localFireCount + fireCount >= fireLimit))) {
return true;
}
private static boolean firingHalted(ActivationsManager activationsManager) {
return !activationsManager.isFiring();
}

private boolean ruleWithHigherSalienceActivated(ActivationsManager activationsManager) {
// The eager list must be evaluated first, as dynamic salience rules will impact the results of peekNextRule
activationsManager.evaluateEagerList();

RuleAgendaItem nextRule = activationsManager.peekNextRule();
if (nextRule == ruleAgendaItem || nextRule == null) {
return false;
}
return !ruleAgendaItem.getAgendaGroup().equals( nextRule.getAgendaGroup() ) || !isHigherSalience(nextRule);
return !ruleAgendaItem.getAgendaGroup().equals(nextRule.getAgendaGroup()) || !isHigherSalience(nextRule);
}

private static boolean fireLimitReached(int fireCount, int fireLimit, int localFireCount) {
return fireLimit >= 0 && (localFireCount + fireCount >= fireLimit);
}

private boolean isHigherSalience(RuleAgendaItem nextRule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
*/
package org.drools.kiesession.agenda;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.drools.base.definitions.rule.impl.QueryImpl;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.core.RuleBaseConfiguration;
Expand Down Expand Up @@ -67,13 +74,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Rule-firing Agenda.
*
Expand Down Expand Up @@ -576,7 +576,7 @@ int internalFireAllRules( AgendaFilter agendaFilter, int fireLimit, boolean isIn
private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restHandler, boolean isInternalFire) {
int fireCount = 0;
try {
PropagationEntry head = propagationList.takeAll();
PropagationEntry head = takePropagationHead();
int returnedFireCount;

boolean limitReached = fireLimit == 0; // -1 or > 0 will return false. No reason for user to give 0, just handled for completeness.
Expand Down Expand Up @@ -604,19 +604,13 @@ private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restH
// Note that if a halt() command is given, the engine is changed to INACTIVE,
// and isFiring returns false allowing it to exit before all rules are fired.
//
while ( isFiring() ) {
while ( isFiring() || executionStateMachine.getCurrentState().isHalting() ) {
if ( head != null ) {
// it is possible that there are no action propagations, but there are rules to fire.
propagationList.flush(head);
head = null;
}

// a halt may have occurred during the flushPropagations,
// which changes the isFiring state. So a second isFiring guard is needed
if (!isFiring()) {
break;
}

evaluateEagerList();
InternalAgendaGroup group = getAgendaGroupsManager().getNextFocus();
if ( group != null && !limitReached ) {
Expand All @@ -627,7 +621,7 @@ private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restH
fireCount += returnedFireCount;

limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
head = propagationList.takeAll();
head = takePropagationHead();
} else {
returnedFireCount = 0; // no rules fired this iteration, so we know this is 0
group = null; // set the group to null in case the fire limit has been reached
Expand All @@ -636,7 +630,7 @@ private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restH
if ( returnedFireCount == 0 && head == null && ( group == null || ( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
// if true, the engine is now considered potentially at rest
head = restHandler.handleRest( this, isInternalFire );
if (!isInternalFire && head == null) {
if ( ( !isInternalFire || executionStateMachine.getCurrentState().isHalting() ) && head == null) {
break;
}
}
Expand All @@ -653,6 +647,13 @@ private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler restH
return fireCount;
}

private PropagationEntry takePropagationHead() {
if (executionStateMachine.getCurrentState().isHalting()) {
return null;
}
return propagationList.takeAll();
}

interface RestHandler {
RestHandler FIRE_ALL_RULES = new FireAllRulesRestHandler();
RestHandler FIRE_UNTIL_HALT = new FireUntilHaltRestHandler();
Expand Down Expand Up @@ -684,15 +685,15 @@ public PropagationEntry handleRest(DefaultAgenda agenda, boolean isInternalFire)
PropagationEntry head;
// this must use the same sync target as takeAllPropagations, to ensure this entire block is atomic, up to the point of wait
synchronized (agenda.propagationList) {
head = agenda.propagationList.takeAll();
head = agenda.takePropagationHead();

// if halt() has called, the thread should not be put into a wait state
// instead this is just a safe way to make sure the queue is flushed before exiting the loop
if (head == null && (
agenda.executionStateMachine.getCurrentState() == ExecutionStateMachine.ExecutionState.FIRING_UNTIL_HALT ||
agenda.executionStateMachine.getCurrentState() == ExecutionStateMachine.ExecutionState.INACTIVE_ON_FIRING_UNTIL_HALT )) {
agenda.propagationList.waitOnRest();
head = agenda.propagationList.takeAll();
head = agenda.takePropagationHead();
}
}

Expand Down Expand Up @@ -771,13 +772,38 @@ public String toString() {
}
}

static class ImmediateHalt extends PropagationEntry.AbstractPropagationEntry {

private final ExecutionStateMachine executionStateMachine;
private final PropagationList propagationList;

protected ImmediateHalt( ExecutionStateMachine executionStateMachine, PropagationList propagationList ) {
this.executionStateMachine = executionStateMachine;
this.propagationList = propagationList;
}

@Override
public void internalExecute(ReteEvaluator reteEvaluator ) {
executionStateMachine.immediateHalt(propagationList);
reteEvaluator.getActivationsManager().haltGroupEvaluation();
}

@Override
public String toString() {
return "ImmediateHalt";
}
}

@Override
public synchronized void halt() {
// only attempt halt an engine that is currently firing
// This will place a halt command on the propagation queue
// that will allow the engine to halt safely
if ( isFiring() ) {
propagationList.addEntry(new Halt(executionStateMachine));
PropagationEntry halt = executionStateMachine.getCurrentState() == ExecutionStateMachine.ExecutionState.FIRING_ALL_RULES ?
new ImmediateHalt(executionStateMachine, propagationList) :
new Halt(executionStateMachine);
propagationList.addEntry(halt);
}
}

Expand Down Expand Up @@ -881,6 +907,10 @@ public boolean isFiring() {
return firing;
}

public boolean isHalting() {
return this == HALTING;
}

public boolean isAlive() {
return alive;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void testModifyWithFromSudoku() {
"rule \"set a value\" when\n" +
" $s : Setting( $rn: rowNo, $cn: colNo, $v: value )\n" +
" $c : Cell( rowNo == $rn, colNo == $cn, value == null)\n" +
" $ctr : Counter( $count: count )\n" +
" $ctr : Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println(\"set a value [\" + $v + \"] to $c = \" + $c);\n" +
" modify( $c ){ setValue( $v ) }\n" +
Expand All @@ -269,6 +269,7 @@ public void testModifyWithFromSudoku() {
" $s: Setting( $rn: rowNo, $cn: colNo, $v: value )\n" +
" Cell( rowNo == $rn, colNo == $cn, value == $v, $exCells: exCells )\n" +
" $c: Cell( free contains $v ) from $exCells\n" +
" Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println(\"eliminate a value [\" + $v + \"] from Cell : $c = \" + $c);\n" +
" modify( $c ){ blockValue( $v ) }\n" +
Expand All @@ -280,6 +281,7 @@ public void testModifyWithFromSudoku() {
" not( $x: Cell( free contains $v )\n" +
" and\n" +
" Cell( this == $c, exCells contains $x ) )\n" +
" Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println( \"done setting cell \" + $c.toString() ); \n" +
" delete( $s );\n" +
Expand All @@ -288,6 +290,7 @@ public void testModifyWithFromSudoku() {
"when\n" +
" not Setting()\n" +
" $c: Cell( $rn: rowNo, $cn: colNo, freeCount == 1 )\n" +
" Counter( $count: count != 0 )\n" +
"then\n" +
" Integer i = $c.getFreeValue();\n" +
" System.out.println( \"single \" + i + \" at \" + $c.toString() );\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.drools.mvel.compiler.Cheese;
import org.drools.mvel.compiler.Person;
Expand Down Expand Up @@ -161,4 +164,38 @@ public void testFireUntilHaltFailingAcrossEntryPoints() throws Exception {
assertThat(alive).as("Thread should have died!").isFalse();
assertThat(list.size()).isEqualTo(1);
}

@Test
public void testAllFactsProcessedBeforeHalt() throws Exception {
String drl = "package org.example.drools;\n" +
"\n" +
"global java.util.concurrent.CountDownLatch latch;\n" +
"\n" +
"rule \"R1\" when\n" +
" $s : String()\n" +
"then\n" +
" latch.countDown();\n" +
"end\n" +
"rule \"R2\" when\n" +
" $s : String()\n" +
"then\n" +
" latch.countDown();\n" +
"end\n";

KieBase kbase = KieBaseUtil.getKieBaseFromKieModuleFromDrl("test", kieBaseTestConfiguration, drl);
KieSession ksession = kbase.newKieSession();

CountDownLatch latch = new CountDownLatch(4);
ksession.setGlobal("latch", latch);

Executors.newSingleThreadExecutor().execute(ksession::fireUntilHalt);

ksession.insert("aaa");
ksession.insert("bbb");

ksession.halt();

// the 2 facts inserted should be processed before halt
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isTrue();
}
}

0 comments on commit 8a2b9db

Please sign in to comment.