diff --git a/drools-compiler/src/test/java/org/drools/compiler/integrationtests/CepQueryTest.java b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/CepQueryTest.java new file mode 100644 index 00000000000..bed81fd8c9c --- /dev/null +++ b/drools-compiler/src/test/java/org/drools/compiler/integrationtests/CepQueryTest.java @@ -0,0 +1,119 @@ +package org.drools.compiler.integrationtests; + +import java.util.concurrent.TimeUnit; + +import org.drools.compiler.CommonTestMethodBase; +import org.drools.core.time.SessionPseudoClock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.kie.api.KieServices; +import org.kie.api.builder.KieFileSystem; +import org.kie.api.builder.model.KieBaseModel; +import org.kie.api.builder.model.KieModuleModel; +import org.kie.api.conf.EventProcessingOption; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.conf.ClockTypeOption; +import org.kie.api.runtime.rule.EntryPoint; +import org.kie.api.runtime.rule.QueryResults; +import org.kie.internal.io.ResourceFactory; + +/** + * Tests queries using temporal operators on events from two entry points. + */ +public class CepQueryTest extends CommonTestMethodBase { + + private KieSession ksession; + + private SessionPseudoClock clock; + + private EntryPoint firstEntryPoint, secondEntryPoint; + + @Before + public void prepare() { + final String drl = "package org.drools.compiler.integrationtests\n" + + "import org.drools.compiler.integrationtests.CepQueryTest.TestEvent;\n" + + "declare TestEvent\n" + + " @role( event )\n" + + "end\n" + + "query EventsAfterZeroToNineSeconds\n" + + " $event : TestEvent() from entry-point FirstStream\n" + + " $result : TestEvent( this after [0s, 9s] $event) from entry-point SecondStream\n" + + "end\n"; + + final KieServices ks = KieServices.Factory.get(); + KieFileSystem kfs = ks.newKieFileSystem(); + KieModuleModel module = ks.newKieModuleModel(); + + KieBaseModel baseModel = module.newKieBaseModel("defaultKBase") + .setDefault(true) + .setEventProcessingMode(EventProcessingOption.STREAM); + baseModel.newKieSessionModel("defaultKSession") + .setDefault(true) + .setClockType(ClockTypeOption.get("pseudo")); + + kfs.writeKModuleXML(module.toXML()); + kfs.write(ResourceFactory.newByteArrayResource( drl.getBytes() ).setTargetPath("defaultPkg/query.drl") ); + + assertTrue(ks.newKieBuilder(kfs).buildAll().getResults().getMessages().isEmpty()); + ksession = ks.newKieContainer(ks.getRepository().getDefaultReleaseId()).newKieSession(); + + clock = ksession.getSessionClock(); + firstEntryPoint = ksession.getEntryPoint("FirstStream"); + secondEntryPoint = ksession.getEntryPoint("SecondStream"); + } + + @After + public void cleanup() { + if (ksession != null) { + ksession.dispose(); + } + } + + private void eventsInitialization() { + secondEntryPoint.insert(new TestEvent("minusOne")); + clock.advanceTime(5, TimeUnit.SECONDS); + + firstEntryPoint.insert(new TestEvent("zero")); + secondEntryPoint.insert(new TestEvent("one")); +// clock.advanceTime(10, TimeUnit.SECONDS); +// +// secondEntryPoint.insert(new TestEvent("two")); +// clock.advanceTime(10, TimeUnit.SECONDS); +// +// secondEntryPoint.insert(new TestEvent("three")); +// ksession.fireAllRules(); + } + + /** + * Tests query using temporal operator 'after' on events from two entry points. + */ + @Test//(timeout=10000) + public void testQueryWithAfter() { + this.eventsInitialization(); + QueryResults results = ksession.getQueryResults("EventsAfterZeroToNineSeconds"); + + assertEquals("Unexpected query result length", 1, results.size()); + assertEquals("Unexpected query result content", + "one", ((TestEvent) results.iterator().next().get("$result")).getName()); + } + + /** + * Simple event used in the test. + */ + public static class TestEvent { + private final String name; + + public TestEvent(final String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + @Override + public String toString() { + return "TestEvent["+name+"]"; + } + } +} \ No newline at end of file diff --git a/drools-core/src/main/java/org/drools/core/common/AbstractWorkingMemory.java b/drools-core/src/main/java/org/drools/core/common/AbstractWorkingMemory.java index 2390d37429c..a19c86c1853 100644 --- a/drools-core/src/main/java/org/drools/core/common/AbstractWorkingMemory.java +++ b/drools-core/src/main/java/org/drools/core/common/AbstractWorkingMemory.java @@ -56,7 +56,12 @@ import org.drools.core.base.NonCloningQueryViewListener; import org.drools.core.base.QueryRowWithSubruleIndex; import org.drools.core.base.StandardQueryViewChangedEventListener; -import org.drools.core.event.*; +import org.drools.core.event.AgendaEventListener; +import org.drools.core.event.AgendaEventSupport; +import org.drools.core.event.RuleBaseEventListener; +import org.drools.core.event.RuleEventListenerSupport; +import org.drools.core.event.WorkingMemoryEventListener; +import org.drools.core.event.WorkingMemoryEventSupport; import org.drools.core.impl.EnvironmentFactory; import org.drools.core.impl.StatefulKnowledgeSessionImpl; import org.drools.core.management.DroolsManagementAgent; @@ -69,6 +74,7 @@ import org.drools.core.marshalling.impl.ProtobufMessages.ActionQueue.Assert; import org.drools.core.phreak.RuleAgendaItem; import org.drools.core.phreak.SegmentUtilities; +import org.drools.core.phreak.StackEntry; import org.drools.core.reteoo.EntryPointNode; import org.drools.core.reteoo.InitialFactImpl; import org.drools.core.reteoo.LeftInputAdapterNode; @@ -85,8 +91,10 @@ import org.drools.core.reteoo.RuleTerminalNode; import org.drools.core.reteoo.SegmentMemory; import org.drools.core.reteoo.TerminalNode; -import org.drools.core.rule.*; +import org.drools.core.rule.Declaration; +import org.drools.core.rule.EntryPointId; import org.drools.core.rule.Package; +import org.drools.core.rule.Rule; import org.drools.core.runtime.impl.ExecutionResultImpl; import org.drools.core.runtime.process.InternalProcessRuntime; import org.drools.core.runtime.process.ProcessRuntimeFactory; @@ -105,28 +113,29 @@ import org.drools.core.time.TimerServiceFactory; import org.drools.core.type.DateFormats; import org.drools.core.type.DateFormatsImpl; +import org.kie.api.conf.EventProcessingOption; import org.kie.api.event.process.ProcessEventListener; import org.kie.api.event.process.ProcessEventManager; import org.kie.api.marshalling.Marshaller; import org.kie.api.marshalling.ObjectMarshallingStrategy; import org.kie.api.marshalling.ObjectMarshallingStrategyStore; -import org.kie.api.runtime.ObjectFilter; -import org.kie.api.runtime.rule.LiveQuery; -import org.kie.api.runtime.rule.ViewChangedEventListener; -import org.kie.internal.event.rule.RuleEventListener; -import org.kie.internal.marshalling.MarshallerFactory; -import org.kie.internal.process.CorrelationAwareProcessRuntime; -import org.kie.internal.process.CorrelationKey; import org.kie.api.runtime.Calendars; import org.kie.api.runtime.Channel; import org.kie.api.runtime.Environment; import org.kie.api.runtime.EnvironmentName; import org.kie.api.runtime.ExecutionResults; import org.kie.api.runtime.Globals; +import org.kie.api.runtime.ObjectFilter; import org.kie.api.runtime.process.ProcessInstance; import org.kie.api.runtime.process.WorkItemHandler; import org.kie.api.runtime.process.WorkItemManager; +import org.kie.api.runtime.rule.LiveQuery; +import org.kie.api.runtime.rule.ViewChangedEventListener; import org.kie.api.time.SessionClock; +import org.kie.internal.event.rule.RuleEventListener; +import org.kie.internal.marshalling.MarshallerFactory; +import org.kie.internal.process.CorrelationAwareProcessRuntime; +import org.kie.internal.process.CorrelationKey; import org.kie.internal.runtime.StatefulKnowledgeSession; /** @@ -529,8 +538,19 @@ private BaseNode[] evalQuery(String queryName, DroolsQuery queryObject, Internal if ( lsmem == null ) { lsmem = SegmentUtilities.createSegmentMemory(lts, this); } - LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() ); + if( this.ruleBase.getConfiguration().getEventProcessingMode().equals(EventProcessingOption.STREAM) ) { + lmem.linkNode(this); + List pmems = lmem.getSegmentMemory().getPathMemories(); + PathMemory pmm = pmems!=null && !pmems.isEmpty() ? pmems.get(0) : null; + if( pmm != null ) { + RuleAgendaItem item = pmm.getRuleAgendaItem(); + item.getRuleExecutor().reEvaluateNetwork( this, new org.drools.core.util.LinkedList(), false); + } + } + + LeftInputAdapterNode.doInsertObject( handle, pCtx, lian, this, lmem, false, queryObject.isOpen() ); + List pmems = lmem.getSegmentMemory().getPathMemories(); for ( int i = 0, length = pmems.size(); i < length; i++ ) { PathMemory rm = pmems.get( i ); @@ -538,6 +558,7 @@ private BaseNode[] evalQuery(String queryName, DroolsQuery queryObject, Internal evaluator.getRuleExecutor().setDirty(true); evaluator.getRuleExecutor().evaluateNetworkAndFire(this, null, 0, -1); } + return tnodes; }