From bda07c5389ebfec005e4f36672b55d79af7f95b5 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Sun, 2 Oct 2016 21:21:10 -0700 Subject: [PATCH] working low-level kludge coded up in a test Next up: refactor this working code into a clean API. --- .../test/WindowConnectedComponentsTest.java | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 src/test/java/org/apache/flink/graph/streaming/example/test/WindowConnectedComponentsTest.java diff --git a/src/test/java/org/apache/flink/graph/streaming/example/test/WindowConnectedComponentsTest.java b/src/test/java/org/apache/flink/graph/streaming/example/test/WindowConnectedComponentsTest.java new file mode 100644 index 0000000..d538f2b --- /dev/null +++ b/src/test/java/org/apache/flink/graph/streaming/example/test/WindowConnectedComponentsTest.java @@ -0,0 +1,158 @@ +package org.apache.flink.graph.streaming.example.test; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.streaming.EdgesFold; +import org.apache.flink.graph.streaming.GraphStream; +import org.apache.flink.graph.streaming.SimpleEdgeStream; +import org.apache.flink.graph.streaming.example.util.DisjointSet; +import org.apache.flink.graph.streaming.library.ConnectedComponents; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.types.NullValue; +import org.junit.Assert; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class WindowConnectedComponentsTest extends StreamingProgramTestBase { + public static final String Connected_RESULT = + "{1=[1, 2, 3]}\n" + + "{1=[1, 2, 3, 5]}\n" + +"{1=[1, 5], 6=[6, 7]}\n" + +"{6=[6, 7], 8=[8, 9]}\n" + +"{8=[8, 9]}"; + protected String resultPath; + + @SuppressWarnings("serial") + private static DataStream> getGraphStream(StreamExecutionEnvironment env) { + return env.fromCollection(getEdges()); + } + + public static final List> getEdges() { + List> edges = new ArrayList<>(); + edges.add(new Edge<>(1L, 2L, NullValue.getInstance())); + edges.add(new Edge<>(1L, 3L, NullValue.getInstance())); + edges.add(new Edge<>(2L, 3L, NullValue.getInstance())); + edges.add(new Edge<>(1L, 5L, NullValue.getInstance())); + edges.add(new Edge<>(6L, 7L, NullValue.getInstance())); + edges.add(new Edge<>(8L, 9L, NullValue.getInstance())); + return edges; + } + + public static final String[] parser(ArrayList list) { + int s = list.size(); + String r = list.get(s - 1); // to get the final combine result which is stored at the end of result + String t; + list.clear(); + String[] G = r.split("="); + for (int i = 0; i < G.length; i++) { + if (G[i].contains("[")) { + String[] k = G[i].split("]"); + t = k[0].substring(1, k[0].length()); + list.add(t); + } + } + String[] result = list.toArray(new String[list.size()]); + Arrays.sort(result); + return result; + } + + @Override + protected void preSubmit() throws Exception { + setParallelism(1); //needed to ensure total ordering for windows + resultPath = getTempDirPath("output"); + } + + @Override + protected void postSubmit() throws Exception { + String expectedResultStr = Connected_RESULT; + String[] excludePrefixes = new String[0]; + ArrayList list = new ArrayList(); + readAllResultLines(list, resultPath, excludePrefixes, false); + String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n"); + ArrayList expectedArr = new ArrayList(Arrays.asList(expected)); + Assert.assertEquals(expectedArr, list); + // Assert.assertEquals("Different number of lines in expected and obtained result.", expectedArr.size(), list.size()); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream> edges = getGraphStream(env); + // GraphStream graph = new SimpleEdgeStream<>(edges, env); + + TypeInformation>> typeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, edges.getType()); + edges + .assignTimestampsAndWatermarks(new TargetAssigner()) + .map(new InitialMapper()) + .returns(typeInfo) + .keyBy(0) + .window(SlidingEventTimeWindows.of(Time.milliseconds(4), Time.milliseconds(2))) + .fold(new DisjointSet(), + new FoldFunction>, DisjointSet>() { + @Override + public DisjointSet fold(DisjointSet ds, Tuple2> val) + throws Exception { + ds.union(val.f1.getSource(), val.f1.getTarget()); + return ds; + } + }) + .map(new MapFunction, String>() { + @Override + public String map(DisjointSet ds) { + return ds.toString(); + } + }) + .writeAsText(resultPath); + + env.execute("Streaming Connected ComponentsCheck"); + } + + private static final class InitialMapper extends RichMapFunction, Tuple2>> { + + private int partitionIndex; + + @Override + public void open(Configuration parameters) throws Exception { + this.partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + } + + @Override + public Tuple2> map(Edge edge) throws Exception { + return new Tuple2<>(partitionIndex, edge); + } + } + + private static final class TargetAssigner implements AssignerWithPeriodicWatermarks> { + + private long currentMaxTimestamp = 1; + + public long extractTimestamp(Edge e, long previes) { + long timestamp = e.getTarget(); + currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp); + return timestamp; + } + + public Watermark getCurrentWatermark() { + return new Watermark(currentMaxTimestamp); + } + } + +} +