From 724959d6f743287e795a823607a5153513551b42 Mon Sep 17 00:00:00 2001 From: Bob Thorman Date: Mon, 27 Jun 2016 15:34:31 -0500 Subject: [PATCH 1/4] FLINK-3319 Added or function to the CEP pattern --- .../flink/cep/pattern/OrFilterFunction.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java new file mode 100644 index 0000000000000..e1b3ef75609e3 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which combines two filter functions with a logical and. Thus, the filter + * function only returns true, iff both filters return true. + * + * @param Type of the element to filter + */ +public class OrFilterFunction implements FilterFunction { + private static final long serialVersionUID = -2109562093871155005L; + + private final FilterFunction left; + private final FilterFunction right; + + public OrFilterFunction(final FilterFunction left, final FilterFunction right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value) throws Exception { + return left.filter(value) || right.filter(value); + } + + public FilterFunction getLeft() { + return left; + } + + public FilterFunction getRight() { + return right; + } +} From f4737bf2a1886a3a39be4eb3abdd6155a013a248 Mon Sep 17 00:00:00 2001 From: Bob Thorman Date: Mon, 27 Jun 2016 15:35:27 -0500 Subject: [PATCH 2/4] FLINK-3319 updated for the or function in CEP pattern --- .../org/apache/flink/cep/pattern/Pattern.java | 18 +++ .../java/org/apache/flink/cep/CEPITCase.java | 115 ++++++++---------- .../apache/flink/cep/pattern/PatternTest.java | 41 +++++++ 3 files changed, 108 insertions(+), 66 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 696518e2d4e1c..14aed5de5d410 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -92,6 +92,24 @@ public Pattern where(FilterFunction newFilterFunction) { return this; } + /** + * Specifies a filter condition if fulfilled by an event will match. + * + * @param newFilterFunction Filter condition + * @return The same pattern operator where the new filter condition is set + */ + public Pattern or(FilterFunction newFilterFunction) { + ClosureCleaner.clean(newFilterFunction, true); + + if (this.filterFunction == null) { + this.filterFunction = newFilterFunction; + } else { + this.filterFunction = new OrFilterFunction<>(this.filterFunction, newFilterFunction); + } + + return this; + } + /** * Applies a subtype constraint on the current pattern operator. This means that an event has * to be of the given subtype in order to be matched. diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 29044d821df1b..a55e9eeee5e35 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.types.Either; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -424,87 +423,71 @@ public Integer select(Map pattern) throws Exception { env.execute(); } + /** + * Checks that a certain event sequence is recognized with an OR filter + * @throws Exception + */ @Test - public void testTimeoutHandling() throws Exception { + public void testSimpleOrFilterPatternCEP() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // (Event, timestamp) DataStream input = env.fromElements( - Tuple2.of(new Event(1, "start", 1.0), 1L), - Tuple2.of(new Event(1, "middle", 2.0), 5L), - Tuple2.of(new Event(1, "start", 2.0), 4L), - Tuple2.of(new Event(1, "end", 2.0), 6L) - ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks>() { - - @Override - public long extractTimestamp(Tuple2 element, long currentTimestamp) { - return element.f1; - } - - @Override - public Watermark checkAndGetNextWatermark(Tuple2 lastElement, long extractedTimestamp) { - return new Watermark(lastElement.f1 - 5); - } - - }).map(new MapFunction, Event>() { - - @Override - public Event map(Tuple2 value) throws Exception { - return value.f0; - } - }); + new Event(1, "start", 1.0), + new Event(2, "middle", 2.0), + new Event(3, "end", 3.0), + new Event(4, "start", 4.0), + new Event(5, "middle", 5.0), + new Event(6, "end", 6.0) + ); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start") + .where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle") + .where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() == 2.0; + } + }) + .or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() == 5.0; + } + }) + .followedBy("end").where(new FilterFunction() { - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("start"); - } - }).followedBy("middle").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("middle"); - } - }).followedBy("end").where(new FilterFunction() { + DataStream result = CEP.pattern(input, pattern).select(new PatternSelectFunction() { @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }).within(Time.milliseconds(3)); - - DataStream> result = CEP.pattern(input, pattern).select( - new PatternTimeoutFunction() { - @Override - public String timeout(Map pattern, long timeoutTimestamp) throws Exception { - return pattern.get("start").getPrice() + ""; - } - }, - new PatternSelectFunction() { - - @Override - public String select(Map pattern) { - StringBuilder builder = new StringBuilder(); + public String select(Map pattern) { + StringBuilder builder = new StringBuilder(); - builder.append(pattern.get("start").getPrice()).append(",") - .append(pattern.get("middle").getPrice()).append(",") - .append(pattern.get("end").getPrice()); + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); - return builder.toString(); - } + return builder.toString(); } - ); + }); result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - // the expected sequences of matching event ids - expected = "Left(1.0)\nRight(2.0,2.0,2.0)"; + // expected sequence of matching event ids + expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6"; env.execute(); - - } + } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 2edf005f197ea..77a4b10b1a7a8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -142,4 +142,45 @@ public boolean filter(SubEvent value) throws Exception { assertEquals(previous.getName(), "subevent"); assertEquals(previous2.getName(), "start"); } + + @Test + public void testPatternWithOrFilter() { + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).next("or").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).followedBy("end"); + + Pattern previous; + Pattern previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertTrue(pattern instanceof FollowedByPattern); + assertNotNull(previous.getFilterFunction() instanceof OrFilterFunction); + assertNotNull(previous2.getFilterFunction() instanceof OrFilterFunction); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "or"); + assertEquals(previous2.getName(), "start"); + } + } From ec6be7f39a7f4792ff9d7a029e0b1a773266a61c Mon Sep 17 00:00:00 2001 From: Bob Thorman Date: Mon, 27 Jun 2016 15:34:31 -0500 Subject: [PATCH 3/4] FLINK-3319 Added or function to the CEP pattern --- .../flink/cep/pattern/OrFilterFunction.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java new file mode 100644 index 0000000000000..e1b3ef75609e3 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which combines two filter functions with a logical and. Thus, the filter + * function only returns true, iff both filters return true. + * + * @param Type of the element to filter + */ +public class OrFilterFunction implements FilterFunction { + private static final long serialVersionUID = -2109562093871155005L; + + private final FilterFunction left; + private final FilterFunction right; + + public OrFilterFunction(final FilterFunction left, final FilterFunction right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value) throws Exception { + return left.filter(value) || right.filter(value); + } + + public FilterFunction getLeft() { + return left; + } + + public FilterFunction getRight() { + return right; + } +} From c4a46f90417ac34f3f237c8aecdd7548cfd6ce8a Mon Sep 17 00:00:00 2001 From: Bob Thorman Date: Mon, 27 Jun 2016 15:35:27 -0500 Subject: [PATCH 4/4] FLINK-3319 updated for the or function in CEP pattern --- .../org/apache/flink/cep/pattern/Pattern.java | 18 +++ .../java/org/apache/flink/cep/CEPITCase.java | 115 ++++++++---------- .../apache/flink/cep/pattern/PatternTest.java | 41 +++++++ 3 files changed, 108 insertions(+), 66 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 696518e2d4e1c..14aed5de5d410 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -92,6 +92,24 @@ public Pattern where(FilterFunction newFilterFunction) { return this; } + /** + * Specifies a filter condition if fulfilled by an event will match. + * + * @param newFilterFunction Filter condition + * @return The same pattern operator where the new filter condition is set + */ + public Pattern or(FilterFunction newFilterFunction) { + ClosureCleaner.clean(newFilterFunction, true); + + if (this.filterFunction == null) { + this.filterFunction = newFilterFunction; + } else { + this.filterFunction = new OrFilterFunction<>(this.filterFunction, newFilterFunction); + } + + return this; + } + /** * Applies a subtype constraint on the current pattern operator. This means that an event has * to be of the given subtype in order to be matched. diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 29044d821df1b..a55e9eeee5e35 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.types.Either; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -424,87 +423,71 @@ public Integer select(Map pattern) throws Exception { env.execute(); } + /** + * Checks that a certain event sequence is recognized with an OR filter + * @throws Exception + */ @Test - public void testTimeoutHandling() throws Exception { + public void testSimpleOrFilterPatternCEP() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // (Event, timestamp) DataStream input = env.fromElements( - Tuple2.of(new Event(1, "start", 1.0), 1L), - Tuple2.of(new Event(1, "middle", 2.0), 5L), - Tuple2.of(new Event(1, "start", 2.0), 4L), - Tuple2.of(new Event(1, "end", 2.0), 6L) - ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks>() { - - @Override - public long extractTimestamp(Tuple2 element, long currentTimestamp) { - return element.f1; - } - - @Override - public Watermark checkAndGetNextWatermark(Tuple2 lastElement, long extractedTimestamp) { - return new Watermark(lastElement.f1 - 5); - } - - }).map(new MapFunction, Event>() { - - @Override - public Event map(Tuple2 value) throws Exception { - return value.f0; - } - }); + new Event(1, "start", 1.0), + new Event(2, "middle", 2.0), + new Event(3, "end", 3.0), + new Event(4, "start", 4.0), + new Event(5, "middle", 5.0), + new Event(6, "end", 6.0) + ); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start") + .where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle") + .where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() == 2.0; + } + }) + .or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() == 5.0; + } + }) + .followedBy("end").where(new FilterFunction() { - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("start"); - } - }).followedBy("middle").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("middle"); - } - }).followedBy("end").where(new FilterFunction() { + DataStream result = CEP.pattern(input, pattern).select(new PatternSelectFunction() { @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }).within(Time.milliseconds(3)); - - DataStream> result = CEP.pattern(input, pattern).select( - new PatternTimeoutFunction() { - @Override - public String timeout(Map pattern, long timeoutTimestamp) throws Exception { - return pattern.get("start").getPrice() + ""; - } - }, - new PatternSelectFunction() { - - @Override - public String select(Map pattern) { - StringBuilder builder = new StringBuilder(); + public String select(Map pattern) { + StringBuilder builder = new StringBuilder(); - builder.append(pattern.get("start").getPrice()).append(",") - .append(pattern.get("middle").getPrice()).append(",") - .append(pattern.get("end").getPrice()); + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); - return builder.toString(); - } + return builder.toString(); } - ); + }); result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - // the expected sequences of matching event ids - expected = "Left(1.0)\nRight(2.0,2.0,2.0)"; + // expected sequence of matching event ids + expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6"; env.execute(); - - } + } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 2edf005f197ea..77a4b10b1a7a8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -142,4 +142,45 @@ public boolean filter(SubEvent value) throws Exception { assertEquals(previous.getName(), "subevent"); assertEquals(previous2.getName(), "start"); } + + @Test + public void testPatternWithOrFilter() { + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).next("or").where(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).or(new FilterFunction() { + @Override + public boolean filter(Event value) throws Exception { + return false; + } + }).followedBy("end"); + + Pattern previous; + Pattern previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertTrue(pattern instanceof FollowedByPattern); + assertNotNull(previous.getFilterFunction() instanceof OrFilterFunction); + assertNotNull(previous2.getFilterFunction() instanceof OrFilterFunction); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "or"); + assertEquals(previous2.getName(), "start"); + } + }