Skip to content

Commit e68ecf7

Browse files
[Feature][Conenctor-V2] Add openmldb source connector (#3313)
* [Feature][Connector-V2][OpenMldb] First commit * [Feature][Connector-V2] Add openmldb source connector * [Feature][Connector-V2] Fix code style * [Feature][Connector-V2] Update plugin-mapping.properties * [Feature][Connector-V2] Update pom of seatunnel-dist * [Feature][Connector-V2] Fix some bugs * [Feature][Connector-V2] Fix some bugs * [Feature][Connector-V2] Fix some bugs * [Feature][Connector-V2] Make code more clearly * [Feature][Connector-V2] Add docs * [Feature][Connector-V2][OpenMldb] Unified exception * [Feature][Connector-V2][OpenMldb] Add option factory * [Feature][Connector-V2][OpenMldb] Unified exception * [Feature][Connector-V2][OpenMldb] Fix code style * [Feature][Connector-V2][OpenMldb] Fix pom error
1 parent c5a2302 commit e68ecf7

File tree

12 files changed

+707
-1
lines changed

12 files changed

+707
-1
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# OpenMldb
2+
3+
> OpenMldb source connector
4+
5+
## Description
6+
7+
Used to read data from OpenMldb.
8+
9+
## Key features
10+
11+
- [x] [batch](../../concept/connector-v2-features.md)
12+
- [x] [stream](../../concept/connector-v2-features.md)
13+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
14+
- [ ] [schema projection](../../concept/connector-v2-features.md)
15+
- [ ] [parallelism](../../concept/connector-v2-features.md)
16+
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
17+
18+
## Options
19+
20+
| name | type | required | default value |
21+
|-----------------|---------|----------|---------------|
22+
| cluster_mode | boolean | yes | - |
23+
| sql | string | yes | - |
24+
| database | string | yes | - |
25+
| host | string | no | - |
26+
| port | int | no | - |
27+
| zk_path | string | no | - |
28+
| zk_host | string | no | - |
29+
| session_timeout | int | no | 10000 |
30+
| request_timeout | int | no | 60000 |
31+
| common-options | | no | - |
32+
33+
### cluster_mode [string]
34+
35+
OpenMldb is or not cluster mode
36+
37+
### sql [string]
38+
39+
Sql statement
40+
41+
### database [string]
42+
43+
Database name
44+
45+
### host [string]
46+
47+
OpenMldb host, only supported on OpenMldb single mode
48+
49+
### port [int]
50+
51+
OpenMldb port, only supported on OpenMldb single mode
52+
53+
### zk_host [string]
54+
55+
Zookeeper host, only supported on OpenMldb cluster mode
56+
57+
### zk_path [string]
58+
59+
Zookeeper path, only supported on OpenMldb cluster mode
60+
61+
### session_timeout [int]
62+
63+
OpenMldb session timeout(ms), default 60000
64+
65+
### request_timeout [int]
66+
67+
OpenMldb request timeout(ms), default 10000
68+
69+
### common options
70+
71+
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
72+
73+
## Example
74+
75+
```hocon
76+
77+
OpenMldb {
78+
host = "172.17.0.2"
79+
port = 6527
80+
sql = "select * from demo_table1"
81+
database = "demo_db"
82+
cluster_mode = false
83+
}
84+
85+
```

plugin-mapping.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,5 @@ seatunnel.source.OneSignal = connector-http-onesignal
155155
seatunnel.source.Jira = connector-http-jira
156156
seatunnel.source.Gitlab = connector-http-gitlab
157157
seatunnel.sink.RabbitMQ = connector-rabbitmq
158-
seatunnel.source.RabbitMQ = connector-rabbitmq
158+
seatunnel.source.RabbitMQ = connector-rabbitmq
159+
seatunnel.source.OpenMldb = connector-openmldb
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<artifactId>seatunnel-connectors-v2</artifactId>
25+
<groupId>org.apache.seatunnel</groupId>
26+
<version>${revision}</version>
27+
</parent>
28+
<modelVersion>4.0.0</modelVersion>
29+
30+
<artifactId>connector-openmldb</artifactId>
31+
32+
<properties>
33+
<openmldb.version>0.6.3</openmldb.version>
34+
</properties>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.seatunnel</groupId>
39+
<artifactId>connector-common</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>com.4paradigm.openmldb</groupId>
44+
<artifactId>openmldb-jdbc</artifactId>
45+
<version>${openmldb.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.4paradigm.openmldb</groupId>
49+
<artifactId>openmldb-native</artifactId>
50+
<version>${openmldb.version}</version>
51+
</dependency>
52+
</dependencies>
53+
54+
</project>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.openmldb.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
public class OpenMldbConfig {
24+
private static final int DEFAULT_SESSION_TIMEOUT = 10000;
25+
private static final int DEFAULT_REQUEST_TIMEOUT = 60000;
26+
public static final Option<String> ZK_HOST = Options.key("zk_host")
27+
.stringType()
28+
.noDefaultValue()
29+
.withDescription("Zookeeper server host");
30+
public static final Option<String> ZK_PATH = Options.key("zk_path")
31+
.stringType()
32+
.noDefaultValue()
33+
.withDescription("Zookeeper server path of OpenMldb cluster");
34+
public static final Option<String> HOST = Options.key("host")
35+
.stringType()
36+
.noDefaultValue()
37+
.withDescription("OpenMldb host");
38+
public static final Option<Integer> PORT = Options.key("port")
39+
.intType()
40+
.noDefaultValue()
41+
.withDescription("OpenMldb port");
42+
public static final Option<Integer> SESSION_TIMEOUT = Options.key("session_timeout")
43+
.intType()
44+
.defaultValue(DEFAULT_SESSION_TIMEOUT)
45+
.withDescription("OpenMldb session timeout");
46+
public static final Option<Integer> REQUEST_TIMEOUT = Options.key("request_timeout")
47+
.intType()
48+
.defaultValue(DEFAULT_REQUEST_TIMEOUT)
49+
.withDescription("OpenMldb request timeout");
50+
public static final Option<Boolean> CLUSTER_MODE = Options.key("cluster_mode")
51+
.booleanType()
52+
.noDefaultValue()
53+
.withDescription("Whether cluster mode is enabled");
54+
public static final Option<String> SQL = Options.key("sql")
55+
.stringType()
56+
.noDefaultValue()
57+
.withDescription("Sql statement");
58+
public static final Option<String> DATABASE = Options.key("database")
59+
.stringType()
60+
.noDefaultValue()
61+
.withDescription("The database you want to access");
62+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.openmldb.config;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import lombok.Getter;
23+
24+
import java.io.Serializable;
25+
26+
@Getter
27+
public class OpenMldbParameters implements Serializable {
28+
private String zkHost;
29+
private String zkPath;
30+
private String host;
31+
private int port;
32+
private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue();
33+
private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue();
34+
private Boolean clusterMode;
35+
private String database;
36+
private String sql;
37+
38+
private OpenMldbParameters() {
39+
// do nothing
40+
}
41+
42+
public static OpenMldbParameters buildWithConfig(Config pluginConfig) {
43+
OpenMldbParameters openMldbParameters = new OpenMldbParameters();
44+
openMldbParameters.clusterMode = pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key());
45+
openMldbParameters.database = pluginConfig.getString(OpenMldbConfig.DATABASE.key());
46+
openMldbParameters.sql = pluginConfig.getString(OpenMldbConfig.SQL.key());
47+
// set zkHost
48+
if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) {
49+
openMldbParameters.zkHost = pluginConfig.getString(OpenMldbConfig.ZK_HOST.key());
50+
}
51+
// set zkPath
52+
if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) {
53+
openMldbParameters.zkPath = pluginConfig.getString(OpenMldbConfig.ZK_PATH.key());
54+
}
55+
// set host
56+
if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) {
57+
openMldbParameters.host = pluginConfig.getString(OpenMldbConfig.HOST.key());
58+
}
59+
// set port
60+
if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) {
61+
openMldbParameters.port = pluginConfig.getInt(OpenMldbConfig.PORT.key());
62+
}
63+
// set session timeout
64+
if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) {
65+
openMldbParameters.sessionTimeout = pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key());
66+
}
67+
// set request timeout
68+
if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) {
69+
openMldbParameters.requestTimeout = pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key());
70+
}
71+
return openMldbParameters;
72+
}
73+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.openmldb.config;
19+
20+
import com._4paradigm.openmldb.sdk.SdkOption;
21+
import com._4paradigm.openmldb.sdk.SqlException;
22+
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
23+
24+
public class OpenMldbSqlExecutor {
25+
private static final SdkOption SDK_OPTION = new SdkOption();
26+
private static volatile SqlClusterExecutor SQL_EXECUTOR;
27+
28+
private OpenMldbSqlExecutor() {
29+
30+
}
31+
32+
public static void initSdkOption(OpenMldbParameters openMldbParameters) {
33+
if (openMldbParameters.getClusterMode()) {
34+
SDK_OPTION.setZkCluster(openMldbParameters.getZkHost());
35+
SDK_OPTION.setZkPath(openMldbParameters.getZkPath());
36+
} else {
37+
SDK_OPTION.setHost(openMldbParameters.getHost());
38+
SDK_OPTION.setPort(openMldbParameters.getPort());
39+
SDK_OPTION.setClusterMode(false);
40+
}
41+
SDK_OPTION.setSessionTimeout(openMldbParameters.getSessionTimeout());
42+
SDK_OPTION.setRequestTimeout(openMldbParameters.getRequestTimeout());
43+
}
44+
45+
public static SqlClusterExecutor getSqlExecutor() throws SqlException {
46+
if (SQL_EXECUTOR == null) {
47+
synchronized (OpenMldbSqlExecutor.class) {
48+
if (SQL_EXECUTOR == null) {
49+
SQL_EXECUTOR = new SqlClusterExecutor(SDK_OPTION);
50+
}
51+
}
52+
}
53+
return SQL_EXECUTOR;
54+
}
55+
56+
public static void close() {
57+
if (SQL_EXECUTOR != null) {
58+
synchronized (OpenMldbParameters.class) {
59+
if (SQL_EXECUTOR != null) {
60+
SQL_EXECUTOR.close();
61+
SQL_EXECUTOR = null;
62+
}
63+
}
64+
}
65+
}
66+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.openmldb.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class OpenMldbConnectorException extends SeaTunnelRuntimeException {
24+
public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
25+
super(seaTunnelErrorCode, errorMessage);
26+
}
27+
28+
public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
29+
super(seaTunnelErrorCode, errorMessage, cause);
30+
}
31+
32+
public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
33+
super(seaTunnelErrorCode, cause);
34+
}
35+
}

0 commit comments

Comments
 (0)