Skip to content

Commit d324fc5

Browse files
authored
[improve] openmldb options (#9166)
1 parent 737b242 commit d324fc5

File tree

5 files changed

+80
-96
lines changed

5 files changed

+80
-96
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ private Set<String> buildWhiteList() {
209209
whiteList.add("Neo4jSourceOptions");
210210
whiteList.add("QdrantSourceOptions");
211211
whiteList.add("SocketSourceOptions");
212-
whiteList.add("OpenMldbSourceOptions");
213212
whiteList.add("PostgresIncrementalSourceOptions");
214213
whiteList.add("SqlServerIncrementalSourceOptions");
215214
whiteList.add("OracleIncrementalSourceOptions");

seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public class OpenMldbParameters implements Serializable {
2929
private String zkPath;
3030
private String host;
3131
private int port;
32-
private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue();
33-
private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue();
32+
private int sessionTimeout = OpenMldbSourceOptions.SESSION_TIMEOUT.defaultValue();
33+
private int requestTimeout = OpenMldbSourceOptions.REQUEST_TIMEOUT.defaultValue();
3434
private Boolean clusterMode;
3535
private String database;
3636
private String sql;
@@ -41,34 +41,35 @@ private OpenMldbParameters() {
4141

4242
public static OpenMldbParameters buildWithConfig(Config pluginConfig) {
4343
OpenMldbParameters openMldbParameters = new OpenMldbParameters();
44-
openMldbParameters.clusterMode = pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key());
45-
openMldbParameters.database = pluginConfig.getString(OpenMldbConfig.DATABASE.key());
46-
openMldbParameters.sql = pluginConfig.getString(OpenMldbConfig.SQL.key());
44+
openMldbParameters.clusterMode =
45+
pluginConfig.getBoolean(OpenMldbSourceOptions.CLUSTER_MODE.key());
46+
openMldbParameters.database = pluginConfig.getString(OpenMldbSourceOptions.DATABASE.key());
47+
openMldbParameters.sql = pluginConfig.getString(OpenMldbSourceOptions.SQL.key());
4748
// set zkHost
48-
if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) {
49-
openMldbParameters.zkHost = pluginConfig.getString(OpenMldbConfig.ZK_HOST.key());
49+
if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_HOST.key())) {
50+
openMldbParameters.zkHost = pluginConfig.getString(OpenMldbSourceOptions.ZK_HOST.key());
5051
}
5152
// set zkPath
52-
if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) {
53-
openMldbParameters.zkPath = pluginConfig.getString(OpenMldbConfig.ZK_PATH.key());
53+
if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_PATH.key())) {
54+
openMldbParameters.zkPath = pluginConfig.getString(OpenMldbSourceOptions.ZK_PATH.key());
5455
}
5556
// set host
56-
if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) {
57-
openMldbParameters.host = pluginConfig.getString(OpenMldbConfig.HOST.key());
57+
if (pluginConfig.hasPath(OpenMldbSourceOptions.HOST.key())) {
58+
openMldbParameters.host = pluginConfig.getString(OpenMldbSourceOptions.HOST.key());
5859
}
5960
// set port
60-
if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) {
61-
openMldbParameters.port = pluginConfig.getInt(OpenMldbConfig.PORT.key());
61+
if (pluginConfig.hasPath(OpenMldbSourceOptions.PORT.key())) {
62+
openMldbParameters.port = pluginConfig.getInt(OpenMldbSourceOptions.PORT.key());
6263
}
6364
// set session timeout
64-
if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) {
65+
if (pluginConfig.hasPath(OpenMldbSourceOptions.SESSION_TIMEOUT.key())) {
6566
openMldbParameters.sessionTimeout =
66-
pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key());
67+
pluginConfig.getInt(OpenMldbSourceOptions.SESSION_TIMEOUT.key());
6768
}
6869
// set request timeout
69-
if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) {
70+
if (pluginConfig.hasPath(OpenMldbSourceOptions.REQUEST_TIMEOUT.key())) {
7071
openMldbParameters.requestTimeout =
71-
pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key());
72+
pluginConfig.getInt(OpenMldbSourceOptions.REQUEST_TIMEOUT.key());
7273
}
7374
return openMldbParameters;
7475
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23-
public class OpenMldbConfig {
23+
public class OpenMldbSourceOptions {
2424
private static final int DEFAULT_SESSION_TIMEOUT = 10000;
2525
private static final int DEFAULT_REQUEST_TIMEOUT = 60000;
2626
public static final Option<String> ZK_HOST =

seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java

Lines changed: 35 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,22 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.openmldb.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.common.PrepareFailException;
24-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2521
import org.apache.seatunnel.api.source.Boundedness;
26-
import org.apache.seatunnel.api.source.SeaTunnelSource;
2722
import org.apache.seatunnel.api.source.SupportColumnProjection;
23+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
24+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
26+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2827
import org.apache.seatunnel.api.table.type.BasicType;
2928
import org.apache.seatunnel.api.table.type.LocalTimeType;
3029
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3130
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
32-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
33-
import org.apache.seatunnel.common.config.CheckConfigUtil;
34-
import org.apache.seatunnel.common.config.CheckResult;
3531
import org.apache.seatunnel.common.constants.JobMode;
36-
import org.apache.seatunnel.common.constants.PluginType;
3732
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
3833
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
3934
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
4035
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
41-
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
4236
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
4337
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor;
4438
import org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException;
@@ -47,75 +41,40 @@
4741
import com._4paradigm.openmldb.sdk.Schema;
4842
import com._4paradigm.openmldb.sdk.SqlException;
4943
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
50-
import com.google.auto.service.AutoService;
5144

5245
import java.sql.SQLException;
5346
import java.sql.Types;
47+
import java.util.Collections;
5448
import java.util.List;
5549

56-
@AutoService(SeaTunnelSource.class)
5750
public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
5851
implements SupportColumnProjection {
59-
private OpenMldbParameters openMldbParameters;
52+
private final OpenMldbParameters openMldbParameters;
53+
private final CatalogTable catalogTable;
6054
private JobContext jobContext;
61-
private SeaTunnelRowType seaTunnelRowType;
6255

63-
@Override
64-
public String getPluginName() {
65-
return "OpenMldb";
66-
}
67-
68-
@Override
69-
public void prepare(Config pluginConfig) throws PrepareFailException {
70-
CheckResult result =
71-
CheckConfigUtil.checkAllExists(
72-
pluginConfig,
73-
OpenMldbConfig.CLUSTER_MODE.key(),
74-
OpenMldbConfig.SQL.key(),
75-
OpenMldbConfig.DATABASE.key());
76-
if (!result.isSuccess()) {
77-
throw new OpenMldbConnectorException(
78-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
79-
String.format(
80-
"PluginName: %s, PluginType: %s, Message: %s",
81-
getPluginName(), PluginType.SOURCE, result.getMsg()));
82-
}
83-
if (pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key())) {
84-
// cluster mode
85-
result =
86-
CheckConfigUtil.checkAllExists(
87-
pluginConfig,
88-
OpenMldbConfig.ZK_HOST.key(),
89-
OpenMldbConfig.ZK_PATH.key());
90-
} else {
91-
// single mode
92-
result =
93-
CheckConfigUtil.checkAllExists(
94-
pluginConfig, OpenMldbConfig.HOST.key(), OpenMldbConfig.PORT.key());
95-
}
96-
if (!result.isSuccess()) {
97-
throw new OpenMldbConnectorException(
98-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
99-
String.format(
100-
"PluginName: %s, PluginType: %s, Message: %s",
101-
getPluginName(), PluginType.SOURCE, result.getMsg()));
102-
}
103-
this.openMldbParameters = OpenMldbParameters.buildWithConfig(pluginConfig);
56+
public OpenMldbSource(OpenMldbParameters openMldbParameters) {
57+
this.openMldbParameters = openMldbParameters;
10458
OpenMldbSqlExecutor.initSdkOption(openMldbParameters);
10559
try {
10660
SqlClusterExecutor sqlExecutor = OpenMldbSqlExecutor.getSqlExecutor();
10761
Schema inputSchema =
10862
sqlExecutor.getInputSchema(
10963
openMldbParameters.getDatabase(), openMldbParameters.getSql());
11064
List<Column> columnList = inputSchema.getColumnList();
111-
this.seaTunnelRowType = convert(columnList);
65+
this.catalogTable = convert(columnList);
11266
} catch (SQLException | SqlException e) {
11367
throw new OpenMldbConnectorException(
11468
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
11569
"Failed to initialize data schema");
11670
}
11771
}
11872

73+
@Override
74+
public String getPluginName() {
75+
return "OpenMldb";
76+
}
77+
11978
@Override
12079
public Boundedness getBoundedness() {
12180
return JobMode.BATCH.equals(jobContext.getJobMode())
@@ -124,14 +83,15 @@ public Boundedness getBoundedness() {
12483
}
12584

12685
@Override
127-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
128-
return seaTunnelRowType;
86+
public List<CatalogTable> getProducedCatalogTables() {
87+
return Collections.singletonList(catalogTable);
12988
}
13089

13190
@Override
13291
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
13392
SingleSplitReaderContext readerContext) throws Exception {
134-
return new OpenMldbSourceReader(openMldbParameters, seaTunnelRowType, readerContext);
93+
return new OpenMldbSourceReader(
94+
openMldbParameters, catalogTable.getSeaTunnelRowType(), readerContext);
13595
}
13696

13797
@Override
@@ -166,14 +126,24 @@ private SeaTunnelDataType<?> convertSeaTunnelDataType(int type) {
166126
}
167127
}
168128

169-
private SeaTunnelRowType convert(List<Column> columnList) {
170-
String[] fieldsName = new String[columnList.size()];
171-
SeaTunnelDataType<?>[] fieldsType = new SeaTunnelDataType<?>[columnList.size()];
129+
private CatalogTable convert(List<Column> columnList) {
130+
TableSchema.Builder builder = TableSchema.builder();
172131
for (int i = 0; i < columnList.size(); i++) {
173132
Column column = columnList.get(i);
174-
fieldsName[i] = column.getColumnName();
175-
fieldsType[i] = convertSeaTunnelDataType(column.getSqlType());
133+
builder.column(
134+
PhysicalColumn.of(
135+
column.getColumnName(),
136+
convertSeaTunnelDataType(column.getSqlType()),
137+
(Long) null,
138+
column.isNotNull(),
139+
null,
140+
null));
176141
}
177-
return new SeaTunnelRowType(fieldsName, fieldsType);
142+
return CatalogTable.of(
143+
TableIdentifier.of("OpenMldb", openMldbParameters.getDatabase(), "default"),
144+
builder.build(),
145+
null,
146+
null,
147+
null);
178148
}
179149
}

seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
22+
import org.apache.seatunnel.api.source.SourceSplit;
23+
import org.apache.seatunnel.api.table.connector.TableSource;
2224
import org.apache.seatunnel.api.table.factory.Factory;
2325
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
24-
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
26+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
27+
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
28+
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSourceOptions;
2529

2630
import com.google.auto.service.AutoService;
2731

32+
import java.io.Serializable;
33+
2834
@AutoService(Factory.class)
2935
public class OpenMldbSourceFactory implements TableSourceFactory {
3036
@Override
@@ -35,26 +41,34 @@ public String factoryIdentifier() {
3541
@Override
3642
public OptionRule optionRule() {
3743
return OptionRule.builder()
38-
.required(OpenMldbConfig.CLUSTER_MODE)
39-
.required(OpenMldbConfig.SQL)
40-
.required(OpenMldbConfig.DATABASE)
41-
.optional(OpenMldbConfig.SESSION_TIMEOUT)
42-
.optional(OpenMldbConfig.REQUEST_TIMEOUT)
44+
.required(OpenMldbSourceOptions.CLUSTER_MODE)
45+
.required(OpenMldbSourceOptions.SQL)
46+
.required(OpenMldbSourceOptions.DATABASE)
47+
.optional(OpenMldbSourceOptions.SESSION_TIMEOUT)
48+
.optional(OpenMldbSourceOptions.REQUEST_TIMEOUT)
4349
.conditional(
44-
OpenMldbConfig.CLUSTER_MODE,
50+
OpenMldbSourceOptions.CLUSTER_MODE,
4551
false,
46-
OpenMldbConfig.HOST,
47-
OpenMldbConfig.PORT)
52+
OpenMldbSourceOptions.HOST,
53+
OpenMldbSourceOptions.PORT)
4854
.conditional(
49-
OpenMldbConfig.CLUSTER_MODE,
55+
OpenMldbSourceOptions.CLUSTER_MODE,
5056
true,
51-
OpenMldbConfig.ZK_HOST,
52-
OpenMldbConfig.ZK_PATH)
57+
OpenMldbSourceOptions.ZK_HOST,
58+
OpenMldbSourceOptions.ZK_PATH)
5359
.build();
5460
}
5561

5662
@Override
5763
public Class<? extends SeaTunnelSource> getSourceClass() {
5864
return OpenMldbSource.class;
5965
}
66+
67+
@Override
68+
public <T, SplitT extends SourceSplit, StateT extends Serializable>
69+
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
70+
OpenMldbParameters openMldbParameters =
71+
OpenMldbParameters.buildWithConfig(context.getOptions().toConfig());
72+
return () -> (SeaTunnelSource<T, SplitT, StateT>) new OpenMldbSource(openMldbParameters);
73+
}
6074
}

0 commit comments

Comments
 (0)