Skip to content

Commit

Permalink
[hotfix] [cep] Fix afterMatchStrategy parameter missing issue
Browse files Browse the repository at this point in the history
This closes #4673
  • Loading branch information
yestinchen authored and dawidwys committed Sep 17, 2017
1 parent 258b385 commit 1269f75
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Expand Up @@ -329,7 +329,7 @@ private PriorityQueue<Long> getSortedTimestamps() throws Exception {
*/
private void processEvent(NFA<IN> nfa, IN event, long timestamp) {
Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
nfa.process(event, timestamp);
nfa.process(event, timestamp, afterMatchSkipStrategy);

try {
processMatchedSequences(patterns.f0, timestamp);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
Expand Down Expand Up @@ -679,4 +680,38 @@ public int compare(Event o1, Event o2) {
return Double.compare(o1.getPrice(), o2.getPrice());
}
}

@Test
public void testSimpleAfterMatchSkip() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, String>> input = env.fromElements(
new Tuple2<>(1, "a"),
new Tuple2<>(2, "a"),
new Tuple2<>(3, "a"),
new Tuple2<>(4, "a"));

Pattern<Tuple2<Integer, String>, ?> pattern =
Pattern.<Tuple2<Integer, String>>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition<Tuple2<Integer, String>>() {
@Override
public boolean filter(Tuple2<Integer, String> rec) throws Exception {
return rec.f1.equals("a");
}
}).times(2);

PatternStream<Tuple2<Integer, String>> pStream = CEP.pattern(input, pattern);

DataStream<Tuple2<Integer, String>> result = pStream.select(new PatternSelectFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> select(Map<String, List<Tuple2<Integer, String>>> pattern) throws Exception {
return pattern.get("start").get(0);
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);

expected = "(1,a)\n(3,a)";

env.execute();
}
}

0 comments on commit 1269f75

Please sign in to comment.