Skip to content

Commit 8c75469

Browse files
committed
console sink add log and serversocketSource modify exception scope
1 parent 740f0b5 commit 8c75469

File tree

2 files changed

+29
-20
lines changed

2 files changed

+29
-20
lines changed

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/table/TablePrintUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.dtstack.flink.sql.sink.console.table;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
36
import java.lang.reflect.InvocationTargetException;
47
import java.lang.reflect.Method;
58
import java.util.ArrayList;
@@ -15,6 +18,7 @@
1518
* @author xuqianjin
1619
*/
1720
public class TablePrintUtil {
21+
private static final Logger LOG = LoggerFactory.getLogger(TablePrintUtil.class);
1822
public static final int ALIGN_LEFT = 1;//左对齐
1923
public static final int ALIGN_RIGHT = 2;//右对齐
2024
public static final int ALIGN_CENTER = 3;//居中对齐
@@ -222,6 +226,7 @@ public String getTableString() {
222226
* 直接打印表格
223227
*/
224228
public void print() {
229+
LOG.info("\n"+getTableString());
225230
System.out.println(getTableString());
226231
}
227232

serversocket/serversocket-source/src/main/java/com/dtstack/flink/sql/source/serversocket/CustomerSocketTextStreamFunction.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.flink.streaming.api.functions.source.SourceFunction;
2626
import org.apache.flink.types.Row;
2727
import org.apache.flink.util.IOUtils;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
import java.io.BufferedReader;
3032
import java.io.IOException;
@@ -42,11 +44,12 @@
4244
* @author maqi
4345
*/
4446
public class CustomerSocketTextStreamFunction implements SourceFunction<Row> {
47+
private static final Logger LOG = LoggerFactory.getLogger(CustomerSocketTextStreamFunction.class);
4548

4649
/**
4750
* Default delay between successive connection attempts.
4851
*/
49-
private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
52+
private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 2000;
5053

5154
/**
5255
* Default connection timeout when connecting to the server socket (infinite).
@@ -92,32 +95,33 @@ public void run(SourceContext<Row> ctx) throws Exception {
9295
long attempt = 0;
9396

9497
while (isRunning) {
95-
96-
try (Socket socket = new Socket()) {
98+
try {
99+
Socket socket = new Socket();
97100
currentSocket = socket;
98-
99101
socket.connect(new InetSocketAddress(tableInfo.getHostname(), tableInfo.getPort()), CONNECTION_TIMEOUT_TIME);
100-
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
101-
102-
char[] cbuf = new char[8192];
103-
int bytesRead;
104-
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
105-
buffer.append(cbuf, 0, bytesRead);
106-
int delimPos;
107-
String delimiter = tableInfo.getDelimiter();
108-
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
109-
String record = buffer.substring(0, delimPos);
110-
// truncate trailing carriage return
111-
if (delimiter.equals("\n") && record.endsWith("\r")) {
112-
record = record.substring(0, record.length() - 1);
113-
}
114-
ctx.collect(convertToRow(record));
115-
buffer.delete(0, delimPos + delimiter.length());
102+
103+
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
104+
char[] cbuf = new char[8192];
105+
int bytesRead;
106+
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
107+
buffer.append(cbuf, 0, bytesRead);
108+
int delimPos;
109+
String delimiter = tableInfo.getDelimiter();
110+
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
111+
String record = buffer.substring(0, delimPos);
112+
// truncate trailing carriage return
113+
if (delimiter.equals("\n") && record.endsWith("\r")) {
114+
record = record.substring(0, record.length() - 1);
116115
}
116+
ctx.collect(convertToRow(record));
117+
buffer.delete(0, delimPos + delimiter.length());
117118
}
118119
}
120+
} catch (Exception e) {
121+
LOG.info("Connection server failed, Please check configuration !!!!!!!!!!!!!!!!");
119122
}
120123

124+
121125
// if we dropped out of this loop due to an EOF, sleep and retry
122126
if (isRunning) {
123127
attempt++;

0 commit comments

Comments
 (0)