Skip to content

Commit

Permalink
[BZ-1073880] run timed rules in a single thread executor
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Mar 7, 2014
1 parent b83edb6 commit e488e71
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 8 deletions.
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.drools.compiler.Alarm;
import org.drools.compiler.Cheese;
Expand Down Expand Up @@ -1667,4 +1668,113 @@ public void testCronFire() throws InterruptedException {
ksession.insert( "go" );
ksession.fireAllRules();
}

@Test(timeout = 10000)
public void testRaceConditionWithTimedRuleExectionOption() throws Exception {
// BZ-1073880
String str = "package org.simple \n" +
"global java.util.List list \n" +
"rule xxx @Eager(true)\n" +
" timer (int:30s 10s) "
+ "when \n" +
" $s: String()\n" +
"then \n" +
" list.add($s); \n" +
"end \n";

KieSessionConfiguration conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption(ClockTypeOption.get("pseudo"));
conf.setOption(TimedRuleExectionOption.YES);

KnowledgeBase kbase = loadKnowledgeBaseFromString(str);
KieSession ksession = createKnowledgeSession(kbase, conf);

final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean aBool = new AtomicBoolean(true);
AgendaEventListener agendaEventListener = new DefaultAgendaEventListener() {
public void afterMatchFired(org.kie.api.event.rule.AfterMatchFiredEvent event) {
try {
if (aBool.get()) {
barrier.await();
aBool.set(false);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
ksession.addEventListener(agendaEventListener);

List list = new ArrayList();
ksession.setGlobal("list", list);

// Using the Pseudo Clock.
SessionClock clock = ksession.getSessionClock();
SessionPseudoClock pseudoClock = (SessionPseudoClock) clock;

// Insert the event.
String eventOne = "one";
ksession.insert(eventOne);

// Advance the time .... so the timer will fire.
pseudoClock.advanceTime(10000, TimeUnit.MILLISECONDS);

// Rule doesn't fire in PHREAK. This is because you need to call 'fireAllRules' after you've inserted the fact, otherwise the timer
// job is not created.

ksession.fireAllRules();

// Rule still doesn't fire, because the DefaultTimerJob is created now, and now we need to advance the timer again.

pseudoClock.advanceTime(30000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();
aBool.set(true);

pseudoClock.advanceTime(10000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();
aBool.set(true);

String eventTwo = "two";
ksession.insert(eventTwo);
ksession.fireAllRules();

// 60
pseudoClock.advanceTime(10000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();
aBool.set(true);

// 70
pseudoClock.advanceTime(10000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();
aBool.set(true);

//From here, the second rule should fire.
//phaser.register();
pseudoClock.advanceTime(10000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();
aBool.set(true);

// Now 2 rules have fired, and those will now fire every 10 seconds.
pseudoClock.advanceTime(20000, TimeUnit.MILLISECONDS);
barrier.await();
barrier.reset();

pseudoClock.advanceTime(20000, TimeUnit.MILLISECONDS);
aBool.set(true);
barrier.await();
barrier.reset();

pseudoClock.advanceTime(20000, TimeUnit.MILLISECONDS);
aBool.set(true);
barrier.await();
barrier.reset();

ksession.destroy();
}

}
Expand Up @@ -10,19 +10,25 @@

public class ExecutorProviderImpl implements KieExecutors {

private static final java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});;
private static final java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool(new DaemonThreadFactory());

public Executor getExecutor() {
return executor;
}

public Executor newSingleThreadExecutor() {
return Executors.newSingleThreadExecutor(new DaemonThreadFactory());
}

public <T> CompletionService<T> getCompletionService() {
return new ExecutorCompletionService<T>(getExecutor());
}

private static class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
}
Expand Up @@ -425,7 +425,7 @@ public void run() {
}

private static class ExecutorHolder {
private static final java.util.concurrent.Executor executor = ExecutorProviderFactory.getExecutorProvider().getExecutor();
private static final java.util.concurrent.Executor executor = ExecutorProviderFactory.getExecutorProvider().newSingleThreadExecutor();
}

public static class Executor implements TimedRuleExecution {
Expand Down

0 comments on commit e488e71

Please sign in to comment.