Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4798] CEPITCase.testSimpleKeyedPatternCEP test failure #2843

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -32,42 +31,31 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;

import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.types.Either;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.Map;

@SuppressWarnings("serial")
public class CEPITCase extends StreamingMultipleProgramsTestBase {

private String resultPath;
private String expected;

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

@Before
public void before() throws Exception {
resultPath = tempFolder.newFile().toURI().toString();
expected = "";
}

@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
}

/**
* Checks that a certain event sequence is recognized
* @throws Exception
*/
@Test
public void testSimplePatternCEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> resultSink = new TestListResultSink<>();

DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
Expand Down Expand Up @@ -119,18 +107,18 @@ public String select(Map<String, Event> pattern) {
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);

// expected sequence of matching event ids
expected = "2,6,8";

result.addSink(resultSink);
env.execute();

String expected = "2,6,8";
compareResultAsText(resultSink.getResult(), expected);
}

@Test
public void testSimpleKeyedPatternCEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
TestListResultSink<String> resultSink = new TestListResultSink<>();

DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
Expand Down Expand Up @@ -194,18 +182,19 @@ public String select(Map<String, Event> pattern) {
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
env.execute();

// the expected sequences of matching event ids
expected = "2,2,2\n3,3,3\n42,42,42";

env.execute();
compareResultAsText(resultSink.getResult(), expected) ;
}

@Test
public void testSimplePatternEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
TestListResultSink<String> resultSink = new TestListResultSink<>();

// (Event, timestamp)
DataStream<Event> input = env.fromElements(
Expand Down Expand Up @@ -272,19 +261,20 @@ public String select(Map<String, Event> pattern) {
}
);

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
env.execute();

// the expected sequence of matching event ids
expected = "1,5,4";

env.execute();
compareResultAsText(resultSink.getResult(), expected);
}

@Test
public void testSimpleKeyedPatternEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(2);
TestListResultSink<String> resultSink = new TestListResultSink<>();

// (Event, timestamp)
DataStream<Event> input = env.fromElements(
Expand Down Expand Up @@ -361,17 +351,18 @@ public String select(Map<String, Event> pattern) {
}
);

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using DataStreamUtils#collect() here? Then we wouldn't need to move that sink class around.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find any of useDataStreamUtils#collect() in Flink tests.
DataStreamUtils#collect() places in <artifactId>flink-contrib</artifactId> module. Are you sure that is a good idea to depend on it?

I moved classes because I saw at least four implementations String\List sinks for testing purpose. I guess it's a good idea to migrate to one of them and get rid of usage temp files where possible.

What do you think? I'm newbie in a Flink and maybe wrong understand smth

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't aware that it sits in flink-contrib, nevermind my previous comment then.

env.execute();

// the expected sequences of matching event ids
expected = "1,1,1\n2,2,2";

env.execute();
compareResultAsText(resultSink.getResult(), expected);
}

@Test
public void testSimplePatternWithSingleState() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<Tuple2<Integer, Integer>> resultSink = new TestListResultSink<>();
DataStream<Tuple2<Integer, Integer>> input = env.fromElements(
new Tuple2<>(0, 1),
new Tuple2<>(0, 2));
Expand All @@ -394,17 +385,18 @@ public Tuple2<Integer, Integer> select(Map<String, Tuple2<Integer, Integer>> pat
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
env.execute();

expected = "(0,1)";

env.execute();
compareResultAsText(resultSink.getResult(), expected);
}

@Test
public void testProcessingTimeWithWindow() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
TestListResultSink<Integer> resultSink = new TestListResultSink<>();

DataStream<Integer> input = env.fromElements(1, 2);

Expand All @@ -417,18 +409,19 @@ public Integer select(Map<String, Integer> pattern) throws Exception {
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
env.execute();

expected = "3";

env.execute();
compareResultAsText(resultSink.getResult(), expected);
}

@Test
public void testTimeoutHandling() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
TestListResultSink<Either<String, String>> resultSink = new TestListResultSink<>();

// (Event, timestamp)
DataStream<Event> input = env.fromElements(
Expand Down Expand Up @@ -498,12 +491,12 @@ public String select(Map<String, Event> pattern) {
}
);

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
result.addSink(resultSink);
env.execute();

// the expected sequences of matching event ids
expected = "Left(1.0)\nRight(2.0,2.0,2.0)";

env.execute();
compareResultAsText(resultSink.getResult(), expected);
}

/**
Expand All @@ -513,6 +506,7 @@ public String select(Map<String, Event> pattern) {
@Test
public void testSimpleOrFilterPatternCEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> resultSink = new TestListResultSink<>();

DataStream<Event> input = env.fromElements(
new Event(1, "start", 1.0),
Expand Down Expand Up @@ -565,11 +559,11 @@ public String select(Map<String, Event> pattern) {
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);

result.addSink(resultSink);
env.execute();
// expected sequence of matching event ids
expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6";
compareResultAsText(resultSink.getResult(), expected);

env.execute();
}
}
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.test.streaming.runtime.util;
package org.apache.flink.streaming.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.test.streaming.runtime.util;
package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -61,4 +61,4 @@ public List<?> getList(int listId) {
return list;
}

}
}
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.util.Collector;

import org.junit.Test;
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.streaming.util.TestListResultSink;

import org.junit.Test;

Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.streaming.util.TestListResultSink;

import org.junit.Test;

Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.streaming.util.TestListResultSink;

import org.junit.Test;

Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.util.Collector;

import org.junit.Test;
Expand Down