From 07b4246e1f4398b427962879cbdbb01fde23f35b Mon Sep 17 00:00:00 2001 From: mbalassi Date: Thu, 4 Jun 2015 11:04:46 +0200 Subject: [PATCH 1/2] [FLINK-2139] [streaming] Streaming outputformat tests --- .../api/functions/sink/SocketClientSink.java | 2 + .../api/outputformat/CsvOutputFormatTest.java | 78 ++++++++++ .../outputformat/SocketOutputFormatTest.java | 64 +++++++++ .../outputformat/TextOutputFormatTest.java | 57 ++++++++ .../streaming/util/SocketOutputTestBase.java | 134 ++++++++++++++++++ .../util/SocketProgramITCaseBase.java | 2 +- .../SocketTextStreamWordCountITCase.java | 2 +- .../SocketTextStreamWordCountITCase.java | 2 +- .../flink-streaming-scala/pom.xml | 20 ++- .../scala/api/CsvOutputFormatTest.java | 67 +++++++++ .../scala/api/SocketOutputFormatTest.java | 32 +++++ .../scala/api/TextOutputFormatTest.java | 43 ++++++ .../api/scala/OutputFormatTestPrograms.scala | 76 ++++++++++ 13 files changed, 575 insertions(+), 4 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index cd6c21cc8fea3..3fd2678872bd6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -85,6 +85,8 @@ public void invoke(IN value) { if(LOG.isErrorEnabled()){ LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e); } + throw new RuntimeException("Cannot send message \"" + value.toString() + + "\" to socket server at " + hostName + ":" + port, e); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatTest.java new file mode 100644 index 0000000000000..9724280df9d27 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class CsvOutputFormatTest extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream text = env.fromElements(WordCountData.TEXT); + + DataStream> counts = + text.flatMap(new Tokenizer()) + .groupBy(0).sum(1); + + counts.writeAsCsv(resultPath); + + env.execute("WriteAsCsvTest"); + } + + @Override + protected void postSubmit() throws Exception { + //Strip the parentheses from the expected text like output + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES + .replaceAll("[\\\\(\\\\)]", ""), resultPath); + } + + public static final class Tokenizer implements FlatMapFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + +} + diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatTest.java new file mode 100644 index 0000000000000..4df36054dfe0c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.SocketOutputTestBase; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class SocketOutputFormatTest extends SocketOutputTestBase { + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream text = env.fromElements(WordCountData.TEXT); + + DataStream counts = + text.flatMap(new CsvOutputFormatTest.Tokenizer()) + .groupBy(0).sum(1).map(new MapFunction, String>() { + @Override + public String map(Tuple2 value) throws Exception { + return value.toString() + "\n"; + } + }); + counts.writeToSocket(HOST, port, new DummyStringSchema()); + + env.execute("WriteToSocketTest"); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatTest.java new file mode 100644 index 0000000000000..3117fa1ac38d3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class TextOutputFormatTest extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream text = env.fromElements(WordCountData.TEXT); + + DataStream> counts = + text.flatMap(new CsvOutputFormatTest.Tokenizer()) + .groupBy(0).sum(1); + + counts.writeAsText(resultPath); + + env.execute("WriteAsTextTest"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java new file mode 100644 index 0000000000000..706114c088e82 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.outputformat.CsvOutputFormatTest; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.test.testdata.WordCountData; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Test base for streaming programs relying on an open server socket to write to. + */ +public abstract class SocketOutputTestBase extends StreamingProgramTestBase { + + protected static final String HOST = "localhost"; + protected static Integer port; + protected Set dataReadFromSocket = new HashSet(); + + @Override + protected void preSubmit() throws Exception { + port = NetUtils.getAvailablePort(); + temporarySocket = createLocalSocket(port); + } + + @Override + protected void postSubmit() throws Exception { + Set expectedData = new HashSet(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n"))); + Assert.assertEquals(expectedData, dataReadFromSocket); + temporarySocket.close(); + } + + protected ServerSocket temporarySocket; + + public ServerSocket createLocalSocket(int port) throws Exception { + ServerSocket serverSocket = new ServerSocket(port); + ServerThread st = new ServerThread(serverSocket); + st.start(); + return serverSocket; + } + + protected class ServerThread extends Thread { + + private ServerSocket serverSocket; + private Thread t; + + public ServerThread(ServerSocket serverSocket) { + this.serverSocket = serverSocket; + t = new Thread(this); + } + + public void waitForAccept() throws Exception { + Socket socket = serverSocket.accept(); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + DeserializationSchema schema = new DummyStringSchema(); + String rawData = in.readLine(); + while (rawData != null){ + String string = schema.deserialize(rawData.getBytes()); + dataReadFromSocket.add(string); + rawData = in.readLine(); + } + socket.close(); + } + + public void run() { + try { + waitForAccept(); + } catch (Exception e) { + Assert.fail(); + throw new RuntimeException(e); + } + } + + @Override + public void start() { + t.start(); + } + } + + public static class DummyStringSchema implements DeserializationSchema, SerializationSchema{ + private static final long serialVersionUID = 1L; + + @Override + public boolean isEndOfStream(String nextElement) { + return nextElement.equals("q"); + } + + @Override + public byte[] serialize(String element) { + return element.getBytes(); + } + + @Override + public String deserialize(byte[] message) { + return new String(message); + } + + @Override + public TypeInformation getProducedType() { + return TypeExtractor.getForClass(String.class); + } + + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java index 43b061ef6f573..37f695867cc09 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java @@ -86,4 +86,4 @@ public void start() { t.start(); } } -} +} \ No newline at end of file diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java index 20f6ebe0e9d19..838834b4d7094 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java @@ -27,4 +27,4 @@ protected void testProgram() throws Exception { SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } -} +} \ No newline at end of file diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java index cfde04f5bc6ef..b3629ad8bada8 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java @@ -27,4 +27,4 @@ protected void testProgram() throws Exception { SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath}); } -} +} \ No newline at end of file diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml index 51bea21e4bae0..9ea30fcd43950 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml @@ -72,6 +72,7 @@ under the License. ${guava.version} + org.apache.flink flink-tests @@ -80,6 +81,23 @@ under the License. test-jar + + + org.apache.flink + flink-test-utils + ${project.version} + test + + + + + org.apache.flink + flink-streaming-core + ${project.version} + test + test-jar + + @@ -124,7 +142,7 @@ under the License. - + org.apache.maven.plugins diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatTest.java new file mode 100644 index 0000000000000..aec2ab9f4dc6c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class CsvOutputFormatTest extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + //Strip the parentheses from the expected text like output + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES + .replaceAll("[\\\\(\\\\)]", ""), resultPath); + } + + public static final class Tokenizer implements FlatMapFunction> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + +} + diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatTest.java new file mode 100644 index 0000000000000..c1249af96211d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.SocketOutputTestBase; +import org.apache.flink.streaming.util.SocketProgramITCaseBase; +import org.apache.flink.test.testdata.WordCountData; + +public class SocketOutputFormatTest extends SocketOutputTestBase { + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatTest.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatTest.java new file mode 100644 index 0000000000000..de9e45c1e92ba --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.api; + +import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class TextOutputFormatTest extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala new file mode 100644 index 0000000000000..88b0f4f803283 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema +import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema + +import scala.language.existentials + +/** + * Test programs for built in output formats. Invoked from {@link OutputFormatTest}. + */ +object OutputFormatTestPrograms { + + def wordCountToText(input : String, outputPath : String) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create streams for names and ages by mapping the inputs to the corresponding objects + val text = env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + counts.writeAsText(outputPath) + + env.execute("Scala WordCountToText") + } + + def wordCountToCsv(input : String, outputPath : String) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create streams for names and ages by mapping the inputs to the corresponding objects + val text = env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + counts.writeAsCsv(outputPath) + + env.execute("Scala WordCountToCsv") + } + + def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + //Create streams for names and ages by mapping the inputs to the corresponding objects + val text = env.fromElements(input) + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + .map(tuple => tuple.toString() + "\n") + + counts.writeToSocket(outputHost, outputPort, new DummyStringSchema()) + + env.execute("Scala WordCountToCsv") + } + +} From 7ea5d5a7fe5c8bea92b5f6045c9dab7c57c563b9 Mon Sep 17 00:00:00 2001 From: mbalassi Date: Thu, 4 Jun 2015 15:22:13 +0200 Subject: [PATCH 2/2] [streaming] Socket Client Sink propagates exceptions --- .../api/functions/sink/SocketClientSink.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index 3fd2678872bd6..da8fd7f01fb8f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array. @@ -35,8 +33,6 @@ public class SocketClientSink extends RichSinkFunction { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class); - private final String hostName; private final int port; private final SerializationSchema schema; @@ -65,7 +61,7 @@ public void intializeConnection() { client = new Socket(hostName, port); outputStream = client.getOutputStream(); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e); } dataOutputStream = new DataOutputStream(outputStream); } @@ -82,11 +78,8 @@ public void invoke(IN value) { try { dataOutputStream.write(msg); } catch (IOException e) { - if(LOG.isErrorEnabled()){ - LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e); - } - throw new RuntimeException("Cannot send message \"" + value.toString() + - "\" to socket server at " + hostName + ":" + port, e); + throw new RuntimeException("Cannot send message " + value.toString() + + " to socket server at " + hostName + ":" + port, e); } } @@ -105,7 +98,7 @@ private void closeConnection(){ try { client.close(); } catch (IOException e) { - LOG.error("Cannot close connection with socket server at " + throw new RuntimeException("Cannot close connection with socket server at " + hostName + ":" + port, e); } }