From c78b6d726c60b1e197bf5ee513e081c852362919 Mon Sep 17 00:00:00 2001 From: ogokal Date: Sun, 30 Aug 2015 20:42:56 +0300 Subject: [PATCH 01/13] delimiter change from char to string --- .../source/SocketTextStreamFunction.java | 26 +- .../SocketTextStreamFunctionTest.java | 236 ++++++++++++++++++ 2 files changed, 252 insertions(+), 10 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index a55a56d7b0f1f..d723aead9b5b4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -37,7 +37,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private String hostname; private int port; - private char delimiter; + private String delimiter; private long maxRetry; private boolean retryForever; private Socket socket; @@ -47,6 +47,10 @@ public class SocketTextStreamFunction extends RichSourceFunction { private volatile boolean isRunning; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + this(hostname,port,String.valueOf(delimiter),maxRetry); + } + + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxRetry) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; @@ -70,13 +74,14 @@ public void run(SourceContext ctx) throws Exception { public void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { try { StringBuffer buffer = new StringBuffer(); + char[] charBuffer = new char[Math.max(8192, 2*delimiter.length())]; BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + int readCount; try { - data = reader.read(); + readCount = reader.read(charBuffer); } catch (SocketException e) { if (!isRunning) { break; @@ -85,7 +90,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex } } - if (data == -1) { + if (readCount == -1) { socket.close(); long retry = 0; boolean success = false; @@ -116,12 +121,13 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex continue; } - if (data == delimiter) { - ctx.collect(buffer.toString()); - buffer = new StringBuffer(); - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); - } + buffer.append(charBuffer,0,readCount); + String[] splits = buffer.toString().split(delimiter); + int i = 0; + for (; i < splits.length-1; i++) { + ctx.collect(splits[i].replace("\r", "")); + } + buffer = new StringBuffer(splits[i].replace("\r", "")); } if (buffer.length() > 0) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java new file mode 100644 index 0000000000000..51237dceb3240 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class SocketTextStreamFunctionTest { + //Actual text + /* + Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. Vestibulum suscipit finibus sapien, et congue enim laoreet consequat. + + Integer aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae. + + Quisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. Phasellus a metus dignissim risus auctor lacinia. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet facilisis. + + Ut vitae volutpat odio. Sed eget vestibulum libero, eu tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra. + + In ac imperdiet ex, nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam. + */ + // Generated 5 paragraphs, 290 words, 2000 bytes of Lorem Ipsum + + private static final String content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. " + + "Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. " + + "Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, " + + "porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, " + + "et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. " + + "Vestibulum suscipit finibus sapien, et congue enim laoreet consequat.\r\nInteger " + + "aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac " + + "mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. " + + "Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam " + + "eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. " + + "Praesent et felis tempor, laoreet libero eget, consequat nisl. " + + "Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae.\r\nQuisque sodales " + + "dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " + + "nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam."; + private static final byte[] data = content.getBytes(); + + @Test + public void testNewLineDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\n', 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + } + + @Test + public void testCarriageDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\r', 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + } + + @Test + public void testNewLineDelimited() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\n", 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + } + + @Test + public void testCarriageDelimited() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\r", 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(1).indexOf('\n') != -1); + } + + @Test + public void testWindowsLineEndDelimited() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\r\n", 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + } + @Test + public void testWindowsLineEndSuffixDelimited() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, ".\r\n", 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + } + + @Test + public void testLongDelimited() throws Exception { + List actualList = new ArrayList<>(); + + Socket socket = mock(Socket.class); + when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); + when(socket.isClosed()).thenReturn(false); + when(socket.isConnected()).thenReturn(true); + + SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, + "Integer aliquam metus iaculis risus hendrerit maximus. " + + "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + + "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + + "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + + "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + + "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ + "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra." + , 0); + Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); + field.setAccessible(true); + field.set(source, true); + + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.streamFromSocket(flinkCollector, socket); + assertEquals(2, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') != -1); + + } + +} From a51c486370e3e168912cbb71bde325701112d14b Mon Sep 17 00:00:00 2001 From: ogokal Date: Sun, 30 Aug 2015 21:01:57 +0300 Subject: [PATCH 02/13] [FLINK-2125][streaming] Delimiter change from char to string --- .../api/functions/source/SocketTextStreamFunction.java | 8 ++++---- .../api/functions/SocketTextStreamFunctionTest.java | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index d723aead9b5b4..9708190ded0a1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -123,11 +123,11 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex buffer.append(charBuffer,0,readCount); String[] splits = buffer.toString().split(delimiter); - int i = 0; - for (; i < splits.length-1; i++) { - ctx.collect(splits[i].replace("\r", "")); + int sc = 0; + for (; sc < splits.length-1; sc++) { + ctx.collect(splits[sc].replace("\r", "")); } - buffer = new StringBuffer(splits[i].replace("\r", "")); + buffer = new StringBuffer(splits[sc].replace("\r", "")); } if (buffer.length() > 0) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index 51237dceb3240..a80fb7ad3558f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -171,6 +171,7 @@ public void testWindowsLineEndDelimited() throws Exception { assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); } + @Test public void testWindowsLineEndSuffixDelimited() throws Exception { List actualList = new ArrayList<>(); From 8fdc30c01218aed0e0a63ece81ef16a51d97bcc4 Mon Sep 17 00:00:00 2001 From: ogokal Date: Mon, 31 Aug 2015 22:55:41 +0300 Subject: [PATCH 03/13] [FLINK-2125][streaming] Delimiter change from char to string (test change) --- .../source/SocketTextStreamFunction.java | 2 +- .../SocketTextStreamFunctionTest.java | 205 ++++++++---------- 2 files changed, 92 insertions(+), 115 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 9708190ded0a1..10e6f6ee807cc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -81,7 +81,7 @@ public void streamFromSocket(SourceContext ctx, Socket socket) throws Ex while (isRunning) { int readCount; try { - readCount = reader.read(charBuffer); + readCount = reader.read(charBuffer); } catch (SocketException e) { if (!isRunning) { break; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index a80fb7ad3558f..8e9b7d8d6facc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -17,19 +17,21 @@ package org.apache.flink.streaming.api.functions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.lang.reflect.Field; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class SocketTextStreamFunctionTest { @@ -72,81 +74,92 @@ public class SocketTextStreamFunctionTest { "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " + "nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam."; - private static final byte[] data = content.getBytes(); - @Test - public void testNewLineDelimitedOldApiWithChar() throws Exception { - List actualList = new ArrayList<>(); + private static final String[] fakeServerResponses = content.split("\\."); + + private class TestServer implements Runnable{ + private int port = 44444; + public int getPort(){ + return port; + } + private ServerSocket serverSocket; + + @Override public void run() { + try { + serverSocket = new ServerSocket(port); + while(true){ + Socket clientSocket = serverSocket.accept(); + try { + PrintWriter out = new PrintWriter(clientSocket.getOutputStream()); + for (String res : fakeServerResponses){ + out.print(res+"."); + out.flush(); + } + }finally { + clientSocket.close(); + } + } + } catch (IOException e) { + } + } + + public void stopServer() throws IOException { + serverSocket.close(); + } + } + + private TestServer testServer; + + @Before + public void startTestServerSocket(){ + testServer = new TestServer(); + new Thread(testServer).start(); + } - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); + @After + public void stopTestServerSocket() throws IOException { + testServer.stopServer(); + } - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\n', 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); + private void prepareTestForOld(char delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.open(new Configuration()); + source.run(flinkCollector); + } + private void prepareTest(String delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); + source.open(new Configuration()); + source.run(flinkCollector); + } + + @Test + public void testNewLineDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + prepareTestForOld('\n',actualList); assertEquals(5, actualList.size()); } @Test public void testCarriageDelimitedOldApiWithChar() throws Exception { List actualList = new ArrayList<>(); - - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\r', 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); - - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); + prepareTestForOld('\r',actualList); assertEquals(5, actualList.size()); } @Test public void testNewLineDelimited() throws Exception { - List actualList = new ArrayList<>(); - - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\n", 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); - - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); - assertEquals(5, actualList.size()); + List actualList = new ArrayList<>(); + prepareTest("\n",actualList); + assertEquals(5, actualList.size()); } @Test public void testCarriageDelimited() throws Exception { List actualList = new ArrayList<>(); - - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\r", 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); - - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); + prepareTest("\r",actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(1).indexOf('\n') != -1); } @@ -154,19 +167,7 @@ public void testCarriageDelimited() throws Exception { @Test public void testWindowsLineEndDelimited() throws Exception { List actualList = new ArrayList<>(); - - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, "\r\n", 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); - - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); + prepareTest("\r\n",actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); @@ -175,19 +176,7 @@ public void testWindowsLineEndDelimited() throws Exception { @Test public void testWindowsLineEndSuffixDelimited() throws Exception { List actualList = new ArrayList<>(); - - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, ".\r\n", 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); - - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); + prepareTest(".\r\n",actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); @@ -197,37 +186,25 @@ public void testWindowsLineEndSuffixDelimited() throws Exception { public void testLongDelimited() throws Exception { List actualList = new ArrayList<>(); - Socket socket = mock(Socket.class); - when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data)); - when(socket.isClosed()).thenReturn(false); - when(socket.isConnected()).thenReturn(true); - - SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, - "Integer aliquam metus iaculis risus hendrerit maximus. " + - "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + - "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + - "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + - "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + - "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ - "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + - "Phasellus a metus dignissim risus auctor lacinia. Class " + - "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + - ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + - "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + - "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + - "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + - "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + - "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + - "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + - "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + - "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra." - , 0); - Field field = SocketTextStreamFunction.class.getDeclaredField("isRunning"); - field.setAccessible(true); - field.set(source, true); + prepareTest("Integer aliquam metus iaculis risus hendrerit maximus. " + + "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + + "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + + "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + + "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + + "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ + "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.",actualList); - final ListSourceContext flinkCollector = new ListSourceContext(actualList); - source.streamFromSocket(flinkCollector, socket); assertEquals(2, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') != -1); From bad70797aae31be2d35001196b7dc833e657631f Mon Sep 17 00:00:00 2001 From: ogokal Date: Mon, 31 Aug 2015 23:00:48 +0300 Subject: [PATCH 04/13] [FLINK-2125][streaming] reformated test --- .../streaming/api/functions/SocketTextStreamFunctionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index 8e9b7d8d6facc..b0a418a5ce582 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -151,9 +151,9 @@ public void testCarriageDelimitedOldApiWithChar() throws Exception { @Test public void testNewLineDelimited() throws Exception { - List actualList = new ArrayList<>(); + List actualList = new ArrayList<>(); prepareTest("\n",actualList); - assertEquals(5, actualList.size()); + assertEquals(5, actualList.size()); } @Test From b7e0c2ec3ea299ad67737f148ffb8097bef919f3 Mon Sep 17 00:00:00 2001 From: ogokal Date: Tue, 1 Sep 2015 21:55:02 +0300 Subject: [PATCH 05/13] [FLINK-2125][streaming] changed port and assertion for full test content --- .../SocketTextStreamFunctionTest.java | 107 ++++++++++-------- 1 file changed, 60 insertions(+), 47 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index b0a418a5ce582..321742a0e63d7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -35,19 +35,6 @@ public class SocketTextStreamFunctionTest { - //Actual text - /* - Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. Vestibulum suscipit finibus sapien, et congue enim laoreet consequat. - - Integer aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae. - - Quisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. Phasellus a metus dignissim risus auctor lacinia. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet facilisis. - - Ut vitae volutpat odio. Sed eget vestibulum libero, eu tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra. - - In ac imperdiet ex, nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam. - */ - // Generated 5 paragraphs, 290 words, 2000 bytes of Lorem Ipsum private static final String content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. " + "Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. " + @@ -75,26 +62,29 @@ public class SocketTextStreamFunctionTest { "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " + "nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam."; - private static final String[] fakeServerResponses = content.split("\\."); - private class TestServer implements Runnable{ - private int port = 44444; + private volatile int port = 0; public int getPort(){ return port; } private ServerSocket serverSocket; + public TestServer() { + try { + serverSocket = new ServerSocket(port); + } catch (IOException e) { + e.printStackTrace(); + } + port = serverSocket.getLocalPort(); + } @Override public void run() { try { - serverSocket = new ServerSocket(port); while(true){ Socket clientSocket = serverSocket.accept(); try { PrintWriter out = new PrintWriter(clientSocket.getOutputStream()); - for (String res : fakeServerResponses){ - out.print(res+"."); - out.flush(); - } + out.print(content); + out.flush(); }finally { clientSocket.close(); } @@ -135,76 +125,99 @@ private void prepareTest(String delimiter,List actualList) throws Except source.run(flinkCollector); } + private String restoreContentFromActual(String delimiter, List actualList, boolean newLineAsWindows){ + StringBuilder sb = new StringBuilder(); + int i = 0; + for (; i < actualList.size()-1; i++) { + sb.append(actualList.get(i)+delimiter); + } + sb.append(actualList.get(i)); + return sb.toString().replace("\r","").replace("\n","\r\n"); + } + @Test public void testNewLineDelimitedOldApiWithChar() throws Exception { List actualList = new ArrayList<>(); - prepareTestForOld('\n',actualList); + char delimiter = '\n'; + prepareTestForOld(delimiter, actualList); assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList,true)); } @Test public void testCarriageDelimitedOldApiWithChar() throws Exception { List actualList = new ArrayList<>(); - prepareTestForOld('\r',actualList); + char delimiter = '\r'; + prepareTestForOld(delimiter,actualList); assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter), actualList, false)); } @Test public void testNewLineDelimited() throws Exception { List actualList = new ArrayList<>(); - prepareTest("\n",actualList); + String delimiter = "\n"; + prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(delimiter,actualList,true)); } @Test public void testCarriageDelimited() throws Exception { List actualList = new ArrayList<>(); - prepareTest("\r",actualList); + String delimiter = "\r"; + prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(1).indexOf('\n') != -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); } @Test public void testWindowsLineEndDelimited() throws Exception { List actualList = new ArrayList<>(); - prepareTest("\r\n",actualList); + String delimiter = "\r\n"; + prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); } @Test public void testWindowsLineEndSuffixDelimited() throws Exception { List actualList = new ArrayList<>(); - prepareTest(".\r\n",actualList); + String delimiter = ".\r\n"; + prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); } @Test public void testLongDelimited() throws Exception { List actualList = new ArrayList<>(); - - prepareTest("Integer aliquam metus iaculis risus hendrerit maximus. " + - "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + - "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + - "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + - "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + - "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ - "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + - "Phasellus a metus dignissim risus auctor lacinia. Class " + - "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + - ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + - "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + - "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + - "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + - "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + - "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + - "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + - "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + - "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.",actualList); - + String delimiter = "Integer aliquam metus iaculis risus hendrerit maximus. " + + "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + + "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + + "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + + "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + + "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ + "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra."; + + prepareTest(delimiter,actualList); + assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); assertEquals(2, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') != -1); From 7aaa02af131acf89084a3856670a4c0100988509 Mon Sep 17 00:00:00 2001 From: ogokal Date: Tue, 1 Sep 2015 23:12:16 +0300 Subject: [PATCH 06/13] [FLINK-2125] removed unnecessary parameter --- .../functions/SocketTextStreamFunctionTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index 321742a0e63d7..c43a8164db652 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -125,7 +125,7 @@ private void prepareTest(String delimiter,List actualList) throws Except source.run(flinkCollector); } - private String restoreContentFromActual(String delimiter, List actualList, boolean newLineAsWindows){ + private String restoreContentFromActual(String delimiter, List actualList){ StringBuilder sb = new StringBuilder(); int i = 0; for (; i < actualList.size()-1; i++) { @@ -141,7 +141,7 @@ public void testNewLineDelimitedOldApiWithChar() throws Exception { char delimiter = '\n'; prepareTestForOld(delimiter, actualList); assertEquals(5, actualList.size()); - assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList,true)); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList)); } @Test @@ -150,7 +150,7 @@ public void testCarriageDelimitedOldApiWithChar() throws Exception { char delimiter = '\r'; prepareTestForOld(delimiter,actualList); assertEquals(5, actualList.size()); - assertEquals(content,restoreContentFromActual(String.valueOf(delimiter), actualList, false)); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter), actualList)); } @Test @@ -159,7 +159,7 @@ public void testNewLineDelimited() throws Exception { String delimiter = "\n"; prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); - assertEquals(content,restoreContentFromActual(delimiter,actualList,true)); + assertEquals(content,restoreContentFromActual(delimiter,actualList)); } @Test @@ -169,7 +169,7 @@ public void testCarriageDelimited() throws Exception { prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); assertTrue(actualList.get(1).indexOf('\n') != -1); - assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); } @Test @@ -180,7 +180,7 @@ public void testWindowsLineEndDelimited() throws Exception { assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); - assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); } @Test @@ -191,7 +191,7 @@ public void testWindowsLineEndSuffixDelimited() throws Exception { assertEquals(5, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') == -1); - assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); } @Test @@ -217,7 +217,7 @@ public void testLongDelimited() throws Exception { "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra."; prepareTest(delimiter,actualList); - assertEquals(content, restoreContentFromActual(delimiter, actualList, false)); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); assertEquals(2, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') != -1); From ab652143da54c458a34de87cf8918b9c48e07328 Mon Sep 17 00:00:00 2001 From: ogokal Date: Sun, 30 Aug 2015 20:42:56 +0300 Subject: [PATCH 07/13] delimiter change from char to string --- .../source/SocketTextStreamFunction.java | 28 ++- .../SocketTextStreamFunctionTest.java | 227 ++++++++++++++++++ 2 files changed, 244 insertions(+), 11 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 6e7bcf691035e..7f6dce79f22a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -37,7 +37,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private String hostname; private int port; - private char delimiter; + private String delimiter; private long maxRetry; private boolean retryForever; private Socket socket; @@ -48,6 +48,10 @@ public class SocketTextStreamFunction extends RichSourceFunction { private volatile boolean isRunning; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + this(hostname, port, String.valueOf(delimiter), maxRetry); + } + + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxRetry) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; @@ -70,14 +74,15 @@ public void run(SourceContext ctx) throws Exception { private void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { try { - StringBuilder buffer = new StringBuilder(); + StringBuffer buffer = new StringBuffer(); + char[] charBuffer = new char[Math.max(8192, 2 * delimiter.length())]; BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + int readCount; try { - data = reader.read(); + readCount = reader.read(charBuffer); } catch (SocketException e) { if (!isRunning) { break; @@ -86,11 +91,11 @@ private void streamFromSocket(SourceContext ctx, Socket socket) throws E } } - if (data == -1) { + if (readCount == -1) { socket.close(); boolean success = false; retries = 0; - while ((retries < maxRetry || retryForever) && !success) { + while ((retries < maxRetry || retryForever) && !success) { if (!retryForever) { retries++; } @@ -117,12 +122,13 @@ private void streamFromSocket(SourceContext ctx, Socket socket) throws E continue; } - if (data == delimiter) { - ctx.collect(buffer.toString()); - buffer = new StringBuilder(); - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); + buffer.append(charBuffer, 0, readCount); + String[] splits = buffer.toString().split(delimiter); + int sc = 0; + for (; sc < splits.length - 1; sc++) { + ctx.collect(splits[sc].replace("\r", "")); } + buffer = new StringBuffer(splits[sc].replace("\r", "")); } if (buffer.length() > 0) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java new file mode 100644 index 0000000000000..c43a8164db652 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class SocketTextStreamFunctionTest { + + private static final String content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. " + + "Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. " + + "Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, " + + "porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, " + + "et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. " + + "Vestibulum suscipit finibus sapien, et congue enim laoreet consequat.\r\nInteger " + + "aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac " + + "mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. " + + "Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam " + + "eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. " + + "Praesent et felis tempor, laoreet libero eget, consequat nisl. " + + "Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae.\r\nQuisque sodales " + + "dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " + + "nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam."; + + private class TestServer implements Runnable{ + private volatile int port = 0; + public int getPort(){ + return port; + } + private ServerSocket serverSocket; + public TestServer() { + try { + serverSocket = new ServerSocket(port); + } catch (IOException e) { + e.printStackTrace(); + } + port = serverSocket.getLocalPort(); + } + + @Override public void run() { + try { + while(true){ + Socket clientSocket = serverSocket.accept(); + try { + PrintWriter out = new PrintWriter(clientSocket.getOutputStream()); + out.print(content); + out.flush(); + }finally { + clientSocket.close(); + } + } + } catch (IOException e) { + } + } + + public void stopServer() throws IOException { + serverSocket.close(); + } + } + + private TestServer testServer; + + @Before + public void startTestServerSocket(){ + testServer = new TestServer(); + new Thread(testServer).start(); + } + + @After + public void stopTestServerSocket() throws IOException { + testServer.stopServer(); + } + + private void prepareTestForOld(char delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.open(new Configuration()); + source.run(flinkCollector); + } + + private void prepareTest(String delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.open(new Configuration()); + source.run(flinkCollector); + } + + private String restoreContentFromActual(String delimiter, List actualList){ + StringBuilder sb = new StringBuilder(); + int i = 0; + for (; i < actualList.size()-1; i++) { + sb.append(actualList.get(i)+delimiter); + } + sb.append(actualList.get(i)); + return sb.toString().replace("\r","").replace("\n","\r\n"); + } + + @Test + public void testNewLineDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + char delimiter = '\n'; + prepareTestForOld(delimiter, actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList)); + } + + @Test + public void testCarriageDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + char delimiter = '\r'; + prepareTestForOld(delimiter,actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter), actualList)); + } + + @Test + public void testNewLineDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(delimiter,actualList)); + } + + @Test + public void testCarriageDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\r"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(1).indexOf('\n') != -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testWindowsLineEndDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\r\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testWindowsLineEndSuffixDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = ".\r\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testLongDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "Integer aliquam metus iaculis risus hendrerit maximus. " + + "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + + "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + + "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + + "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + + "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ + "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra."; + + prepareTest(delimiter,actualList); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + assertEquals(2, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') != -1); + + } + +} From c5410290cf4b2e1f34a5c04f86c53ccd3ceaa7b2 Mon Sep 17 00:00:00 2001 From: ogokal Date: Sun, 30 Aug 2015 21:01:57 +0300 Subject: [PATCH 08/13] [FLINK-2125][streaming] Delimiter change from char to string --- .../functions/source/SocketTextStreamFunction.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 7f6dce79f22a3..7bf5133574cf8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -122,13 +122,13 @@ private void streamFromSocket(SourceContext ctx, Socket socket) throws E continue; } - buffer.append(charBuffer, 0, readCount); - String[] splits = buffer.toString().split(delimiter); - int sc = 0; - for (; sc < splits.length - 1; sc++) { - ctx.collect(splits[sc].replace("\r", "")); - } - buffer = new StringBuffer(splits[sc].replace("\r", "")); + buffer.append(charBuffer,0,readCount); + String[] splits = buffer.toString().split(delimiter); + int sc = 0; + for (; sc < splits.length-1; sc++) { + ctx.collect(splits[sc].replace("\r", "")); + } + buffer = new StringBuffer(splits[sc].replace("\r", "")); } if (buffer.length() > 0) { From 90e189675c570433ba61bbf2b2efd205130dceae Mon Sep 17 00:00:00 2001 From: ogokal Date: Mon, 31 Aug 2015 22:55:41 +0300 Subject: [PATCH 09/13] [FLINK-2125][streaming] Delimiter change from char to string (test change) --- .../streaming/api/functions/SocketTextStreamFunctionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index c43a8164db652..6859c66e54c55 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -155,7 +155,7 @@ public void testCarriageDelimitedOldApiWithChar() throws Exception { @Test public void testNewLineDelimited() throws Exception { - List actualList = new ArrayList<>(); + List actualList = new ArrayList<>(); String delimiter = "\n"; prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); @@ -182,7 +182,7 @@ public void testWindowsLineEndDelimited() throws Exception { assertTrue(actualList.get(0).indexOf('\n') == -1); assertEquals(content, restoreContentFromActual(delimiter, actualList)); } - + @Test public void testWindowsLineEndSuffixDelimited() throws Exception { List actualList = new ArrayList<>(); From 6e920df079192ea4359b974624df6bb7a76a73bc Mon Sep 17 00:00:00 2001 From: ogokal Date: Mon, 31 Aug 2015 23:00:48 +0300 Subject: [PATCH 10/13] [FLINK-2125][streaming] reformated test --- .../api/functions/SocketTextStreamFunctionTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index 6859c66e54c55..eedcca1c5e2ff 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -156,10 +156,10 @@ public void testCarriageDelimitedOldApiWithChar() throws Exception { @Test public void testNewLineDelimited() throws Exception { List actualList = new ArrayList<>(); - String delimiter = "\n"; - prepareTest(delimiter,actualList); + char delimiter = '\n'; + prepareTestForOld(delimiter, actualList); assertEquals(5, actualList.size()); - assertEquals(content,restoreContentFromActual(delimiter,actualList)); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList)); } @Test From feb5507cbf04bd97f384e322db80235a0a4bc1c8 Mon Sep 17 00:00:00 2001 From: ogokal Date: Tue, 1 Sep 2015 21:55:02 +0300 Subject: [PATCH 11/13] [FLINK-2125][streaming] changed port and assertion for full test content --- .../api/functions/SocketTextStreamFunctionTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index eedcca1c5e2ff..6859c66e54c55 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -156,10 +156,10 @@ public void testCarriageDelimitedOldApiWithChar() throws Exception { @Test public void testNewLineDelimited() throws Exception { List actualList = new ArrayList<>(); - char delimiter = '\n'; - prepareTestForOld(delimiter, actualList); + String delimiter = "\n"; + prepareTest(delimiter,actualList); assertEquals(5, actualList.size()); - assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList)); + assertEquals(content,restoreContentFromActual(delimiter,actualList)); } @Test From 33bd4eac232168f614c43a9cae6582864dde502d Mon Sep 17 00:00:00 2001 From: ogokal Date: Wed, 16 Sep 2015 21:53:02 +0300 Subject: [PATCH 12/13] [FLINK-2125][streaming] Delimiter change from char to string --- .../streaming/api/functions/SocketTextStreamFunctionTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java index 6859c66e54c55..3f7f24a5a0296 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -221,7 +221,6 @@ public void testLongDelimited() throws Exception { assertEquals(2, actualList.size()); assertTrue(actualList.get(0).indexOf('\r') == -1); assertTrue(actualList.get(0).indexOf('\n') != -1); - } } From 72cbbdec9eebaf93626885275311e06f5551cb6c Mon Sep 17 00:00:00 2001 From: ogokal Date: Wed, 16 Sep 2015 21:53:02 +0300 Subject: [PATCH 13/13] [FLINK-2125][streaming] Delimiter change from char to string --- .../source/SocketTextStreamFunction.java | 28 ++- .../SocketTextStreamFunctionTest.java | 227 ++++++++++++++++++ 2 files changed, 244 insertions(+), 11 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index 6e7bcf691035e..7f6dce79f22a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -37,7 +37,7 @@ public class SocketTextStreamFunction extends RichSourceFunction { private String hostname; private int port; - private char delimiter; + private String delimiter; private long maxRetry; private boolean retryForever; private Socket socket; @@ -48,6 +48,10 @@ public class SocketTextStreamFunction extends RichSourceFunction { private volatile boolean isRunning; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + this(hostname, port, String.valueOf(delimiter), maxRetry); + } + + public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxRetry) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; @@ -70,14 +74,15 @@ public void run(SourceContext ctx) throws Exception { private void streamFromSocket(SourceContext ctx, Socket socket) throws Exception { try { - StringBuilder buffer = new StringBuilder(); + StringBuffer buffer = new StringBuffer(); + char[] charBuffer = new char[Math.max(8192, 2 * delimiter.length())]; BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + int readCount; try { - data = reader.read(); + readCount = reader.read(charBuffer); } catch (SocketException e) { if (!isRunning) { break; @@ -86,11 +91,11 @@ private void streamFromSocket(SourceContext ctx, Socket socket) throws E } } - if (data == -1) { + if (readCount == -1) { socket.close(); boolean success = false; retries = 0; - while ((retries < maxRetry || retryForever) && !success) { + while ((retries < maxRetry || retryForever) && !success) { if (!retryForever) { retries++; } @@ -117,12 +122,13 @@ private void streamFromSocket(SourceContext ctx, Socket socket) throws E continue; } - if (data == delimiter) { - ctx.collect(buffer.toString()); - buffer = new StringBuilder(); - } else if (data != '\r') { // ignore carriage return - buffer.append((char) data); + buffer.append(charBuffer, 0, readCount); + String[] splits = buffer.toString().split(delimiter); + int sc = 0; + for (; sc < splits.length - 1; sc++) { + ctx.collect(splits[sc].replace("\r", "")); } + buffer = new StringBuffer(splits[sc].replace("\r", "")); } if (buffer.length() > 0) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java new file mode 100644 index 0000000000000..6859c66e54c55 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketTextStreamFunctionTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class SocketTextStreamFunctionTest { + + private static final String content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. " + + "Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. " + + "Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, " + + "porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, " + + "et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. " + + "Vestibulum suscipit finibus sapien, et congue enim laoreet consequat.\r\nInteger " + + "aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac " + + "mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. " + + "Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam " + + "eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. " + + "Praesent et felis tempor, laoreet libero eget, consequat nisl. " + + "Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae.\r\nQuisque sodales " + + "dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " + + "nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam."; + + private class TestServer implements Runnable{ + private volatile int port = 0; + public int getPort(){ + return port; + } + private ServerSocket serverSocket; + public TestServer() { + try { + serverSocket = new ServerSocket(port); + } catch (IOException e) { + e.printStackTrace(); + } + port = serverSocket.getLocalPort(); + } + + @Override public void run() { + try { + while(true){ + Socket clientSocket = serverSocket.accept(); + try { + PrintWriter out = new PrintWriter(clientSocket.getOutputStream()); + out.print(content); + out.flush(); + }finally { + clientSocket.close(); + } + } + } catch (IOException e) { + } + } + + public void stopServer() throws IOException { + serverSocket.close(); + } + } + + private TestServer testServer; + + @Before + public void startTestServerSocket(){ + testServer = new TestServer(); + new Thread(testServer).start(); + } + + @After + public void stopTestServerSocket() throws IOException { + testServer.stopServer(); + } + + private void prepareTestForOld(char delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.open(new Configuration()); + source.run(flinkCollector); + } + + private void prepareTest(String delimiter,List actualList) throws Exception { + SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0); + final ListSourceContext flinkCollector = new ListSourceContext(actualList); + source.open(new Configuration()); + source.run(flinkCollector); + } + + private String restoreContentFromActual(String delimiter, List actualList){ + StringBuilder sb = new StringBuilder(); + int i = 0; + for (; i < actualList.size()-1; i++) { + sb.append(actualList.get(i)+delimiter); + } + sb.append(actualList.get(i)); + return sb.toString().replace("\r","").replace("\n","\r\n"); + } + + @Test + public void testNewLineDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + char delimiter = '\n'; + prepareTestForOld(delimiter, actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter),actualList)); + } + + @Test + public void testCarriageDelimitedOldApiWithChar() throws Exception { + List actualList = new ArrayList<>(); + char delimiter = '\r'; + prepareTestForOld(delimiter,actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(String.valueOf(delimiter), actualList)); + } + + @Test + public void testNewLineDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertEquals(content,restoreContentFromActual(delimiter,actualList)); + } + + @Test + public void testCarriageDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\r"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(1).indexOf('\n') != -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testWindowsLineEndDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "\r\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testWindowsLineEndSuffixDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = ".\r\n"; + prepareTest(delimiter,actualList); + assertEquals(5, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') == -1); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + } + + @Test + public void testLongDelimited() throws Exception { + List actualList = new ArrayList<>(); + String delimiter = "Integer aliquam metus iaculis risus hendrerit maximus. " + + "Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " + + "Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " + + "finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " + + "Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " + + "consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+ + "\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " + + "Phasellus a metus dignissim risus auctor lacinia. Class " + + "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" + + ". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " + + "non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " + + "facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " + + "tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " + + "et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " + + "euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " + + "Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " + + "Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " + + "Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra."; + + prepareTest(delimiter,actualList); + assertEquals(content, restoreContentFromActual(delimiter, actualList)); + assertEquals(2, actualList.size()); + assertTrue(actualList.get(0).indexOf('\r') == -1); + assertTrue(actualList.get(0).indexOf('\n') != -1); + + } + +}