Skip to content

Commit ed94fff

Browse files
authored
[Improve] Remove all useless prepare, getProducedType method (#5741)
1 parent eff17cc commit ed94fff

File tree

4 files changed

+109
-83
lines changed
  • seatunnel-connectors-v2
    • connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink
    • connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source
    • connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source
  • seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector

4 files changed

+109
-83
lines changed

seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,16 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazonsqs.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2420
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
25-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2621
import org.apache.seatunnel.api.sink.SinkWriter;
2722
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2823
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2924
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
30-
import org.apache.seatunnel.common.config.CheckConfigUtil;
31-
import org.apache.seatunnel.common.config.CheckResult;
32-
import org.apache.seatunnel.common.constants.PluginType;
33-
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.exception.AmazonSqsConnectorException;
3425
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
3526
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
3627

37-
import com.google.auto.service.AutoService;
38-
import lombok.NoArgsConstructor;
39-
4028
import java.io.IOException;
4129

42-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.REGION;
43-
import static org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig.URL;
44-
45-
@AutoService(SeaTunnelSink.class)
46-
@NoArgsConstructor
4730
public class AmazonSqsSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
4831
private SeaTunnelRowType typeInfo;
4932
private ReadonlyConfig pluginConfig;
@@ -58,24 +41,6 @@ public AmazonSqsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType typeInfo) {
5841
this.pluginConfig = pluginConfig;
5942
}
6043

61-
@Override
62-
public void prepare(Config pluginConfig) throws PrepareFailException {
63-
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL.key(), REGION.key());
64-
if (!result.isSuccess()) {
65-
throw new AmazonSqsConnectorException(
66-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
67-
String.format(
68-
"PluginName: %s, PluginType: %s, Message: %s",
69-
getPluginName(), PluginType.SOURCE, result.getMsg()));
70-
}
71-
this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
72-
}
73-
74-
@Override
75-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
76-
this.typeInfo = seaTunnelRowType;
77-
}
78-
7944
@Override
8045
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
8146
return typeInfo;

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
24-
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
2520
import org.apache.seatunnel.api.serialization.Serializer;
2621
import org.apache.seatunnel.api.source.Boundedness;
2722
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -31,37 +26,30 @@
3126
import org.apache.seatunnel.api.source.SupportParallelism;
3227
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3328
import org.apache.seatunnel.api.table.catalog.TablePath;
34-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3529
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3630
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3731
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
38-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
39-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
4032
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
4133
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
4234

4335
import org.slf4j.Logger;
4436
import org.slf4j.LoggerFactory;
4537

46-
import com.google.auto.service.AutoService;
47-
import lombok.NoArgsConstructor;
4838
import lombok.SneakyThrows;
4939

5040
import java.util.HashMap;
5141
import java.util.List;
5242
import java.util.Map;
5343
import java.util.stream.Collectors;
5444

55-
@AutoService(SeaTunnelSource.class)
56-
@NoArgsConstructor
5745
public class JdbcSource
5846
implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit, JdbcSourceState>,
5947
SupportParallelism,
6048
SupportColumnProjection {
6149
protected static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class);
6250

63-
private JdbcSourceConfig jdbcSourceConfig;
64-
private Map<TablePath, JdbcSourceTable> jdbcSourceTables;
51+
private final JdbcSourceConfig jdbcSourceConfig;
52+
private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
6553

6654
@SneakyThrows
6755
public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
@@ -77,40 +65,15 @@ public String getPluginName() {
7765
return "Jdbc";
7866
}
7967

80-
@SneakyThrows
81-
@Override
82-
public void prepare(Config pluginConfig) throws PrepareFailException {
83-
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
84-
ConfigValidator.of(config).validate(new JdbcSourceFactory().optionRule());
85-
this.jdbcSourceConfig = JdbcSourceConfig.of(config);
86-
JdbcDialect jdbcDialect =
87-
JdbcDialectLoader.load(
88-
jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
89-
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
90-
jdbcDialect.connectionUrlParse(
91-
jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
92-
jdbcSourceConfig.getJdbcConnectionConfig().getProperties(),
93-
jdbcDialect.defaultParameter());
94-
this.jdbcSourceTables =
95-
JdbcCatalogUtils.getTables(
96-
jdbcSourceConfig.getJdbcConnectionConfig(),
97-
jdbcSourceConfig.getTableConfigList());
98-
}
99-
10068
@Override
10169
public Boundedness getBoundedness() {
10270
return Boundedness.BOUNDED;
10371
}
10472

105-
@Override
106-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
107-
return getProducedCatalogTables().get(0).getSeaTunnelRowType();
108-
}
109-
11073
@Override
11174
public List<CatalogTable> getProducedCatalogTables() {
11275
return jdbcSourceTables.values().stream()
113-
.map(e -> e.getCatalogTable())
76+
.map(JdbcSourceTable::getCatalogTable)
11477
.collect(Collectors.toList());
11578
}
11679

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,14 @@
2525
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2626
import org.apache.seatunnel.api.source.SupportParallelism;
2727
import org.apache.seatunnel.api.table.catalog.CatalogTable;
28-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2928
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3029
import org.apache.seatunnel.common.constants.JobMode;
3130
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
3231

