Skip to content

Commit b6ebbd4

Browse files
happyboy1024dengjunjie
andauthored
[Bugfix][TDengine] Fix the degree of multiple parallelism affects driver loading (#6020)
--------- Co-authored-by: dengjunjie <296442618@qq.com>
1 parent d69d88d commit b6ebbd4

File tree

4 files changed

+105
-4
lines changed

4 files changed

+105
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
22+
public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode {
23+
LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class");
24+
25+
private final String code;
26+
private final String description;
27+
28+
TDengineConnectorErrorCode(String code, String description) {
29+
this.code = code;
30+
this.description = description;
31+
}
32+
33+
@Override
34+
public String getCode() {
35+
return code;
36+
}
37+
38+
@Override
39+
public String getDescription() {
40+
return description;
41+
}
42+
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.Arrays;
4747
import java.util.Objects;
4848

49+
import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
50+
4951
@Slf4j
5052
public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
5153

@@ -66,6 +68,8 @@ public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType
6668
config.getUsername(),
6769
"&password=",
6870
config.getPassword());
71+
// check td driver whether exist and if not, try to register
72+
checkDriverExist(jdbcUrl);
6973
conn = DriverManager.getConnection(jdbcUrl);
7074
try (Statement statement = conn.createStatement()) {
7175
final ResultSet metaResultSet =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.tdengine.utils;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
22+
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
import java.sql.Driver;
26+
import java.sql.DriverManager;
27+
import java.sql.SQLException;
28+
29+
@Slf4j
30+
public class TDengineUtil {
31+
32+
public static synchronized void checkDriverExist(String jdbcUrl) {
33+
try {
34+
DriverManager.getDriver(jdbcUrl);
35+
} catch (SQLException e) {
36+
log.warn("no available driver found for this {}, waiting for it to load", jdbcUrl);
37+
}
38+
39+
String driverName;
40+
if (jdbcUrl.startsWith("jdbc:TAOS-RS://")) {
41+
driverName = "com.taosdata.jdbc.rs.RestfulDriver";
42+
} else {
43+
driverName = "com.taosdata.jdbc.TSDBDriver";
44+
}
45+
46+
try {
47+
Class<?> clazz =
48+
Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
49+
Driver driver = (Driver) clazz.getDeclaredConstructor().newInstance();
50+
DriverManager.registerDriver(driver);
51+
} catch (Exception ex) {
52+
throw new TDengineConnectorException(
53+
TDengineConnectorErrorCode.LOAD_DRIVER_FAILED,
54+
"Fail to create driver of class " + driverName,
55+
ex);
56+
}
57+
}
58+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@
1919
######
2020

2121
env {
22-
# You can set flink configuration here
23-
execution.parallelism = 2
22+
parallelism = 2
2423
job.mode = "BATCH"
25-
#execution.checkpoint.interval = 10000
26-
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
2724
}
2825

2926
source {

0 commit comments

Comments
 (0)