Skip to content

Commit

Permalink
BZ-1022140: removing busy wait loop from fireUntilHalt()
Browse files Browse the repository at this point in the history
  • Loading branch information
etirelli committed Oct 8, 2013
1 parent 31c60fd commit a0ec3c6
Show file tree
Hide file tree
Showing 15 changed files with 340 additions and 255 deletions.
Expand Up @@ -4,23 +4,24 @@
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import junit.framework.Assert;

import org.drools.compiler.Cell;
import org.drools.compiler.Cheese;
import org.drools.compiler.CommonTestMethodBase;
import org.drools.compiler.FactA;
import org.drools.core.FactHandle;
import org.drools.compiler.Father;
import org.drools.compiler.Foo;
import org.drools.compiler.Message;
import org.drools.compiler.Neighbor;
import org.drools.compiler.Person;
import org.drools.compiler.PersonInterface;
import org.drools.compiler.Pet;
import org.drools.core.RuleBase;
import org.drools.compiler.TotalHolder;
import org.drools.core.FactHandle;
import org.drools.core.RuleBase;
import org.drools.core.WorkingMemory;
import org.drools.core.common.InternalAgenda;
import org.drools.core.common.InternalWorkingMemory;
Expand All @@ -35,10 +36,23 @@
import org.drools.core.spi.Activation;
import org.drools.core.spi.AgendaGroup;
import org.junit.Test;
import org.kie.api.event.rule.AfterMatchFiredEvent;
import org.kie.api.event.rule.AgendaGroupPoppedEvent;
import org.kie.api.event.rule.AgendaGroupPushedEvent;
import org.kie.api.event.rule.BeforeMatchFiredEvent;
import org.kie.api.event.rule.DebugAgendaEventListener;
import org.kie.api.event.rule.MatchCancelledEvent;
import org.kie.api.event.rule.MatchCreatedEvent;
import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
import org.kie.api.runtime.KieSession;
import org.kie.internal.KnowledgeBase;
import org.kie.internal.builder.conf.RuleEngineOption;
import org.kie.internal.runtime.StatefulKnowledgeSession;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.mockito.Mockito.*;