33-
import com.google.auto.service.AutoService;
3432
import com.google.common.collect.Lists;
3533

3634
import java.util.List;
3735

38-
@AutoService(SeaTunnelSource.class)
3936
public class KafkaSource
4037
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
4138
SupportParallelism {
@@ -65,11 +62,6 @@ public List<CatalogTable> getProducedCatalogTables() {
6562
return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
6663
}
6764

68-
@Override
69-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
70-
return kafkaSourceConfig.getCatalogTable().getSeaTunnelRowType();
71-
}
72-
7365
@Override
7466
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
7567
SourceReader.Context readerContext) {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.connector;
19+
20+
import org.apache.seatunnel.api.source.SeaTunnelSource;
21+
import org.apache.seatunnel.api.table.factory.FactoryUtil;
22+
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
23+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
24+
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
25+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
26+
import org.apache.seatunnel.common.utils.ReflectionUtils;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.lang.reflect.Method;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.Optional;
35+
36+
public class ConnectorSpecificationCheckTest {
37+
38+
@Test
39+
public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNotFoundException {
40+
List<TableSourceFactory> sourceFactories =
41+
FactoryUtil.discoverFactories(
42+
Thread.currentThread().getContextClassLoader(), TableSourceFactory.class);
43+
44+
// Some class can not get method, because it without some necessary jar dependency, like
45+
// hive-exec.jar. We need to check manually.
46+
List<String> blockList = new ArrayList<>();
47+
blockList.add("HiveSourceFactory");
48+
49+
for (TableSourceFactory factory : sourceFactories) {
50+
if (ReflectionUtils.getDeclaredMethod(
51+
factory.getClass(),
52+
"createSource",
53+
TableSourceFactoryContext.class)
54+
.isPresent()
55+
&& !blockList.contains(factory.getClass().getSimpleName())) {
56+
Class<? extends SeaTunnelSource> sourceClass = factory.getSourceClass();
57+
Optional<Method> prepare =
58+
ReflectionUtils.getDeclaredMethod(sourceClass, "prepare");
59+
Optional<Method> getProducedType =
60+
ReflectionUtils.getDeclaredMethod(sourceClass, "getProducedType");
61+
Optional<Method> getProducedCatalogTables =
62+
ReflectionUtils.getDeclaredMethod(sourceClass, "getProducedCatalogTables");
63+
Assertions.assertFalse(
64+
prepare.isPresent(),
65+
"Please remove `prepare` method, it will not be used any more");
66+
Assertions.assertFalse(
67+
getProducedType.isPresent(),
68+
"Please use `getProducedCatalogTables` method, do not implement `getProducedType` method in "
69+
+ sourceClass.getSimpleName());
70+
Assertions.assertTrue(
71+
getProducedCatalogTables.isPresent(),
72+
"Please implement `getProducedCatalogTables` method in "
73+
+ sourceClass.getSimpleName());
74+
}
75+
}
76+
77+
List<TableSinkFactory> sinkFactories =
78+
FactoryUtil.discoverFactories(
79+
Thread.currentThread().getContextClassLoader(), TableSinkFactory.class);
80+
for (TableSinkFactory factory : sinkFactories) {
81+
String factoryName = factory.getClass().getSimpleName();
82+
if (ReflectionUtils.getDeclaredMethod(
83+
factory.getClass(), "createSink", TableSinkFactoryContext.class)
84+
.isPresent()
85+
&& !blockList.contains(factoryName)) {
86+
Class<? extends SeaTunnelSource> sinkClass =
87+
(Class<? extends SeaTunnelSource>)
88+
Class.forName(
89+
factory.getClass()
90+
.getName()
91+
.replace(
92+
factoryName,
93+
factoryName.replace("Factory", "")));
94+
Optional<Method> prepare = ReflectionUtils.getDeclaredMethod(sinkClass, "prepare");
95+
Optional<Method> setTypeInfo =
96+
ReflectionUtils.getDeclaredMethod(sinkClass, "setTypeInfo");
97+
Assertions.assertFalse(
98+
prepare.isPresent(),
99+
"Please remove `prepare` method in " + sinkClass.getSimpleName());
100+
Assertions.assertFalse(
101+
setTypeInfo.isPresent(),
102+
"Please remove `setTypeInfo` method in " + sinkClass.getSimpleName());
103+
}
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)