Skip to content

Commit

Permalink
BZ-980775: in STREAM mode, force event queue flush before executing e…
Browse files Browse the repository at this point in the history
…ach query

(cherry picked from commit 582ca92d2a858e53522c3eb7367f736d3a3d2a0b)
  • Loading branch information
winklerm authored and etirelli committed Sep 25, 2013
1 parent 72c5a8b commit 86be1a8
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 10 deletions.
@@ -0,0 +1,119 @@
package org.drools.compiler.integrationtests;

import java.util.concurrent.TimeUnit;

import org.drools.compiler.CommonTestMethodBase;
import org.drools.core.time.SessionPseudoClock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.KieServices;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.model.KieBaseModel;
import org.kie.api.builder.model.KieModuleModel;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.rule.EntryPoint;
import org.kie.api.runtime.rule.QueryResults;
import org.kie.internal.io.ResourceFactory;

/**
* Tests queries using temporal operators on events from two entry points.
*/
public class CepQueryTest extends CommonTestMethodBase {

private KieSession ksession;

private SessionPseudoClock clock;

private EntryPoint firstEntryPoint, secondEntryPoint;

@Before
public void prepare() {
final String drl = "package org.drools.compiler.integrationtests\n" +
"import org.drools.compiler.integrationtests.CepQueryTest.TestEvent;\n" +
"declare TestEvent\n" +
" @role( event )\n" +
"end\n" +
"query EventsAfterZeroToNineSeconds\n" +
" $event : TestEvent() from entry-point FirstStream\n" +
" $result : TestEvent( this after [0s, 9s] $event) from entry-point SecondStream\n" +
"end\n";

final KieServices ks = KieServices.Factory.get();
KieFileSystem kfs = ks.newKieFileSystem();
KieModuleModel module = ks.newKieModuleModel();

KieBaseModel baseModel = module.newKieBaseModel("defaultKBase")
.setDefault(true)
.setEventProcessingMode(EventProcessingOption.STREAM);
baseModel.newKieSessionModel("defaultKSession")
.setDefault(true)
.setClockType(ClockTypeOption.get("pseudo"));

kfs.writeKModuleXML(module.toXML());
kfs.write(ResourceFactory.newByteArrayResource( drl.getBytes() ).setTargetPath("defaultPkg/query.drl") );

assertTrue(ks.newKieBuilder(kfs).buildAll().getResults().getMessages().isEmpty());
ksession = ks.newKieContainer(ks.getRepository().getDefaultReleaseId()).newKieSession();

clock = ksession.getSessionClock();
firstEntryPoint = ksession.getEntryPoint("FirstStream");
secondEntryPoint = ksession.getEntryPoint("SecondStream");
}

@After
public void cleanup() {
if (ksession != null) {
ksession.dispose();
}
}

private void eventsInitialization() {
secondEntryPoint.insert(new TestEvent("minusOne"));
clock.advanceTime(5, TimeUnit.SECONDS);

firstEntryPoint.insert(new TestEvent("zero"));
secondEntryPoint.insert(new TestEvent("one"));
// clock.advanceTime(10, TimeUnit.SECONDS);
//
// secondEntryPoint.insert(new TestEvent("two"));
// clock.advanceTime(10, TimeUnit.SECONDS);
//
// secondEntryPoint.insert(new TestEvent("three"));
// ksession.fireAllRules();
}

/**
* Tests query using temporal operator 'after' on events from two entry points.
*/
@Test//(timeout=10000)
public void testQueryWithAfter() {
this.eventsInitialization();
QueryResults results = ksession.getQueryResults("EventsAfterZeroToNineSeconds");

assertEquals("Unexpected query result length", 1, results.size());
assertEquals("Unexpected query result content",
"one", ((TestEvent) results.iterator().next().get("$result")).getName());
}

/**
* Simple event used in the test.
*/
public static class TestEvent {
private final String name;

public TestEvent(final String name) {
this.name = name;
}

public String getName() {
return this.name;
}
@Override
public String toString() {
return "TestEvent["+name+"]";
}
}
}
Expand Up @@ -56,7 +56,12 @@
import org.drools.core.base.NonCloningQueryViewListener;
import org.drools.core.base.QueryRowWithSubruleIndex;
import org.drools.core.base.StandardQueryViewChangedEventListener;
import org.drools.core.event.*;
import org.drools.core.event.AgendaEventListener;
import org.drools.core.event.AgendaEventSupport;
import org.drools.core.event.RuleBaseEventListener;
import org.drools.core.event.RuleEventListenerSupport;
import org.drools.core.event.WorkingMemoryEventListener;
import org.drools.core.event.WorkingMemoryEventSupport;
import org.drools.core.impl.EnvironmentFactory;
import org.drools.core.impl.StatefulKnowledgeSessionImpl;
import org.drools.core.management.DroolsManagementAgent;
Expand All @@ -69,6 +74,7 @@
import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Assert;
import org.drools.core.phreak.RuleAgendaItem;
import org.drools.core.phreak.SegmentUtilities;
import org.drools.core.phreak.StackEntry;
import org.drools.core.reteoo.EntryPointNode;
import org.drools.core.reteoo.InitialFactImpl;
import org.drools.core.reteoo.LeftInputAdapterNode;
Expand All @@ -85,8 +91,10 @@
import org.drools.core.reteoo.RuleTerminalNode;
import org.drools.core.reteoo.SegmentMemory;
import org.drools.core.reteoo.TerminalNode;
import org.drools.core.rule.*;
import org.drools.core.rule.Declaration;
import org.drools.core.rule.EntryPointId;
import org.drools.core.rule.Package;
import org.drools.core.rule.Rule;
import org.drools.core.runtime.impl.ExecutionResultImpl;
import org.drools.core.runtime.process.InternalProcessRuntime;
import org.drools.core.runtime.process.ProcessRuntimeFactory;
Expand All @@ -105,28 +113,29 @@
import org.drools.core.time.TimerServiceFactory;
import org.drools.core.type.DateFormats;
import org.drools.core.type.DateFormatsImpl;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.api.event.process.ProcessEventManager;
import org.kie.api.marshalling.Marshaller;
import org.kie.api.marshalling.ObjectMarshallingStrategy;
import org.kie.api.marshalling.ObjectMarshallingStrategyStore;
import org.kie.api.runtime.ObjectFilter;
import org.kie.api.runtime.rule.LiveQuery;
import org.kie.api.runtime.rule.ViewChangedEventListener;
import org.kie.internal.event.rule.RuleEventListener;
import org.kie.internal.marshalling.MarshallerFactory;
import org.kie.internal.process.CorrelationAwareProcessRuntime;
import org.kie.internal.process.CorrelationKey;
import org.kie.api.runtime.Calendars;
import org.kie.api.runtime.Channel;
import org.kie.api.runtime.Environment;
import org.kie.api.runtime.EnvironmentName;
import org.kie.api.runtime.ExecutionResults;
import org.kie.api.runtime.Globals;
import org.kie.api.runtime.ObjectFilter;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.runtime.process.WorkItemHandler;
import org.kie.api.runtime.process.WorkItemManager;
import org.kie.api.runtime.rule.LiveQuery;
import org.kie.api.runtime.rule.ViewChangedEventListener;
import org.kie.api.time.SessionClock;
import org.kie.internal.event.rule.RuleEventListener;
import org.kie.internal.marshalling.MarshallerFactory;
import org.kie.internal.process.CorrelationAwareProcessRuntime;
import org.kie.internal.process.CorrelationKey;
import org.kie.internal.runtime.StatefulKnowledgeSession;

