From 293fbf46c57c3e00eeb21d3b46f76fd896159e3d Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 21 Jun 2015 17:24:14 +0100 Subject: [PATCH 01/11] push for travis --- .../classes/log4j.properties | 23 +++++++++++++++++++ .../classes/log4j.properties | 23 +++++++++++++++++++ .../StreamExecutionEnvironment.java | 10 ++++---- .../source/SocketTextStreamFunction.java | 6 ++--- .../socket/SocketTextStreamWordCount.java | 2 +- 5 files changed, 55 insertions(+), 9 deletions(-) create mode 100644 flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties create mode 100644 flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties new file mode 100644 index 0000000000000..da32ea0f44d37 --- /dev/null +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties new file mode 100644 index 0000000000000..da32ea0f44d37 --- /dev/null +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 5e7be8dfcb553..df37eb066dc1d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -828,7 +828,7 @@ public DataStream readFileStream(String filePath, long intervalMillis, * The port number which a server socket binds. A port number of 0 means that the port number is automatically * allocated. * @param delimiter - * A character which splits received strings into records + * A string delimiter which splits received strings into records * @param maxRetry * The maximal retry interval in seconds while the program waits for a socket that is temporarily down. * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated, @@ -836,7 +836,7 @@ public DataStream readFileStream(String filePath, long intervalMillis, * a negative value ensures retrying forever. * @return A data stream containing the strings received from the socket */ - public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) { + public DataStreamSource socketTextStream(String hostname, int port, String delimiter, long maxRetry) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream"); } @@ -851,10 +851,10 @@ public DataStreamSource socketTextStream(String hostname, int port, char * The port number which a server socket binds. A port number of 0 means that the port number is automatically * allocated. * @param delimiter - * A character which splits received strings into records + * A string which splits received strings into records * @return A data stream containing the strings received from the socket */ - public DataStreamSource socketTextStream(String hostname, int port, char delimiter) { + public DataStreamSource socketTextStream(String hostname, int port, String delimiter) { return socketTextStream(hostname, port, delimiter, 0); } @@ -871,7 +871,7 @@ public DataStreamSource socketTextStream(String hostname, int port, char * @return A data stream containing the strings received from the socket */ public DataStreamSource socketTextStream(String hostname, int port) { - return socketTextStream(hostname, port, '\n'); + return socketTextStream(hostname, port, "\n"); } /** diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index fb66f16c4759f..55c4d454410e6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -37,7 +37,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private String hostname; private int port; - private char delimiter; + private String delimiter; private long maxRetry; private boolean retryForever; private Socket socket; @@ -46,7 +46,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private volatile boolean isRunning; - public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxRetry) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; @@ -115,7 +115,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (data == delimiter) { + if (String.valueOf(data).equals(delimiter)) { ctx.collect(buffer.toString()); buffer = new StringBuffer(); } else if (data != '\r') { // ignore carriage return diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java index 14730975a3cb1..b66959aabe488 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java @@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception { .getExecutionEnvironment(); // get input data - DataStream text = env.socketTextStream(hostName, port, '\n', 0); + DataStream text = env.socketTextStream(hostName, port, "\n", 0); DataStream> counts = // split up the lines in pairs (2-tuples) containing: (word,1) From 285047d77e994720d3cf997a37112f573c5b40b8 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 21 Jun 2015 17:42:11 +0100 Subject: [PATCH 02/11] without cast is would not work correctly --- .../api/functions/source/SocketTextStreamFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 55c4d454410e6..cab341300dd09 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -115,7 +115,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (String.valueOf(data).equals(delimiter)) { + if (String.valueOf((char)data).equals(delimiter)) { ctx.collect(buffer.toString()); buffer = new StringBuffer(); } else if (data != '\r') { // ignore carriage return From 3110be3e26dc82bb48a5ee3dd52022af0c74a1fb Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 21 Jun 2015 19:25:34 +0100 Subject: [PATCH 03/11] not sure why this has been created but sure files are not needed --- .../classes/log4j.properties | 23 ------------------- .../classes/log4j.properties | 23 ------------------- 2 files changed, 46 deletions(-) delete mode 100644 flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties delete mode 100644 flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties deleted file mode 100644 index da32ea0f44d37..0000000000000 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties deleted file mode 100644 index da32ea0f44d37..0000000000000 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n From b5a2e50be0a77a3ec34930c99e7ca3f15fe0b105 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sat, 4 Jul 2015 11:46:05 +0100 Subject: [PATCH 04/11] rc1 --- .../source/SocketTextStreamFunction.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index cab341300dd09..fc470fb7be644 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -25,6 +25,7 @@ import java.net.Socket; import java.net.SocketException; +import org.apache.commons.lang.ArrayUtils; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,15 +69,19 @@ public void run(SourceContext ctx) throws Exception { } public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { + try { StringBuffer buffer = new StringBuffer(); BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + int charsRead; + // int data; + char [] cbuff = new char[ delimiter.length() ]; try { - data = reader.read(); + //data = reader.read(); + charsRead = reader.read(cbuff); } catch (SocketException e) { if (!isRunning) { break; @@ -85,7 +90,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex } } - if (data == -1) { + if (charsRead == -1) { socket.close(); long retry = 0; boolean success = false; @@ -97,8 +102,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex + (CONNECTION_RETRY_SLEEP / 1000) + " seconds..."); try { socket = new Socket(); - socket.connect(new InetSocketAddress(hostname, port), - CONNECTION_TIMEOUT_TIME); + socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); success = true; } catch (ConnectException ce) { Thread.sleep(CONNECTION_RETRY_SLEEP); @@ -115,11 +119,17 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (String.valueOf((char)data).equals(delimiter)) { - ctx.collect(buffer.toString()); - buffer = new StringBuffer(); - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); + //if (String.valueOf((char)data).equals(delimiter)) { + if (buffer.indexOf(delimiter) != -1) { + ctx.collect(buffer.substring(0, buffer.indexOf(delimiter))); + buffer = new StringBuffer(buffer.substring(buffer.indexOf(delimiter) + delimiter.length())); + + //} else if (data != '\r') { // ignore carriage return + } else { + while (ArrayUtils.contains(cbuff, '\r')) + ArrayUtils.removeElement(cbuff, 'r'); + + buffer.append(cbuff); } } From e74d0f0756930d60478d0bda4a21a1258aa8a32a Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sat, 4 Jul 2015 12:17:42 +0100 Subject: [PATCH 05/11] missing escaping --- .../api/functions/source/SocketTextStreamFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index fc470fb7be644..cfc70af873173 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -127,7 +127,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex //} else if (data != '\r') { // ignore carriage return } else { while (ArrayUtils.contains(cbuff, '\r')) - ArrayUtils.removeElement(cbuff, 'r'); + ArrayUtils.removeElement(cbuff, '\r'); buffer.append(cbuff); } From 0fb6f176763d204ebfa42bb6b7864303b85d6932 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 5 Jul 2015 11:00:17 +0100 Subject: [PATCH 06/11] another try --- .../classes/log4j.properties | 23 ++++++++++++++++ .../classes/log4j.properties | 23 ++++++++++++++++ .../source/SocketTextStreamFunction.java | 26 +++++++------------ 3 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties create mode 100644 flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties new file mode 100644 index 0000000000000..da32ea0f44d37 --- /dev/null +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties new file mode 100644 index 0000000000000..da32ea0f44d37 --- /dev/null +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/${project.build.directory}/classes/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index cfc70af873173..aabaff7a92057 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -76,12 +76,9 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex socket.getInputStream())); while (isRunning) { - int charsRead; - // int data; - char [] cbuff = new char[ delimiter.length() ]; + int data; try { - //data = reader.read(); - charsRead = reader.read(cbuff); + data = reader.read(); } catch (SocketException e) { if (!isRunning) { break; @@ -90,7 +87,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex } } - if (charsRead == -1) { + if (data == -1) { socket.close(); long retry = 0; boolean success = false; @@ -119,17 +116,14 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - //if (String.valueOf((char)data).equals(delimiter)) { - if (buffer.indexOf(delimiter) != -1) { - ctx.collect(buffer.substring(0, buffer.indexOf(delimiter))); - buffer = new StringBuffer(buffer.substring(buffer.indexOf(delimiter) + delimiter.length())); - - //} else if (data != '\r') { // ignore carriage return - } else { - while (ArrayUtils.contains(cbuff, '\r')) - ArrayUtils.removeElement(cbuff, '\r'); + if (data != '\r') { // ignore carriage return + buffer.append((char)data); + } - buffer.append(cbuff); + int delimiterIndex = buffer.indexOf(delimiter); + if (delimiterIndex != -1) { + ctx.collect(buffer.substring(0, delimiterIndex)); + buffer = new StringBuffer(); } } From edca12e5b210063926b61411fcb0e16a99c977e8 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 5 Jul 2015 11:26:40 +0100 Subject: [PATCH 07/11] revert --- .../source/SocketTextStreamFunction.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index aabaff7a92057..1595ad47a2757 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -25,7 +25,6 @@ import java.net.Socket; import java.net.SocketException; -import org.apache.commons.lang.ArrayUtils; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +68,6 @@ public void run(SourceContext ctx) throws Exception { } public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { - try { StringBuffer buffer = new StringBuffer(); BufferedReader reader = new BufferedReader(new InputStreamReader( @@ -99,7 +97,8 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex + (CONNECTION_RETRY_SLEEP / 1000) + " seconds..."); try { socket = new Socket(); - socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); + socket.connect(new InetSocketAddress(hostname, port), + CONNECTION_TIMEOUT_TIME); success = true; } catch (ConnectException ce) { Thread.sleep(CONNECTION_RETRY_SLEEP); @@ -116,14 +115,11 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (data != '\r') { // ignore carriage return - buffer.append((char)data); - } - - int delimiterIndex = buffer.indexOf(delimiter); - if (delimiterIndex != -1) { - ctx.collect(buffer.substring(0, delimiterIndex)); + if (String.valueOf((char)data).equals(delimiter)) { + ctx.collect(buffer.toString()); buffer = new StringBuffer(); + } else if (data != '\r') { // ignore carriage return + buffer.append((char) data); } } @@ -148,4 +144,4 @@ public void cancel() { } } } -} +} \ No newline at end of file From 77482670c524f39888c2626aaf4cde6ab265e596 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 12 Jul 2015 12:43:28 +0100 Subject: [PATCH 08/11] :( --- .../api/functions/source/SocketTextStreamFunction.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 1595ad47a2757..affb0cdc4ba12 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -25,6 +25,7 @@ import java.net.Socket; import java.net.SocketException; +import org.apache.commons.lang.ArrayUtils; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,11 +116,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (String.valueOf((char)data).equals(delimiter)) { - ctx.collect(buffer.toString()); buffer = new StringBuffer(); - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); } } @@ -144,4 +141,3 @@ public void cancel() { } } } -} \ No newline at end of file From 6b685d7eb066cebc10056fa400cbca58a5ba14f5 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 12 Jul 2015 12:46:28 +0100 Subject: [PATCH 09/11] dsadsa --- .../api/functions/source/SocketTextStreamFunction.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index affb0cdc4ba12..06dc82fb5055c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -116,6 +116,14 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } + if (data == '\r') // ignore carriage return + continue; + + buffer.append((char)data); + + int delimiterIndex = buffer.indexOf(delimiter); + if (delimiterIndex != -1) { + ctx.collect(buffer.substring(0, delimiterIndex)); buffer = new StringBuffer(); } } @@ -141,3 +149,4 @@ public void cancel() { } } } +} From a65bccaea650378bd3e29f60ee9a308a3ed0aee5 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 12 Jul 2015 12:48:53 +0100 Subject: [PATCH 10/11] tabs --- .../api/functions/source/SocketTextStreamFunction.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 06dc82fb5055c..00bfc9deb54b4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -118,8 +118,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex if (data == '\r') // ignore carriage return continue; - - buffer.append((char)data); + buffer.append((char)data); int delimiterIndex = buffer.indexOf(delimiter); if (delimiterIndex != -1) { From 1310cef4db9c7d7848d643f87cc77aa81e3e12e1 Mon Sep 17 00:00:00 2001 From: hczerpak Date: Sun, 12 Jul 2015 13:24:45 +0100 Subject: [PATCH 11/11] 123 --- .../api/functions/source/SocketTextStreamFunction.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 00bfc9deb54b4..cd453ef5b7cfa 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -25,7 +25,6 @@ import java.net.Socket; import java.net.SocketException; -import org.apache.commons.lang.ArrayUtils; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,8 +115,10 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (data == '\r') // ignore carriage return - continue; + if (data == '\r') { // ignore carriage return + continue; + } + buffer.append((char)data); int delimiterIndex = buffer.indexOf(delimiter);