Skip to content

Commit 402670c

Browse files
committed
[hotfix-35512][rdb] adjust code.
1 parent f08eeef commit 402670c

File tree

5 files changed

+33
-26
lines changed

5 files changed

+33
-26
lines changed

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/JdbcResourceCheck.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.core.rdb;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.resource.ResourceCheck;
2323
import org.apache.commons.lang.StringUtils;
2424
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -109,7 +109,7 @@ public void checkPrivilege(
109109
, String schema
110110
, List<String> privilegeList) {
111111
Connection connection =
112-
JdbcConnectUtil.getConnectWithRetry(driverName, url, userName, password);
112+
JdbcConnectionUtil.getConnectionWithRetry(driverName, url, userName, password);
113113
Statement statement = null;
114114
String tableInfo = Objects.isNull(schema) ? tableName : schema + "." + tableName;
115115
String privilege = null;
@@ -133,7 +133,7 @@ public void checkPrivilege(
133133

134134
throw new SuppressRestartsException(new IllegalArgumentException(sqlException.getMessage()));
135135
} finally {
136-
JdbcConnectUtil.closeConnectionResource(null, statement, connection, false);
136+
JdbcConnectionUtil.closeConnectionResource(null, statement, connection, false);
137137
}
138138
}
139139
}

rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectUtil.java renamed to rdb/rdb-core/src/main/java/com/dtstack/flink/sql/core/rdb/util/JdbcConnectionUtil.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@
3838
* Date 2020-12-25
3939
* Company dtstack
4040
*/
41-
public class JdbcConnectUtil {
41+
public class JdbcConnectionUtil {
4242
private static final int DEFAULT_RETRY_NUM = 3;
4343
private static final long DEFAULT_RETRY_TIME_WAIT = 3L;
4444
private static final int DEFAULT_VALID_TIME = 10;
45-
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectUtil.class);
45+
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionUtil.class);
4646

4747
/**
4848
* 关闭连接资源
@@ -119,25 +119,26 @@ public static void rollBack(Connection conn) {
119119
}
120120

121121
/**
122-
* get connect from datasource and retry when failed.
122+
* get connection from datasource and retry when failed.
123123
*
124124
* @param driverName driver name for rdb datasource
125125
* @param url connect url
126126
* @param userName connect user name
127127
* @param password password for user name
128128
* @return a valid connection
129129
*/
130-
public static Connection getConnectWithRetry(
131-
String driverName
132-
, String url
133-
, String userName
134-
, String password) {
135-
String errorMessage = "\nGet connect failed with properties: \nurl: " + url
136-
+ (Objects.isNull(userName) ? "" : "\nuserName: " + userName
137-
+ "\nerror message: ");
138-
String errorCause = null;
130+
public static Connection getConnectionWithRetry(String driverName,
131+
String url,
132+
String userName,
133+
String password) {
134+
String message = "Get connection failed. " +
135+
"\nurl: [%s]" +
136+
"\nuserName: [%s]" +
137+
"\ncause: [%s]";
138+
String errorCause;
139+
String errorMessage = "";
139140

140-
ClassLoaderManager.forName(driverName, JdbcConnectUtil.class.getClassLoader());
141+
ClassLoaderManager.forName(driverName, JdbcConnectionUtil.class.getClassLoader());
141142
Preconditions.checkNotNull(url, "url can't be null!");
142143

143144
for (int i = 0; i < DEFAULT_RETRY_NUM; i++) {
@@ -150,11 +151,17 @@ public static Connection getConnectWithRetry(
150151
return connection;
151152
} catch (Exception e) {
152153
errorCause = ExceptionTrace.traceOriginalCause(e);
153-
LOG.warn(errorMessage + errorCause);
154+
errorMessage = String.format(
155+
message,
156+
url,
157+
userName,
158+
errorCause
159+
);
160+
LOG.warn(errorMessage);
154161
LOG.warn("Connect will retry after [{}] s. Retry time [{}] ...", DEFAULT_RETRY_TIME_WAIT, i + 1);
155162
ThreadUtil.sleepSeconds(DEFAULT_RETRY_TIME_WAIT);
156163
}
157164
}
158-
throw new SuppressRestartsException(new SQLException(errorMessage + errorCause));
165+
throw new SuppressRestartsException(new SQLException(errorMessage));
159166
}
160167
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.dtstack.flink.sql.side.rdb.all;
2020

2121
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
22-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
22+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2323
import com.dtstack.flink.sql.side.BaseAllReqRow;
2424
import com.dtstack.flink.sql.side.BaseSideInfo;
2525
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
@@ -208,7 +208,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
208208
tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList())
209209
.add(oneRow);
210210
}
211-
JdbcConnectUtil.closeConnectionResource(resultSet, statement, connection, false);
211+
JdbcConnectionUtil.closeConnectionResource(resultSet, statement, connection, false);
212212
}
213213

214214
public int getFetchSize() {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/JDBCUpsertOutputFormat.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.core.rdb.JdbcResourceCheck;
23-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
23+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2424
import com.dtstack.flink.sql.enums.EUpdateMode;
2525
import com.dtstack.flink.sql.exception.ExceptionTrace;
2626
import com.dtstack.flink.sql.factory.DTThreadFactory;
@@ -135,7 +135,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
135135

136136
public void openJdbc() throws IOException {
137137
try {
138-
connection = JdbcConnectUtil.getConnectWithRetry(
138+
connection = JdbcConnectionUtil.getConnectionWithRetry(
139139
driverName,
140140
dbURL,
141141
username,
@@ -194,7 +194,7 @@ private void checkConnectionOpen() {
194194
try {
195195
if (!connection.isValid(10)) {
196196
LOG.info("db connection reconnect..");
197-
connection = JdbcConnectUtil.getConnectWithRetry(
197+
connection = JdbcConnectionUtil.getConnectionWithRetry(
198198
driverName,
199199
dbURL,
200200
username,

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/JDBCWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.rdb.writer;
2020

21-
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectUtil;
21+
import com.dtstack.flink.sql.core.rdb.util.JdbcConnectionUtil;
2222
import com.dtstack.flink.sql.exception.ExceptionTrace;
2323
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2424
import org.apache.flink.api.java.tuple.Tuple2;
@@ -82,8 +82,8 @@ default void dealExecuteError(Connection connection,
8282
Row row,
8383
long errorLimit,
8484
Logger LOG) {
85-
JdbcConnectUtil.rollBack(connection);
86-
JdbcConnectUtil.commit(connection);
85+
JdbcConnectionUtil.rollBack(connection);
86+
JdbcConnectionUtil.commit(connection);
8787

8888
if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 ||
8989
LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)