/**
Expand Down Expand Up @@ -529,15 +538,27 @@ private BaseNode[] evalQuery(String queryName, DroolsQuery queryObject, Internal
if ( lsmem == null ) {
lsmem = SegmentUtilities.createSegmentMemory(lts, this);
}
LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() );

if( this.ruleBase.getConfiguration().getEventProcessingMode().equals(EventProcessingOption.STREAM) ) {
lmem.linkNode(this);
List<PathMemory> pmems = lmem.getSegmentMemory().getPathMemories();
PathMemory pmm = pmems!=null && !pmems.isEmpty() ? pmems.get(0) : null;
if( pmm != null ) {
RuleAgendaItem item = pmm.getRuleAgendaItem();
item.getRuleExecutor().reEvaluateNetwork( this, new org.drools.core.util.LinkedList<StackEntry>(), false);
}
}

LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() );

List<PathMemory> pmems = lmem.getSegmentMemory().getPathMemories();
for ( int i = 0, length = pmems.size(); i < length; i++ ) {
PathMemory rm = pmems.get( i );
RuleAgendaItem evaluator = agenda.createRuleAgendaItem(Integer.MAX_VALUE, rm, (TerminalNode) rm.getNetworkNode());
evaluator.getRuleExecutor().setDirty(true);
evaluator.getRuleExecutor().evaluateNetworkAndFire(this, null, 0, -1);
}

return tnodes;
}

Expand Down

0 comments on commit 86be1a8

Please sign in to comment.