public class ExecutionFlowControlTest extends CommonTestMethodBase {

Expand Down Expand Up @@ -911,14 +925,25 @@ public void testRuleFlowGroupDeactivate() throws Exception {
assertEquals( 0, list.size() );
}

@Test
@Test(timeout=10000)
public void testRuleFlowGroupInActiveMode() throws Exception {
KnowledgeBase kbase = loadKnowledgeBase("ruleflowgroup.drl");
final KieSession ksession = createKnowledgeSession(kbase);

final List list = new ArrayList();
ksession.setGlobal( "list",
list );

final AtomicBoolean fired = new AtomicBoolean(false);
ksession.addEventListener(new org.kie.api.event.rule.DefaultAgendaEventListener() {
@Override
public void afterMatchFired(AfterMatchFiredEvent event) {
synchronized( fired ) {
fired.set(true);
fired.notifyAll();
}
}
});

new Thread(new Runnable() {
public void run() {
Expand All @@ -931,8 +956,12 @@ public void run() {
list.size() );

((AgendaImpl) ksession.getAgenda()).getAgenda().activateRuleFlowGroup( "Group1" );

Thread.sleep(1000);

synchronized( fired ) {
if( !fired.get() ) {
fired.wait();
}
}

assertEquals( 1,
list.size() );
Expand Down
Expand Up @@ -2100,7 +2100,7 @@ public void testQueryAndRetract() {
" final java.util.List l = list;" +
" org.drools.core.common.AgendaItem item = ( org.drools.core.common.AgendaItem ) drools.getMatch();\n" +
" item.setActivationUnMatchListener( new org.kie.internal.event.rule.ActivationUnMatchListener() {\n" +
" public void unMatch(org.kie.api.runtime.rule.Session wm, org.kie.api.runtime.rule.Match activation) {\n" +
" public void unMatch(org.kie.api.runtime.rule.RuleRuntime wm, org.kie.api.runtime.rule.Match activation) {\n" +
" l.add(\"pippo\");\n" +
" }\n" +
" } );" +
Expand Down
Expand Up @@ -53,6 +53,7 @@ public class PseudoClockEventsTest extends CommonTestMethodBase {
"\n" +
"declare StockTick\n" +
" @role( event )\n" +
//" @expires( 1m )\n" +
"end\n\n";
private static final String evalFirePseudoClockRuleA =
"rule A\n" +
Expand Down Expand Up @@ -87,7 +88,7 @@ public void testEvenFirePseudoClockRuleA() throws Exception {
any(AfterMatchFiredEvent.class));
}

@Test (timeout = 6000)
@Test(timeout = 6000)
public void testEvenFirePseudoClockRuleB() throws Exception {

AgendaEventListener ael = mock(AgendaEventListener.class);
Expand Down Expand Up @@ -142,6 +143,7 @@ public void run() {

for (int stIndex = 1; stIndex <= stockCount; stIndex++) {
clock.advanceTime(20, TimeUnit.SECONDS);
Thread.sleep(100);
final StockTickInterface st = new StockTick(stIndex,
"RHT",
100 * stIndex,
Expand All @@ -152,6 +154,8 @@ public void run() {

Thread.sleep(100);
ksession.halt();

fireUntilHaltThread.join(5000);

return stockCount;
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.drools.compiler.StockTickInterface;
import org.drools.core.base.ClassObjectType;
import org.drools.core.common.InternalFactHandle;
import org.drools.core.event.DebugAgendaEventListener;
import org.drools.compiler.compiler.DroolsParserException;
import org.drools.core.impl.KnowledgeBaseImpl;
import org.drools.core.reteoo.ObjectTypeNode;
Expand All @@ -46,6 +47,7 @@
import org.drools.core.spi.ObjectType;
import org.drools.core.time.impl.PseudoClockScheduler;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.kie.internal.KnowledgeBase;
import org.kie.api.KieBaseConfiguration;
Expand Down Expand Up @@ -753,7 +755,7 @@ public void testWindowWithEntryPointCompilationError() {
}


@Test (timeout=10000)
@Test(timeout=10000)
public void testAtomicActivationFiring() throws Exception {
// JBRULES-3383
String str = "package org.drools.compiler.test\n" +
Expand Down Expand Up @@ -797,6 +799,8 @@ public void testAtomicActivationFiring() throws Exception {
kBaseConfig.setOption(EventProcessingOption.STREAM);
KnowledgeBase kbase = loadKnowledgeBaseFromString(kBaseConfig, str);
StatefulKnowledgeSession ksession = createKnowledgeSession( kbase );

ksession.addEventListener(new org.kie.api.event.rule.DebugAgendaEventListener());

FactType eventType = kbase.getFactType("org.drools.compiler.test", "Event");

Expand Down
Expand Up @@ -356,7 +356,8 @@ public AbstractWorkingMemory(final int id,
this.__ruleBaseEventListeners = new LinkedList();
this.lock = new ReentrantLock();

timerService = TimerServiceFactory.getTimerService( this.config );
timerService = TimerServiceFactory.getTimerService( this, this.config );
timerService.setSession(this);
((AcceptsTimerJobFactoryManager) timerService).setTimerJobFactoryManager( config.getTimerJobFactoryManager() );

this.propagationIdCounter = new AtomicLong( propagationContext );
Expand Down
Expand Up @@ -134,6 +134,10 @@ public int size() {

public void add(final Activation activation) {
this.priorityQueue.enqueue((Activation) activation);
if( this.workingMemory.getAgenda() instanceof DefaultAgenda ) {
// this is phreak, as reteoo uses ReteAgenda
((DefaultAgenda)this.workingMemory.getAgenda()).notifyHalt();
}
}

public Activation remove() {
Expand Down
94 changes: 54 additions & 40 deletions drools-core/src/main/java/org/drools/core/common/DefaultAgenda.java
Expand Up @@ -107,7 +107,8 @@ public class DefaultAgenda

private org.kie.api.runtime.rule.ConsequenceExceptionHandler consequenceExceptionHandler;

protected volatile AtomicBoolean halt = new AtomicBoolean( false );
protected AtomicBoolean halt = new AtomicBoolean( true );
protected volatile boolean fireUntilHalt = false;

protected int activationCounter;

Expand All @@ -117,12 +118,6 @@ public class DefaultAgenda

private ActivationsFilter activationsFilter;

private volatile boolean isFiringActivation = false;

private volatile boolean mustNotifyHalt = false;

private volatile boolean fireUntilHalt = false;

// ------------------------------------------------------------
// Constructors
// ------------------------------------------------------------
Expand Down Expand Up @@ -279,6 +274,7 @@ public void addEagerRuleAgendaItem(RuleAgendaItem item) {
log.trace("Added {} to eager evaluation list.", item.getRule().getName() );
}
eager.add( item );
notifyHalt();
}

@Override
Expand Down Expand Up @@ -672,6 +668,7 @@ public void activateRuleFlowGroup(final InternalRuleFlowGroup group, long proces
setFocus( group );
((EventSupport) this.workingMemory).getAgendaEventSupport().fireAfterRuleFlowGroupActivated( group,
this.workingMemory );
this.notifyHalt();
}

public void deactivateRuleFlowGroup(final String name) {
Expand Down Expand Up @@ -985,7 +982,6 @@ public synchronized void fireActivation(final Activation activation) throws Cons
// on an empty pattern
// we need to make sure it re-activates
this.workingMemory.startOperation();
isFiringActivation = true;
try {
final EventSupport eventsupport = (EventSupport) this.workingMemory;

Expand Down Expand Up @@ -1051,11 +1047,6 @@ public synchronized void fireActivation(final Activation activation) throws Cons

unstageActivations();
} finally {
isFiringActivation = false;
if ( mustNotifyHalt ) {
mustNotifyHalt = false;
notifyHalt();
}
this.workingMemory.endOperation();
}
}
Expand Down Expand Up @@ -1142,45 +1133,68 @@ public void stageLeftTuple(RuleAgendaItem ruleAgendaItem, AgendaItem justified)
}

public void fireUntilHalt() {
fireUntilHalt = true;
fireUntilHalt( null );
}

public void fireUntilHalt(final AgendaFilter agendaFilter) {
this.halt.set( false );
if ( log.isTraceEnabled() ) {
log.trace("Starting fireUntilHalt");
}
while ( continueFiring( -1 ) ) {
boolean fired = fireNextItem( agendaFilter, 0, -1 ) >= 0 ||
!((AbstractWorkingMemory) this.workingMemory).getActionQueue().isEmpty();
this.workingMemory.executeQueuedActions();
if ( !fired ) {
Thread.yield();
} else {
this.workingMemory.executeQueuedActions();
if( this.halt.compareAndSet( true, false ) ) { // if this was false already means someone else is firing rules already
fireUntilHalt = true;
try {
if ( log.isTraceEnabled() ) {
log.trace("Starting fireUntilHalt");
}
while ( continueFiring( -1 ) ) {
boolean fired = fireNextItem( agendaFilter, 0, -1 ) > 0 ||
!((AbstractWorkingMemory) this.workingMemory).getActionQueue().isEmpty();
this.workingMemory.executeQueuedActions();
if ( !fired ) {
synchronized ( this.halt ) {
// has to check in here because a different thread might have set the halt flag already
if( ! this.halt.get() ) {
// need to check again the agenda is still empty as a new activation
// could have been created between the time it did not fire the last
// one and the synchronized block started
InternalAgendaGroup nextFocus = getNextFocus();
if( nextFocus == null || nextFocus.isEmpty() ) {
try {
this.halt.wait();
} catch (InterruptedException e) {
// nothing to do
}
}
}
}
}
}
if ( log.isTraceEnabled() ) {
log.trace("Ending fireUntilHalt");
}
} finally {
fireUntilHalt = false;
this.halt.set(true);
}
}
if ( log.isTraceEnabled() ) {
log.trace("Ending fireUntilHalt");
}
fireUntilHalt = false;
}

public int fireAllRules(AgendaFilter agendaFilter,
int fireLimit) {
unstageActivations();
this.halt.set( false );
int fireCount = 0;
int returnedFireCount = 0;
do {
returnedFireCount = fireNextItem( agendaFilter, fireCount, fireLimit );
fireCount += returnedFireCount;
this.workingMemory.executeQueuedActions();
} while ( continueFiring( 0 ) && returnedFireCount != 0 && (fireLimit == -1 || (fireCount < fireLimit)) );
if ( this.focusStack.size() == 1 && getMainAgendaGroup().isEmpty() ) {
// the root MAIN agenda group is empty, reset active to false, so it can receive more activations.
getMainAgendaGroup().setActive( false );
if( this.halt.compareAndSet( true, false ) ) { // if this was false already means someone else is firing rules already
try {
int returnedFireCount = 0;
do {
returnedFireCount = fireNextItem( agendaFilter, fireCount, fireLimit );
fireCount += returnedFireCount;
this.workingMemory.executeQueuedActions();
} while ( continueFiring( 0 ) && returnedFireCount != 0 && (fireLimit == -1 || (fireCount < fireLimit)) );
if ( this.focusStack.size() == 1 && getMainAgendaGroup().isEmpty() ) {
// the root MAIN agenda group is empty, reset active to false, so it can receive more activations.
getMainAgendaGroup().setActive( false );
}
} finally {
this.halt.set(true);
}
}
return fireCount;
}
Expand Down

0 comments on commit a0ec3c6

Please sign in to comment.