Skip to content

Commit 8e32d1d

Browse files
committed
[feat-1049][sqlserver] 修复sqlserver插件bug
1 parent cc32339 commit 8e32d1d

File tree

9 files changed

+87
-39
lines changed

9 files changed

+87
-39
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
104104

105105
private final AtomicBoolean connectionStatus = new AtomicBoolean(true);
106106

107-
private static volatile boolean resourceCheck = true;
107+
private static volatile boolean resourceCheck = false;
108108

109109
private transient ThreadPoolExecutor executor;
110110

@@ -235,11 +235,7 @@ final protected void doAsyncQueryData(
235235
LOG.error(String.format("retry ... current time [%s]", failCounter.get()));
236236
if (failCounter.get() >= retryMaxNum) {
237237
resultFuture.completeExceptionally(
238-
new SuppressRestartsException(
239-
new Throwable(
240-
ExceptionTrace.traceOriginalCause(conn.cause())
241-
)
242-
)
238+
new SuppressRestartsException(conn.cause())
243239
);
244240
finishFlag.set(true);
245241
}

sqlserver/pom.xml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<properties>
1818
<jtds.version>1.3.1</jtds.version>
1919
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
20+
<mssql.jdbc.version>9.2.1.jre8</mssql.jdbc.version>
2021
</properties>
2122

2223
<modules>
@@ -32,10 +33,11 @@
3233
<scope>provided</scope>
3334
</dependency>
3435

36+
<!-- https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc -->
3537
<dependency>
36-
<groupId>net.sourceforge.jtds</groupId>
37-
<artifactId>jtds</artifactId>
38-
<version>${jtds.version}</version>
38+
<groupId>com.microsoft.sqlserver</groupId>
39+
<artifactId>mssql-jdbc</artifactId>
40+
<version>${mssql.jdbc.version}</version>
3941
</dependency>
4042

4143
</dependencies>

sqlserver/sqlserver-side/sqlserver-all-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAllReqRow.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -44,7 +44,7 @@ public class SqlserverAllReqRow extends AbstractRdbAllReqRow {
4444

4545
private static final Logger LOG = LoggerFactory.getLogger(SqlserverAllReqRow.class);
4646

47-
private static final String SQLSERVER_DRIVER = "net.sourceforge.jtds.jdbc.Driver";
47+
private static final String SQLSERVER_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
4848

4949
public SqlserverAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
5050
super(new SqlserverAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -63,5 +63,4 @@ public Connection getConn(String dbUrl, String userName, String password) {
6363
throw new RuntimeException("", e);
6464
}
6565
}
66-
6766
}

sqlserver/sqlserver-side/sqlserver-all-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAllSideInfo.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -18,11 +18,14 @@
1818
package com.dtstack.flink.sql.side.sqlserver;
1919

2020

21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2122
import com.dtstack.flink.sql.side.FieldInfo;
2223
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.all.RdbAllSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2527
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
2629
import java.util.List;
2730
/**
2831
* Date: 2019/11/26
@@ -34,4 +37,14 @@ public class SqlserverAllSideInfo extends RdbAllSideInfo {
3437
public SqlserverAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3538
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3639
}
40+
41+
@Override
42+
public String quoteIdentifier(String identifier) {
43+
return "\"" + identifier + "\"";
44+
}
45+
46+
@Override
47+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
48+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
49+
}
3750
}

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class SqlserverAsyncReqRow extends RdbAsyncReqRow {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(SqlserverAsyncReqRow.class);
4343

44-
private final static String SQLSERVER_DRIVER = "net.sourceforge.jtds.jdbc.Driver";
44+
private final static String SQLSERVER_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
4545

4646
public SqlserverAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4747
super(new SqlserverAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));

sqlserver/sqlserver-side/sqlserver-async-side/src/main/java/com/dtstack/flink/sql/side/sqlserver/SqlserverAsyncSideInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
25+
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
26+
import com.dtstack.flink.sql.util.DtStringUtil;
2527
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2628

2729
import java.util.List;
@@ -37,4 +39,14 @@ public class SqlserverAsyncSideInfo extends RdbAsyncSideInfo {
3739
public SqlserverAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
3840
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
3941
}
42+
43+
@Override
44+
public String getTableName(RdbSideTableInfo rdbSideTableInfo) {
45+
return DtStringUtil.getTableFullPath(rdbSideTableInfo.getSchema(), rdbSideTableInfo.getTableName());
46+
}
47+
48+
@Override
49+
public String quoteIdentifier(String identifier) {
50+
return "\"" + identifier + "\"";
51+
}
4052
}

sqlserver/sqlserver-side/sqlserver-side-core/src/main/java/com/dtstack/flink/sql/side/sqlserver/table/SqlserverSideParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class SqlserverSideParser extends RdbSideParser {
3232

3333
@Override
3434
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
35-
props.put(JdbcCheckKeys.DRIVER_NAME, "net.sourceforge.jtds.jdbc.Driver");
35+
props.put(JdbcCheckKeys.DRIVER_NAME, "com.microsoft.sqlserver.jdbc.SQLServerDriver");
3636
AbstractTableInfo sqlServerTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
3737
sqlServerTableInfo.setType(CURR_TYPE);
3838
return sqlServerTableInfo;

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,72 +30,98 @@
3030
/**
3131
* Date: 2020/1/15
3232
* Company: www.dtstack.com
33+
*
3334
* @author maqi
3435
*/
35-
public class SqlserverDialect implements JDBCDialect {
36+
public class SqlserverDialect implements JDBCDialect {
3637

3738
@Override
3839
public boolean canHandle(String url) {
39-
return url.startsWith("jdbc:jtds:");
40+
return url.startsWith("jdbc:sqlserver:");
4041
}
4142

4243
@Override
4344
public Optional<String> defaultDriverName() {
44-
return Optional.of("net.sourceforge.jtds.jdbc.Driver");
45+
return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
4546
}
4647

4748
@Override
4849
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
4950
tableName = DtStringUtil.getTableFullPath(schema, tableName);
5051
StringBuilder sb = new StringBuilder();
51-
sb.append("MERGE INTO " + tableName + " T1 USING "
52-
+ "(" + buildDualQueryStatement(fieldNames) + ") T2 ON ("
53-
+ buildConnectionConditions(uniqueKeyFields) + ") ");
52+
sb.append("MERGE INTO ")
53+
.append(tableName)
54+
.append(" T1 USING (")
55+
.append(buildDualQueryStatement(fieldNames))
56+
.append(") T2 ON (")
57+
.append(buildConnectionConditions(uniqueKeyFields))
58+
.append(") ");
5459

5560
String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);
5661

5762
if (StringUtils.isNotEmpty(updateSql)) {
58-
sb.append(" WHEN MATCHED THEN UPDATE SET ");
59-
sb.append(updateSql);
63+
sb.append(" WHEN MATCHED THEN UPDATE SET ")
64+
.append(updateSql);
6065
}
6166

62-
sb.append(" WHEN NOT MATCHED THEN "
63-
+ "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES ("
64-
+ Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")");
65-
sb.append(";");
67+
sb.append(" WHEN NOT MATCHED THEN " + "INSERT (")
68+
.append(
69+
Arrays
70+
.stream(fieldNames)
71+
.map(this::quoteIdentifier)
72+
.collect(Collectors.joining(",")))
73+
.append(") VALUES (")
74+
.append(
75+
Arrays
76+
.stream(fieldNames)
77+
.map(col -> "T2." + quoteIdentifier(col))
78+
.collect(Collectors.joining(",")))
79+
.append(");");
6680
return Optional.of(sb.toString());
6781
}
6882

6983
/**
70-
* build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
84+
* build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
85+
*
7186
* @param fieldNames
7287
* @param uniqueKeyFields
7388
* @param allReplace
7489
* @return
7590
*/
7691
private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
7792
List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
78-
return Arrays.stream(fieldNames).filter(col -> !uniqueKeyList.contains(col)).map(col -> {
79-
return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
80-
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =ISNULL(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + ","
81-
+ quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")";
82-
}).collect(Collectors.joining(","));
93+
return
94+
Arrays
95+
.stream(fieldNames)
96+
.filter(col -> !uniqueKeyList.contains(col))
97+
.map(col -> allReplace ?
98+
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
99+
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =ISNULL(" + quoteIdentifier("T2") + "." + quoteIdentifier(col)
100+
+ "," + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")").collect(Collectors.joining(","));
83101
}
84102

85103

86104
private String buildConnectionConditions(String[] uniqueKeyFields) {
87-
return Arrays.stream(uniqueKeyFields).map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col)).collect(Collectors.joining(","));
105+
return
106+
Arrays
107+
.stream(uniqueKeyFields)
108+
.map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col))
109+
.collect(Collectors.joining(","));
88110
}
89111

90112
/**
91113
* build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
92114
*
93-
* @param column destination column
115+
* @param column destination column
94116
* @return
95117
*/
96118
public String buildDualQueryStatement(String[] column) {
97119
StringBuilder sb = new StringBuilder("SELECT ");
98-
String collect = Arrays.stream(column).map(col -> " ? " + quoteIdentifier(col)).collect(Collectors.joining(", "));
120+
String collect =
121+
Arrays
122+
.stream(column)
123+
.map(col -> " ? " + quoteIdentifier(col))
124+
.collect(Collectors.joining(", "));
99125
sb.append(collect);
100126
return sb.toString();
101127
}

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/table/SqlserverSinkParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -35,7 +35,7 @@ public class SqlserverSinkParser extends RdbSinkParser {
3535

3636
@Override
3737
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
38-
props.put(JdbcCheckKeys.DRIVER_NAME, "net.sourceforge.jtds.jdbc.Driver");
38+
props.put(JdbcCheckKeys.DRIVER_NAME, "com.microsoft.sqlserver.jdbc.SQLServerDriver");
3939
AbstractTableInfo sqlserverTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
4040
sqlserverTableInfo.setType(CURR_TYPE);
4141
return sqlserverTableInfo;

0 commit comments

Comments
 (0)