Skip to content

Commit 36c4cdf

Browse files
committed
[fix-32869][rdb]fix rdb task hangs after connect retry.
1 parent 6d097ad commit 36c4cdf

File tree

2 files changed

+104
-32
lines changed

2 files changed

+104
-32
lines changed

core/src/main/java/com/dtstack/flink/sql/util/JDBCUtils.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
package com.dtstack.flink.sql.util;
2121

22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.sql.Connection;
2226
import java.sql.DriverManager;
27+
import java.sql.ResultSet;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
30+
import java.util.Objects;
2331

2432
public class JDBCUtils {
33+
private static final Logger LOG = LoggerFactory.getLogger(JDBCUtils.class);
34+
2535
private static final Object LOCK = new Object();
2636

27-
public static void forName(String clazz, ClassLoader classLoader) {
28-
synchronized (LOCK){
37+
public static void forName(String clazz, ClassLoader classLoader) {
38+
synchronized (LOCK) {
2939
try {
3040
Class.forName(clazz, true, classLoader);
3141
DriverManager.setLoginTimeout(10);
@@ -44,4 +54,75 @@ public synchronized static void forName(String clazz) {
4454
throw new RuntimeException(e);
4555
}
4656
}
57+
58+
/**
59+
* 关闭连接资源
60+
*
61+
* @param rs ResultSet
62+
* @param stmt Statement
63+
* @param conn Connection
64+
* @param commit
65+
*/
66+
public static void closeConnectionResource(ResultSet rs, Statement stmt, Connection conn, boolean commit) {
67+
if (Objects.nonNull(rs)) {
68+
try {
69+
rs.close();
70+
} catch (SQLException e) {
71+
LOG.warn("Close resultSet error: {}", e.getMessage());
72+
}
73+
}
74+
75+
if (Objects.nonNull(stmt)) {
76+
try {
77+
stmt.close();
78+
} catch (SQLException e) {
79+
LOG.warn("Close statement error:{}", e.getMessage());
80+
}
81+
}
82+
83+
if (Objects.nonNull(conn)) {
84+
try {
85+
if (commit) {
86+
commit(conn);
87+
} else {
88+
rollBack(conn);
89+
}
90+
91+
conn.close();
92+
} catch (SQLException e) {
93+
LOG.warn("Close connection error:{}", e.getMessage());
94+
}
95+
}
96+
}
97+
98+
/**
99+
* 手动提交事物
100+
*
101+
* @param conn Connection
102+
*/
103+
public static void commit(Connection conn) {
104+
try {
105+
if (!conn.isClosed() && !conn.getAutoCommit()) {
106+
conn.commit();
107+
}
108+
} catch (SQLException e) {
109+
LOG.warn("commit error:{}", e.getMessage());
110+
}
111+
}
112+
113+
/**
114+
* 手动回滚事物
115+
*
116+
* @param conn Connection
117+
*/
118+
public static void rollBack(Connection conn) {
119+
try {
120+
if (!conn.isClosed() && !conn.getAutoCommit()) {
121+
conn.rollback();
122+
}
123+
} catch (SQLException e) {
124+
LOG.warn("rollBack error:{}", e.getMessage());
125+
}
126+
}
127+
47128
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,27 @@
2222
import com.dtstack.flink.sql.side.BaseSideInfo;
2323
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
25+
import com.dtstack.flink.sql.util.JDBCUtils;
2526
import com.dtstack.flink.sql.util.RowDataComplete;
2627
import com.dtstack.flink.sql.util.RowDataConvert;
2728
import com.google.common.collect.Lists;
2829
import com.google.common.collect.Maps;
2930
import org.apache.calcite.sql.JoinType;
3031
import org.apache.commons.collections.CollectionUtils;
3132
import org.apache.commons.lang3.StringUtils;
32-
import org.apache.flink.api.common.typeinfo.TypeInformation;
3333
import org.apache.flink.configuration.Configuration;
3434
import org.apache.flink.table.dataformat.BaseRow;
35-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3635
import org.apache.flink.types.Row;
3736
import org.apache.flink.util.Collector;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
4039

41-
import java.sql.*;
42-
import java.time.LocalDateTime;
40+
import java.sql.Connection;
41+
import java.sql.ResultSet;
42+
import java.sql.SQLException;
43+
import java.sql.Statement;
4344
import java.util.ArrayList;
4445
import java.util.Calendar;
45-
import java.util.HashMap;
4646
import java.util.List;
4747
import java.util.Map;
4848
import java.util.Objects;
@@ -128,36 +128,26 @@ public void flatMap(Row value, Collector<BaseRow> out) throws Exception {
128128
}
129129

130130
private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQLException {
131-
RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
132-
Connection connection = null;
131+
queryAndFillData(tmpCache, getConnectionWithRetry((RdbSideTableInfo) sideInfo.getSideTableInfo()));
132+
}
133133

134-
try {
135-
for (int i = 0; i < CONN_RETRY_NUM; i++) {
134+
private Connection getConnectionWithRetry(RdbSideTableInfo tableInfo) throws SQLException {
135+
String connInfo = "url:" + tableInfo.getUrl() + "; userName:" + tableInfo.getUserName();
136+
String errorMsg = null;
137+
for (int i = 0; i < CONN_RETRY_NUM; i++) {
138+
try {
139+
return getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
140+
} catch (Exception e) {
136141
try {
137-
connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
138-
break;
139-
} catch (Exception e) {
140-
if (i == CONN_RETRY_NUM - 1) {
141-
throw new RuntimeException("", e);
142-
}
143-
try {
144-
String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
145-
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
146-
Thread.sleep(5 * 1000);
147-
} catch (InterruptedException e1) {
148-
LOG.error("", e1);
149-
}
142+
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
143+
errorMsg = e.getCause().toString();
144+
Thread.sleep(5 * 1000);
145+
} catch (InterruptedException e1) {
146+
LOG.error("", e1);
150147
}
151148
}
152-
queryAndFillData(tmpCache, connection);
153-
} catch (Exception e) {
154-
LOG.error("", e);
155-
throw new SQLException(e);
156-
} finally {
157-
if (connection != null) {
158-
connection.close();
159-
}
160149
}
150+
throw new SQLException("get conn fail. connInfo: " + connInfo + "\ncause by: " + errorMsg);
161151
}
162152

163153
private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, Connection connection) throws SQLException {
@@ -191,6 +181,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
191181
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
192182
.add(oneRow);
193183
}
184+
JDBCUtils.closeConnectionResource(resultSet, statement, connection, false);
194185
}
195186

196187
public int getFetchSize() {

0 commit comments

Comments
 (0)