Skip to content

Commit 78f7989

Browse files
authored
[Hotfix][Connector] Fixed TDengine connector using jdbc driver to cause loading error (#4598)
1 parent ef44c0d commit 78f7989

File tree

5 files changed

+125
-120
lines changed

5 files changed

+125
-120
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Data;
24+
25+
import java.io.Serializable;
26+
import java.util.List;
27+
28+
@Data
29+
@AllArgsConstructor
30+
public class StableMetadata implements Serializable {
31+
private final SeaTunnelRowType rowType;
32+
private final String timestampFieldName;
33+
private final List<String> subTableNames;
34+
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@
3838
import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
3939

4040
import org.apache.commons.lang3.ArrayUtils;
41-
import org.apache.commons.lang3.StringUtils;
4241

4342
import com.google.auto.service.AutoService;
44-
import com.google.common.collect.Lists;
4543
import lombok.SneakyThrows;
4644

4745
import java.sql.Connection;
4846
import java.sql.DriverManager;
4947
import java.sql.ResultSet;
48+
import java.sql.SQLException;
5049
import java.sql.Statement;
50+
import java.util.ArrayList;
5151
import java.util.List;
5252

5353
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
@@ -67,7 +67,7 @@
6767
public class TDengineSource
6868
implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
6969

70-
private SeaTunnelRowType seaTunnelRowType;
70+
private StableMetadata stableMetadata;
7171
private TDengineSourceConfig tdengineSourceConfig;
7272

7373
@Override
@@ -87,45 +87,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
8787
"TDengine connection require url/database/stable/username/password. All of these must not be empty.");
8888
}
8989
tdengineSourceConfig = buildSourceConfig(pluginConfig);
90-
91-
// add subtable_name and tags to `seaTunnelRowType`
92-
SeaTunnelRowType originRowType = getSTableMetaInfo(tdengineSourceConfig);
93-
seaTunnelRowType = addHiddenAttribute(originRowType);
94-
}
95-
96-
@SneakyThrows
97-
private SeaTunnelRowType getSTableMetaInfo(TDengineSourceConfig config) {
98-
String jdbcUrl =
99-
StringUtils.join(
100-
config.getUrl(),
101-
config.getDatabase(),
102-
"?user=",
103-
config.getUsername(),
104-
"&password=",
105-
config.getPassword());
106-
Connection conn = DriverManager.getConnection(jdbcUrl);
107-
List<String> fieldNames = Lists.newArrayList();
108-
List<SeaTunnelDataType<?>> fieldTypes = Lists.newArrayList();
109-
try (Statement statement = conn.createStatement()) {
110-
final ResultSet metaResultSet =
111-
statement.executeQuery(
112-
"desc " + config.getDatabase() + "." + config.getStable());
113-
while (metaResultSet.next()) {
114-
fieldNames.add(metaResultSet.getString(1));
115-
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
116-
}
117-
}
118-
return new SeaTunnelRowType(
119-
fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
120-
}
121-
122-
private SeaTunnelRowType addHiddenAttribute(SeaTunnelRowType originRowType) {
123-
// 0-subtable_name / 1-n field_names /
124-
String[] fieldNames = ArrayUtils.add(originRowType.getFieldNames(), 0, "subtable_name");
125-
// n+1-> tags
126-
SeaTunnelDataType<?>[] fieldTypes =
127-
ArrayUtils.add(originRowType.getFieldTypes(), 0, BasicType.STRING_TYPE);
128-
return new SeaTunnelRowType(fieldNames, fieldTypes);
90+
stableMetadata = getStableMetadata(tdengineSourceConfig);
12991
}
13092

13193
@Override
@@ -135,7 +97,7 @@ public Boundedness getBoundedness() {
13597

13698
@Override
13799
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
138-
return seaTunnelRowType;
100+
return stableMetadata.getRowType();
139101
}
140102

141103
@Override
@@ -147,14 +109,74 @@ public SourceReader<SeaTunnelRow, TDengineSourceSplit> createReader(Context read
147109
public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> createEnumerator(
148110
SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext) {
149111
return new TDengineSourceSplitEnumerator(
150-
seaTunnelRowType, tdengineSourceConfig, enumeratorContext);
112+
stableMetadata, tdengineSourceConfig, enumeratorContext);
151113
}
152114

153115
@Override
154116
public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> restoreEnumerator(
155117
SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext,
156118
TDengineSourceState checkpointState) {
157119
return new TDengineSourceSplitEnumerator(
158-
seaTunnelRowType, tdengineSourceConfig, checkpointState, enumeratorContext);
120+
stableMetadata, tdengineSourceConfig, checkpointState, enumeratorContext);
121+
}
122+
123+
private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQLException {
124+
String timestampFieldName = null;
125+
List<String> subTableNames = new ArrayList<>();
126+
List<String> fieldNames = new ArrayList<>();
127+
List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();
128+
129+
String jdbcUrl =
130+
String.join(
131+
"",
132+
config.getUrl(),
133+
config.getDatabase(),
134+
"?user=",
135+
config.getUsername(),
136+
"&password=",
137+
config.getPassword());
138+
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
139+
try (Statement statement = conn.createStatement()) {
140+
ResultSet metaResultSet =
141+
statement.executeQuery(
142+
"desc " + config.getDatabase() + "." + config.getStable());
143+
while (metaResultSet.next()) {
144+
if (timestampFieldName == null) {
145+
timestampFieldName = metaResultSet.getString(1);
146+
}
147+
fieldNames.add(metaResultSet.getString(1));
148+
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
149+
}
150+
}
151+
try (Statement statement = conn.createStatement()) {
152+
String metaSQL =
153+
"select table_name from information_schema.ins_tables where db_name = '"
154+
+ config.getDatabase()
155+
+ "' and stable_name='"
156+
+ config.getStable()
157+
+ "';";
158+
ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
159+
while (subTableNameResultSet.next()) {
160+
String subTableName = subTableNameResultSet.getString(1);
161+
subTableNames.add(subTableName);
162+
}
163+
}
164+
}
165+
166+
SeaTunnelRowType rowType = addHiddenAttribute(fieldNames, fieldTypes);
167+
return new StableMetadata(rowType, timestampFieldName, subTableNames);
168+
}
169+
170+
private SeaTunnelRowType addHiddenAttribute(
171+
List<String> fieldNames, List<SeaTunnelDataType<?>> fieldTypes) {
172+
// add subtable_name and tags to `seaTunnelRowType`
173+
// 0-subtable_name / 1-n field_names /
174+
String[] newFieldNames =
175+
ArrayUtils.add(fieldNames.toArray(new String[0]), 0, "subtable_name");
176+
// n+1-> tags
177+
SeaTunnelDataType<?>[] newFieldTypes =
178+
ArrayUtils.add(
179+
fieldTypes.toArray(new SeaTunnelDataType[0]), 0, BasicType.STRING_TYPE);
180+
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
159181
}
160182
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java

