[FLINK-2125][streaming] Delimiter change from char to string #1077

Closed
wants to merge 18 commits into from
Changes from 6 commits
@@ -37,7 +37,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {

private String hostname;
private int port;
- private char delimiter;
+ private String delimiter;
private long maxRetry;
private boolean retryForever;
private Socket socket;
@@ -47,6 +47,10 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private volatile boolean isRunning;

public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
+ this(hostname,port,String.valueOf(delimiter),maxRetry);
+ }
+
+ public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxRetry) {
this.hostname = hostname;
this.port = port;
this.delimiter = delimiter;
@@ -70,13 +74,14 @@ public void run(SourceContext<String> ctx) throws Exception {
public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
try {
StringBuffer buffer = new StringBuffer();
+ char[] charBuffer = new char[Math.max(8192, 2*delimiter.length())];
BufferedReader reader = new BufferedReader(new InputStreamReader(
socket.getInputStream()));

while (isRunning) {
- int data;
+ int readCount;
try {
- data = reader.read();
+ readCount = reader.read(charBuffer);
Contributor

Good change. It's much better to read a buffer instead of individual characters.
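
To make the benefit concrete, here is a minimal sketch of buffered reading with a multi-character delimiter. It is not the code in this PR: the class name and readRecords are hypothetical, and the sketch deliberately searches for the delimiter with StringBuilder.indexOf rather than String.split, because split interprets its argument as a regular expression and discards trailing empty strings. Leaving the unconsumed tail untouched in the buffer also means a delimiter that straddles two reads is still found on the next pass.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; names are assumptions, not part of the PR.
class BufferedDelimiterReadSketch {

    // Reads the whole stream in chunks and splits it on a literal delimiter.
    static List<String> readRecords(Reader reader, String delimiter, int chunkSize)
            throws IOException {
        if (delimiter.isEmpty() || chunkSize < 1) {
            throw new IllegalArgumentException("need a non-empty delimiter and chunkSize >= 1");
        }
        List<String> records = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        char[] chunk = new char[chunkSize];
        int readCount;
        while ((readCount = reader.read(chunk)) != -1) {
            pending.append(chunk, 0, readCount);
            int start = 0;
            int idx;
            // Emit every complete record currently in the buffer.
            while ((idx = pending.indexOf(delimiter, start)) != -1) {
                records.add(pending.substring(start, idx));
                start = idx + delimiter.length();
            }
            // Keep only the unconsumed tail, including any partial delimiter.
            pending.delete(0, start);
        }
        // Whatever is left at end of stream is the final record.
        if (pending.length() > 0) {
            records.add(pending.toString());
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // A chunk size of 3 forces the first read to end in the middle of "\r\n".
        List<String> records = readRecords(new StringReader("ab\r\ncd\r\nef"), "\r\n", 3);
        System.out.println(records);
    }
}
```

With chunkSize set to 3, the input "ab\r\ncd\r\nef" and the delimiter "\r\n" yield the records ab, cd and ef, even though the first read ends in the middle of the delimiter.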

} catch (SocketException e) {
if (!isRunning) {
break;
@@ -85,7 +90,7 @@ public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Ex
}
}

- if (data == -1) {
+ if (readCount == -1) {
socket.close();
long retry = 0;
boolean success = false;
@@ -116,12 +121,13 @@ public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Ex
continue;
}

- if (data == delimiter) {
- ctx.collect(buffer.toString());
- buffer = new StringBuffer();
- } else if (data != '\r') { // ignore carriage return
- buffer.append((char) data);
- }
+ buffer.append(charBuffer,0,readCount);
+ String[] splits = buffer.toString().split(delimiter);
+ int sc = 0;
+ for (; sc < splits.length-1; sc++) {
+ ctx.collect(splits[sc].replace("\r", ""));
+ }
+ buffer = new StringBuffer(splits[sc].replace("\r", ""));
}

if (buffer.length() > 0) {
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


public class SocketTextStreamFunctionTest {
//Actual text
/*
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. Vestibulum suscipit finibus sapien, et congue enim laoreet consequat.

Integer aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae.

Quisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. Phasellus a metus dignissim risus auctor lacinia. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet facilisis.

Ut vitae volutpat odio. Sed eget vestibulum libero, eu tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.

In ac imperdiet ex, nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam.
*/
// Generated 5 paragraphs, 290 words, 2000 bytes of Lorem Ipsum

private static final String content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sagittis nisl non euismod fermentum. " +
"Curabitur lacinia vehicula enim quis tristique. Suspendisse imperdiet arcu sed bibendum vulputate. " +
"Sed vitae nisl vitae turpis dapibus lacinia in id elit. Integer lorem dolor, " +
"porttitor ut nisi in, tincidunt sodales leo. Aliquam tristique dui sit amet odio bibendum, " +
"et rutrum turpis auctor. Morbi sit amet mollis augue, ac rutrum velit. " +
"Vestibulum suscipit finibus sapien, et congue enim laoreet consequat.\r\nInteger " +
"aliquam metus iaculis risus hendrerit maximus. Suspendisse vestibulum nibh ac " +
"mauris cursus molestie sit amet vel turpis. Nulla et posuere orci. " +
"Aliquam dui quam, posuere vitae erat vitae, finibus commodo ipsum. Aliquam " +
"eu dui quis arcu porttitor sollicitudin. Integer sodales finibus ullamcorper. " +
"Praesent et felis tempor, laoreet libero eget, consequat nisl. " +
"Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae.\r\nQuisque sodales " +
"dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " +
"Phasellus a metus dignissim risus auctor lacinia. Class " +
"aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" +
". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " +
"non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " +
"facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " +
"tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " +
"et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " +
"euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " +
"Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " +
"Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " +
"Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.\r\nIn ac imperdiet ex, " +
"nec aliquet erat. Nullam sit amet enim in dolor finibus convallis id eu nibh. Fusce aliquam convallis orci aliquam.";

private static final String[] fakeServerResponses = content.split("\\.");

private class TestServer implements Runnable{
private int port = 44444;
Contributor

A hard-coded port is not suitable here: the test will fail whenever that port is already in use.
You can call serverSocket = new ServerSocket(0); to start the server on a free port.
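
To illustrate the suggestion: passing 0 to the ServerSocket constructor lets the operating system pick a free port, and getLocalPort() reports which one was chosen. The sketch below is an assumption about how the test server could be structured, not code from this PR, and the class name EphemeralPortServerSketch is hypothetical. It binds in the constructor rather than in run(), so the port is already known before any server thread starts.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper for illustration; not part of the PR.
class EphemeralPortServerSketch implements AutoCloseable {
    private final ServerSocket serverSocket;

    EphemeralPortServerSketch() throws IOException {
        // Port 0 asks the operating system for any free port.
        this.serverSocket = new ServerSocket(0);
    }

    // Returns the port that was actually bound.
    int getPort() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws IOException {
        try (EphemeralPortServerSketch server = new EphemeralPortServerSketch()) {
            System.out.println("listening on port " + server.getPort());
        }
    }
}
```

The test would then pass the value of getPort() to SocketTextStreamFunction instead of a fixed number.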

public int getPort(){
return port;
}
private ServerSocket serverSocket;

@Override public void run() {
try {
serverSocket = new ServerSocket(port);
while(true){
Socket clientSocket = serverSocket.accept();
try {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream());
for (String res : fakeServerResponses){
out.print(res+".");
out.flush();
}
}finally {
clientSocket.close();
}
}
} catch (IOException e) {
}
}

public void stopServer() throws IOException {
serverSocket.close();
}
}

private TestServer testServer;

@Before
public void startTestServerSocket(){
testServer = new TestServer();
new Thread(testServer).start();
}

@After
public void stopTestServerSocket() throws IOException {
testServer.stopServer();
}

private void prepareTestForOld(char delimiter,List<String> actualList) throws Exception {
SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0);
final ListSourceContext<String> flinkCollector = new ListSourceContext<String>(actualList);
source.open(new Configuration());
source.run(flinkCollector);
}

private void prepareTest(String delimiter,List<String> actualList) throws Exception {
SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", testServer.getPort(), delimiter, 0);
final ListSourceContext<String> flinkCollector = new ListSourceContext<String>(actualList);
source.open(new Configuration());
source.run(flinkCollector);
}

@Test
public void testNewLineDelimitedOldApiWithChar() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTestForOld('\n',actualList);
assertEquals(5, actualList.size());
Contributor

Can you assert that the whole buffer received in SocketTextSource is correct?
Thank you!
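
One way to do this is to compare the collected records with a list computed directly from content, instead of checking only the size. The sketch below is an assumption, not code from this PR: expectedRecords is a hypothetical helper that splits on the literal delimiter via Pattern.quote and strips carriage returns, mirroring the replace("\r", "") calls in the diff. It leaves out JUnit so that it runs on its own; in the test class the final comparison would be assertEquals(expected, actualList).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the PR.
class ExpectedRecordsSketch {

    // Builds the list of records a test could compare against the collected output.
    static List<String> expectedRecords(String content, String delimiter) {
        List<String> records = new ArrayList<>();
        // Pattern.quote makes split treat the delimiter literally;
        // the limit -1 keeps trailing empty strings so nothing is dropped silently.
        for (String part : content.split(Pattern.quote(delimiter), -1)) {
            records.add(part.replace("\r", ""));
        }
        // An empty remainder after the last delimiter is not a record.
        if (!records.isEmpty() && records.get(records.size() - 1).isEmpty()) {
            records.remove(records.size() - 1);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> expected = expectedRecords("first.\r\nsecond.\r\nthird.", "\n");
        System.out.println(expected);
    }
}
```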

}

@Test
public void testCarriageDelimitedOldApiWithChar() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTestForOld('\r',actualList);
assertEquals(5, actualList.size());
}

@Test
public void testNewLineDelimited() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTest("\n",actualList);
assertEquals(5, actualList.size());
}

@Test
public void testCarriageDelimited() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTest("\r",actualList);
assertEquals(5, actualList.size());
assertTrue(actualList.get(1).indexOf('\n') != -1);
}

@Test
public void testWindowsLineEndDelimited() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTest("\r\n",actualList);
assertEquals(5, actualList.size());
assertTrue(actualList.get(0).indexOf('\r') == -1);
assertTrue(actualList.get(0).indexOf('\n') == -1);
}

@Test
public void testWindowsLineEndSuffixDelimited() throws Exception {
List<String> actualList = new ArrayList<>();
prepareTest(".\r\n",actualList);
assertEquals(5, actualList.size());
assertTrue(actualList.get(0).indexOf('\r') == -1);
assertTrue(actualList.get(0).indexOf('\n') == -1);
}

@Test
public void testLongDelimited() throws Exception {
List<String> actualList = new ArrayList<>();

prepareTest("Integer aliquam metus iaculis risus hendrerit maximus. " +
"Suspendisse vestibulum nibh ac mauris cursus molestie sit amet vel turpis. " +
"Nulla et posuere orci. Aliquam dui quam, posuere vitae erat vitae, " +
"finibus commodo ipsum. Aliquam eu dui quis arcu porttitor sollicitudin. " +
"Integer sodales finibus ullamcorper. Praesent et felis tempor, laoreet libero eget, " +
"consequat nisl. Aenean molestie rutrum lorem, ac cursus nisl dapibus vitae."+
"\r\nQuisque sodales dui et sem bibendum semper. Pellentesque luctus leo nec lacus euismod pellentesque. " +
"Phasellus a metus dignissim risus auctor lacinia. Class " +
"aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos" +
". Aenean consectetur bibendum imperdiet. Etiam dignissim rutrum enim, " +
"non volutpat nisi condimentum sed. Quisque condimentum ultrices est sit amet " +
"facilisis.\r\nUt vitae volutpat odio. Sed eget vestibulum libero, eu " +
"tincidunt lorem. Nam pretium nulla nisl. Maecenas fringilla nunc ut turpis consectetur, " +
"et fringilla sem placerat. Etiam nec scelerisque nisi, at sodales ligula. Aliquam " +
"euismod faucibus egestas. Curabitur eget enim quam. Praesent convallis mattis lobortis. " +
"Pellentesque a consectetur nisl. Duis molestie diam est. Nam a malesuada augue. " +
"Vivamus enim massa, luctus ac elit ut, vestibulum laoreet nulla. " +
"Curabitur pellentesque vel mi eget tempus. Donec cursus et leo quis viverra.",actualList);

assertEquals(2, actualList.size());
assertTrue(actualList.get(0).indexOf('\r') == -1);
assertTrue(actualList.get(0).indexOf('\n') != -1);

}

}