Skip to content

Commit 417178f

Browse files
EricJoy2048ashulin
andauthored
[Feature][Connector] add get source method to all source connector (#3846)
* add transform doc * add transform v2 document * remove transform v1 from document * improve document * fix dead link * fix dead link * fix dead link * update supported connnector num * Update docs/en/transform-v2/replace.md Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com> * fix ci * fix ci error * add Parallelism and SchemaProjection inteface to Source Connector * update schemaprojection to columnprojection * fix code style * tmp * all connector add getSourceClass method * fix ci error * fix ci error Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com>
1 parent 1118c83 commit 417178f

File tree

35 files changed

+197
-9
lines changed

35 files changed

+197
-9
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.source.SeaTunnelSource;
2323
import org.apache.seatunnel.api.source.SourceCommonOptions;
2424
import org.apache.seatunnel.api.source.SourceSplit;
25+
import org.apache.seatunnel.api.source.SupportParallelism;
2526
import org.apache.seatunnel.api.table.catalog.Catalog;
2627
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2728
import org.apache.seatunnel.api.table.connector.TableSource;
@@ -163,15 +164,19 @@ public static List<Factory> discoverFactories(ClassLoader classLoader) {
163164
* This method is called by SeaTunnel Web to get the full option rule of a source.
164165
* @return
165166
*/
166-
public static OptionRule sourceFullOptionRule(@NonNull Factory factory) {
167+
public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory factory) {
167168
OptionRule sourceOptionRule = factory.optionRule();
168169
if (sourceOptionRule == null) {
169170
throw new FactoryException("sourceOptionRule can not be null");
170171
}
171172

172-
OptionRule sourceCommonOptionRule =
173-
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
174-
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
173+
Class<? extends SeaTunnelSource> sourceClass = factory.getSourceClass();
174+
if (sourceClass.isAssignableFrom(SupportParallelism.class)) {
175+
OptionRule sourceCommonOptionRule =
176+
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
177+
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
178+
}
179+
175180
return sourceOptionRule;
176181
}
177182
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.api.table.factory;
1919

20+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2021
import org.apache.seatunnel.api.source.SourceSplit;
2122
import org.apache.seatunnel.api.table.connector.TableSource;
2223

@@ -36,4 +37,6 @@ default <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource
3637
TableFactoryContext context) {
3738
throw new UnsupportedOperationException("unsupported now");
3839
}
40+
41+
Class<? extends SeaTunnelSource> getSourceClass();
3942
}

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
2525

2626
import org.apache.seatunnel.api.configuration.util.OptionRule;
27+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2728
import org.apache.seatunnel.api.table.factory.Factory;
2829
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2930
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -42,4 +43,9 @@ public OptionRule optionRule() {
4243
return OptionRule.builder()
4344
.required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE, SeaTunnelSchema.SCHEMA).build();
4445
}
46+
47+
@Override
48+
public Class<? extends SeaTunnelSource> getSourceClass() {
49+
return AmazonDynamoDBSource.class;
50+
}
4551
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2324
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
@@ -51,4 +52,9 @@ public OptionRule optionRule() {
5152
JdbcSourceOptions.CONNECTION_POOL_SIZE)
5253
.build();
5354
}
55+
56+
@Override
57+
public Class<? extends SeaTunnelSource> getSourceClass() {
58+
return MySqlIncrementalSource.class;
59+
}
5460
}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
2525

2626
import org.apache.seatunnel.api.configuration.util.OptionRule;
27+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2728
import org.apache.seatunnel.api.table.factory.Factory;
2829
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2930

@@ -40,4 +41,9 @@ public String factoryIdentifier() {
4041
public OptionRule optionRule() {
4142
return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, PASSWORD).build();
4243
}
44+
45+
@Override
46+
public Class<? extends SeaTunnelSource> getSourceClass() {
47+
return ClickhouseSource.class;
48+
}
4349
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;
2727

2828
import org.apache.seatunnel.api.configuration.util.OptionRule;
29+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2930
import org.apache.seatunnel.api.table.factory.Factory;
3031
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
3132
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -44,4 +45,9 @@ public OptionRule optionRule() {
4445
return OptionRule.builder().required(HOSTS, INDEX).optional(USERNAME, PASSWORD, SCROLL_TIME, SCROLL_SIZE)
4546
.exclusive(SOURCE, SeaTunnelSchema.SCHEMA).build();
4647
}
48+
49+
@Override
50+
public Class<? extends SeaTunnelSource> getSourceClass() {
51+
return ElasticsearchSource.class;
52+
}
4753
}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH;
2727

2828
import org.apache.seatunnel.api.configuration.util.OptionRule;
29+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2930
import org.apache.seatunnel.api.table.factory.Factory;
3031
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
3132
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -44,4 +45,9 @@ public OptionRule optionRule() {
4445
return OptionRule.builder().required(SeaTunnelSchema.SCHEMA).optional(ROW_NUM, SPLIT_NUM, SPLIT_READ_INTERVAL, MAP_SIZE,
4546
ARRAY_SIZE, BYTES_LENGTH, STRING_LENGTH).build();
4647
}
48+
49+
@Override
50+
public Class<? extends SeaTunnelSource> getSourceClass() {
51+
return FakeSource.class;
52+
}
4753
}

seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2324
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -55,4 +56,9 @@ public OptionRule optionRule() {
5556
.optional(BaseSourceConfig.TIME_FORMAT)
5657
.build();
5758
}
59+
60+
@Override
61+
public Class<? extends SeaTunnelSource> getSourceClass() {
62+
return FtpFileSource.class;
63+
}
5864
}

seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2324
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -52,4 +53,9 @@ public OptionRule optionRule() {
5253
.optional(BaseSourceConfig.TIME_FORMAT)
5354
.build();
5455
}
56+
57+
@Override
58+
public Class<? extends SeaTunnelSource> getSourceClass() {
59+
return HdfsFileSource.class;
60+
}
5561
}

seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.source.SeaTunnelSource;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2324
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
@@ -51,4 +52,9 @@ public OptionRule optionRule() {
5152
.optional(BaseSourceConfig.TIME_FORMAT)
5253
.build();
5354
}
55+
56+
@Override
57+
public Class<? extends SeaTunnelSource> getSourceClass() {
58+
return LocalFileSource.class;
59+
}
5460
}

0 commit comments

Comments
 (0)