Lines changed: 18 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,9 @@
1919

2020
import org.apache.seatunnel.api.source.SourceEvent;
2121
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
22-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23-
import org.apache.seatunnel.common.exception.CommonErrorCode;
2422
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
25-
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
2623
import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
2724

28-
import org.apache.commons.lang3.StringUtils;
29-
30-
import com.google.common.collect.Sets;
31-
import lombok.SneakyThrows;
32-
33-
import java.sql.Connection;
34-
import java.sql.DriverManager;
35-
import java.sql.ResultSet;
36-
import java.sql.SQLException;
37-
import java.sql.Statement;
3825
import java.util.Arrays;
3926
import java.util.Collection;
4027
import java.util.Collections;
@@ -49,26 +36,25 @@ public class TDengineSourceSplitEnumerator
4936

5037
private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
5138
private final TDengineSourceConfig config;
39+
private final StableMetadata stableMetadata;
5240
private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
5341
private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
54-
private Connection conn;
55-
private SeaTunnelRowType seaTunnelRowType;
5642

5743
public TDengineSourceSplitEnumerator(
58-
SeaTunnelRowType seaTunnelRowType,
44+
StableMetadata stableMetadata,
5945
TDengineSourceConfig config,
6046
SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
61-
this(seaTunnelRowType, config, null, context);
47+
this(stableMetadata, config, null, context);
6248
}
6349

