Skip to content

Commit

Permalink
[hotfix][cep] Introduced nfa test harness
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jan 9, 2019
1 parent 5e46487 commit abe7ae2
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 319 deletions.
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.aftermatch.SkipPastLastStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkRuntimeException;
Expand All @@ -40,9 +40,7 @@
import java.util.Collections;
import java.util.List;

import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.junit.Assert.assertThat;

/**
Expand Down Expand Up @@ -77,9 +75,9 @@ public boolean filter(Event value) throws Exception {
}
}).times(3);

NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
Expand Down Expand Up @@ -141,9 +139,11 @@ public boolean filter(Event value) throws Exception {
}
});

NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();

return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}

Expand Down Expand Up @@ -198,9 +198,11 @@ public boolean filter(Event value) throws Exception {
}
}).oneOrMore();

NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();

return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}

Expand Down Expand Up @@ -231,9 +233,9 @@ public boolean filter(Event value) throws Exception {
}
}).times(3);

NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
Expand Down Expand Up @@ -275,9 +277,9 @@ public boolean filter(Event value) throws Exception {
}
}).times(2);

NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
Expand Down Expand Up @@ -318,9 +320,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
}).times(2);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
Expand Down Expand Up @@ -376,9 +378,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("d");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, b1, c1, d1)
Expand Down Expand Up @@ -415,9 +417,9 @@ public boolean filter(Event value) throws Exception {
}
}
);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a2, b2)
Expand Down Expand Up @@ -459,9 +461,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
Expand Down Expand Up @@ -498,9 +500,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
Expand Down Expand Up @@ -543,9 +545,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
Expand Down Expand Up @@ -573,9 +575,9 @@ public boolean filter(Event value) throws Exception {
}
}
);
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

//skip to first element of a match should throw exception if they are enabled,
//this mode is used in MATCH RECOGNIZE which assumes that skipping to first element
Expand Down Expand Up @@ -645,9 +647,11 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("c");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
.withAfterMatchSkipStrategy(skipStrategy)
.build();

return feedNFA(streamEvents, nfa, skipStrategy);
return nfaTestHarness.feedRecords(streamEvents);
}
}

Expand Down Expand Up @@ -686,9 +690,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
Expand Down Expand Up @@ -728,9 +732,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, a2, a3, b1)
Expand Down Expand Up @@ -768,9 +772,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
Expand Down Expand Up @@ -809,9 +813,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
Expand Down Expand Up @@ -851,9 +855,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("b");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
Expand Down Expand Up @@ -913,9 +917,9 @@ public boolean filter(Event value) throws Exception {
return value.getName().contains("d");
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a, b, c1, c2, c3, d),
Expand Down Expand Up @@ -962,9 +966,9 @@ public boolean filter(Event value, Context<Event> ctx) throws Exception {
ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
}
});
NFA<Event> nfa = compile(pattern, false);
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();

List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);

compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, c1, b2),
Expand All @@ -991,22 +995,10 @@ public boolean filter(Event value) throws Exception {
}
}).times(2);

NFA<Event> nfa = compile(pattern, false);

SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
NFAState nfaState = nfa.createInitialNFAState();

for (StreamRecord<Event> inputEvent : inputEvents) {
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
nfa.process(
sharedBufferAccessor,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
matchSkipStrategy);
}
}
NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();

nfaTestHarness.feedRecords(inputEvents);

assertThat(sharedBuffer.isEmpty(), Matchers.is(true));
}
Expand Down
Expand Up @@ -31,8 +31,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;

/**
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.cep.pattern.GroupPattern;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;

Expand All @@ -32,8 +33,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -1077,7 +1078,8 @@ public boolean filter(Event value) throws Exception {

NFAState nfaState = nfa.createInitialNFAState();

final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);

compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(c, a1, b1, d),
Expand Down
Expand Up @@ -33,8 +33,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;

/**
Expand Down

0 comments on commit abe7ae2

Please sign in to comment.