Skip to content

Commit

Permalink
DROOLS-284 / BZ1015323 Timer nullpointer due to race condition during…
Browse files Browse the repository at this point in the history
… update

-update propagations and timer execute are now synchronised, to avoid race conditions
  • Loading branch information
Mark Proctor committed Oct 3, 2013
1 parent 116bee5 commit 859b1a5
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,47 @@ public void run() {
barrier.reset();
}

@Test(timeout=10000)
public void testExprIntervalTimerRaceCondition() throws Exception {
String str = "";
str += "package org.simple \n";
str += "global java.util.List list \n";
str += "rule xxx \n";
str += " timer (expr: $i, $i) \n";
str += "when \n";
str += " $i : Long() \n";
str += "then \n";
str += " list.add(\"fired\"); \n";
str += "end \n";

KieSessionConfiguration conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption( ClockTypeOption.get( "pseudo" ) );

KnowledgeBase kbase = loadKnowledgeBaseFromString(str );
KieSession ksession = createKnowledgeSession(kbase, conf);

List list = new ArrayList();

PseudoClockScheduler timeService = ( PseudoClockScheduler ) ksession.<SessionClock>getSessionClock();
timeService.advanceTime( new Date().getTime(), TimeUnit.MILLISECONDS );

ksession.setGlobal( "list", list );
FactHandle fh = (FactHandle) ksession.insert( 10000l );

ksession.fireAllRules();
assertEquals( 0, list.size() );

timeService.advanceTime( 10, TimeUnit.SECONDS );
ksession.fireAllRules();
assertEquals( 1, list.size() );


timeService.advanceTime( 17, TimeUnit.SECONDS );
ksession.update( fh, 5000l );
ksession.fireAllRules();
assertEquals( 2, list.size() );
}

@Test(timeout=10000)
public void testUnknownProtocol() throws Exception {
wrongTimerExpression("xyz:30");
Expand Down Expand Up @@ -1048,10 +1089,6 @@ public void testTimerRemoval() {

@Test(timeout=10000)
public void testIntervalTimerWithLongExpressions() throws Exception {
if ( CommonTestMethodBase.phreak == RuleEngineOption.PHREAK ) {
return; // phreak does not yet support dynamic salience
}

String str = "package org.simple;\n" +
"global java.util.List list;\n" +
"\n" +
Expand Down Expand Up @@ -1116,10 +1153,6 @@ public void testIntervalTimerWithLongExpressions() throws Exception {

@Test(timeout=10000)
public void testIntervalTimerWithStringExpressions() throws Exception {
if ( CommonTestMethodBase.phreak == RuleEngineOption.PHREAK ) {
return; // phreak does not yet support dynamic salience
}

String str = "package org.simple;\n" +
"global java.util.List list;\n" +
"\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.drools.core.time.TimerService;
import org.drools.core.time.Trigger;
import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.time.impl.IntervalTrigger;
import org.drools.core.time.impl.Timer;
import org.drools.core.util.LinkedList;
import org.drools.core.util.index.LeftTupleList;
Expand Down Expand Up @@ -115,7 +116,17 @@ public void doLeftUpdates(TimerNode timerNode,
LeftTuple next = leftTuple.getStagedNext();

DefaultJobHandle jobHandle = (DefaultJobHandle) leftTuple.getObject();
timerService.removeJob( jobHandle );
LeftTupleList leftTuples = tm.getInsertOrUpdateLeftTuples();
synchronized ( leftTuples ) {
// the job removal and memory check is done within a sync block, incase it is executing a trigger at the
// same time we are procesing an update
timerService.removeJob( jobHandle );

if ( leftTuple.getMemory() != null ) {
// a previous timer has requested an eval, so remove, we don't want it processed twice
leftTuples.remove( leftTuple );
}
}
scheduleLeftTuple( timerNode, tm, pmem, sink, wm, timer, timerService, timestamp, calendarNames, calendars, leftTuple, trgLeftTuples, stagedLeftTuples );

leftTuple.clearStaged();
Expand Down Expand Up @@ -368,6 +379,11 @@ public void execute(JobContext ctx) {
}

synchronized ( leftTuples ) {
if ( timerJobCtx.getJobHandle().isCancel() ) {
// this is to force a sync point, as during update propagate it can cancel the FH
// we cannot have an update processed at the same timer is firing
return;
}
if ( lt.getMemory() == null ) {
// don't add it, if it's already added, which could happen with interval or cron timers
leftTuples.add( lt );
Expand Down Expand Up @@ -423,8 +439,9 @@ public void evauateAndFireRule() {
}

LeftTupleSets trgLeftTuples = new LeftTupleSetsImpl();
LeftTupleSets stagedLeftTuples = RuleNetworkEvaluator.getStagedLeftTuples( sink, wm, sm );
doPropagateChildLeftTuples(null, tm, sink, wm,
null, trgLeftTuples, sm.getStagedLeftTuples());
null, trgLeftTuples, stagedLeftTuples );

RuleNetworkEvaluator rne = new RuleNetworkEvaluator();
LinkedList<StackEntry> outerStack = new LinkedList<StackEntry>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Trigger createTrigger(long timestamp,

if ( jh != null ) {
IntervalTrigger preTrig = (IntervalTrigger) jh.getTimerJobInstance().getTrigger();
if (preTrig.hasNextFireTime() != null) {
if (preTrig.getLastFireTime() != null) {
timeSinceLastFire = timestamp - preTrig.getLastFireTime().getTime();
}
}
Expand Down

0 comments on commit 859b1a5

Please sign in to comment.