6450
public TDengineSourceSplitEnumerator(
65-
SeaTunnelRowType seaTunnelRowType,
51+
StableMetadata stableMetadata,
6652
TDengineSourceConfig config,
6753
TDengineSourceState sourceState,
6854
SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
6955
this.config = config;
7056
this.context = context;
71-
this.seaTunnelRowType = seaTunnelRowType;
57+
this.stableMetadata = stableMetadata;
7258
if (sourceState != null) {
7359
this.assignedSplit = sourceState.getAssignedSplit();
7460
}
@@ -78,64 +64,33 @@ private static int getSplitOwner(String tp, int numReaders) {
7864
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
7965
}
8066

81-
@SneakyThrows
8267
@Override
83-
public void open() {
84-
String jdbcUrl =
85-
StringUtils.join(
86-
config.getUrl(),
87-
config.getDatabase(),
88-
"?user=",
89-
config.getUsername(),
90-
"&password=",
91-
config.getPassword());
92-
conn = DriverManager.getConnection(jdbcUrl);
93-
}
68+
public void open() {}
9469

9570
@Override
96-
public void run() throws SQLException {
71+
public void run() {
9772
pendingSplit = getAllSplits();
9873
assignSplit(context.registeredReaders());
9974
}
10075

10176
/*
102-
* 1. get timestampField
103-
* 2. get all sub tables of configured super table
104-
* 3. each split has one sub table
77+
* each split has one sub table
10578
*/
106-
private Set<TDengineSourceSplit> getAllSplits() throws SQLException {
107-
final String timestampFieldName;
108-
try (Statement statement = conn.createStatement()) {
109-
final ResultSet fieldNameResultSet =
110-
statement.executeQuery(
111-
"desc " + config.getDatabase() + "." + config.getStable());
112-
fieldNameResultSet.next();
113-
timestampFieldName = fieldNameResultSet.getString(1);
114-
}
115-
116-
final Set<TDengineSourceSplit> splits = Sets.newHashSet();
117-
try (Statement statement = conn.createStatement()) {
118-
String metaSQL =
119-
"select table_name from information_schema.ins_tables where db_name = '"
120-
+ config.getDatabase()
121-
+ "' and stable_name='"
122-
+ config.getStable()
123-
+ "';";
124-
ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
125-
while (subTableNameResultSet.next()) {
126-
final String subTableName = subTableNameResultSet.getString(1);
127-
final TDengineSourceSplit splitBySubTable =
128-
createSplitBySubTable(subTableName, timestampFieldName);
129-
splits.add(splitBySubTable);
130-
}
79+
private Set<TDengineSourceSplit> getAllSplits() {
80+
final String timestampFieldName = stableMetadata.getTimestampFieldName();
81+
final Set<TDengineSourceSplit> splits = new HashSet<>();
82+
for (String subTableName : stableMetadata.getSubTableNames()) {
83+
TDengineSourceSplit splitBySubTable =
84+
createSplitBySubTable(subTableName, timestampFieldName);
85+
splits.add(splitBySubTable);
13186
}
13287
return splits;
13388
}
13489

13590
private TDengineSourceSplit createSplitBySubTable(
13691
String subTableName, String timestampFieldName) {
13792
String selectFields =
138-
Arrays.stream(seaTunnelRowType.getFieldNames())
93+
Arrays.stream(stableMetadata.getRowType().getFieldNames())
13994
.skip(1)
14095
.collect(Collectors.joining(","));
14196
String subTableSQL =
@@ -152,7 +107,7 @@ private TDengineSourceSplit createSplitBySubTable(
152107
if (end != null) {
153108
endCondition = timestampFieldName + " < '" + end + "'";
154109
}
155-
String query = StringUtils.join(new String[] {startCondition, endCondition}, " and ");
110+
String query = String.join(" and ", startCondition, endCondition);
156111
subTableSQL = subTableSQL + " where " + query;
157112
}
158113

@@ -220,18 +175,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
220175
}
221176

222177
@Override
223-
public void close() {
224-
try {
225-
if (!Objects.isNull(conn)) {
226-
conn.close();
227-
}
228-
} catch (SQLException e) {
229-
throw new TDengineConnectorException(
230-
CommonErrorCode.READER_OPERATION_FAILED,
231-
"TDengine split_enumerator connection close failed",
232-
e);
233-
}
234-
}
178+
public void close() {}
235179

236180
@Override
237181
public void handleSplitRequest(int subtaskId) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.e2e.common.TestResource;
2121
import org.apache.seatunnel.e2e.common.TestSuiteBase;
2222
import org.apache.seatunnel.e2e.common.container.TestContainer;
23+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2324

2425
import org.junit.jupiter.api.AfterAll;
2526
import org.junit.jupiter.api.Assertions;
@@ -48,6 +49,10 @@
4849
import static org.awaitility.Awaitility.given;
4950

5051
@Slf4j
52+
@DisabledOnContainer(
53+
value = {},
54+
type = {},
55+
disabledReason = "Override TestSuiteBase @DisabledOnContainer")
5156
public class TDengineIT extends TestSuiteBase implements TestResource {
5257
private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
5358
private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
env {
2222
# You can set flink configuration here
2323
execution.parallelism = 2
24-
job.mode = "STREAMING"
24+
job.mode = "BATCH"
2525
#execution.checkpoint.interval = 10000
2626
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
2727
}

0 commit comments

Comments
 (0)