Commit 9681173

531651225 and 毕博 authored
[Feature][Connector-V2] StarRocks source connector (#3679)
* [Feature][Connector-V2] Add StarRocks thrift source connector Co-authored-by: 毕博 <bibo@mafengwo.com>
1 parent 49d9172 commit 9681173

26 files changed: 2166 additions and 13 deletions

docs/en/connector-v2/Error-Quick-Reference-Manual.md

Lines changed: 5 additions & 0 deletions
@@ -215,6 +215,11 @@ problems encountered by users.
215215
|--------------|-------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
216216
| STARROCKS-01 | Flush batch data to sink connector failed | This error code means that flushing batch data to the sink connector failed; please check it |
217217
| STARROCKS-02 | Writing records to StarRocks failed | This error code means that writing records to StarRocks failed; please check whether the data from the files is correct |
218+
| STARROCKS-03 | Close StarRocks BE reader failed. | This error code means that StarRocks has a problem; please check whether it is working properly |
219+
| STARROCKS-04 | Create StarRocks BE reader failed. | This error code means that StarRocks has a problem; please check whether it is working properly |
220+
| STARROCKS-05 | Scan data from StarRocks BE failed. | This error code means that scanning data from StarRocks failed; please check it |
221+
| STARROCKS-06 | Request query Plan failed. | This error code means that requesting the query plan from StarRocks failed; please check it |
222+
| STARROCKS-07 | Read Arrow data failed. | This error code means that the job has a problem; please check whether it is working properly |
218223

219224
## DingTalk Connector Error Codes
220225

Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
1+
# StarRocks
2+
3+
> StarRocks source connector
4+
5+
## Description
6+
7+
Reads data from StarRocks as an external data source.
8+
Internally, the StarRocks source connector obtains the query plan from the frontend (FE),
9+
delivers the query plan as a parameter to BE nodes, and then obtains data results from BE nodes.
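
The first step of that flow, asking an FE node for a query plan, can be sketched as below. This is only an illustration under stated assumptions: the `/api/<database>/<table>/_query_plan` path, the `sql` key in the JSON body, and the use of HTTP Basic authentication are assumptions that this document does not confirm, and the class and method names are hypothetical. The URL, header map, and body have the shapes that `doHttpPost(String, Map<String, String>, String)` in `HttpHelper` accepts.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the connector.
class QueryPlanRequestSketch {

    // ASSUMPTION: the FE exposes the query plan at this path.
    static String queryPlanUrl(String feNode, String database, String table) {
        return "http://" + feNode + "/api/" + database + "/" + table + "/_query_plan";
    }

    // ASSUMPTION: the FE accepts HTTP Basic authentication and a JSON body.
    static Map<String, String> headers(String username, String password) {
        String credentials = username + ":" + password;
        String token = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Content-Type", "application/json;charset=UTF-8");
        headers.put("Authorization", "Basic " + token);
        return headers;
    }

    // ASSUMPTION: the request body carries the statement under a "sql" key.
    // Only backslashes and double quotes are escaped in this sketch.
    static String body(String sql) {
        String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"sql\": \"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(queryPlanUrl("starrocks_e2e:8030", "test", "e2e_table_source"));
        System.out.println(headers("root", ""));
        System.out.println(body("select * from test.e2e_table_source"));
    }
}
```

The three values would then be passed to an HTTP POST helper; the response is expected to describe which tablets to read from which BE nodes, which this sketch does not parse.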
10+
11+
## Key features
12+
13+
- [x] [batch](../../concept/connector-v2-features.md)
14+
- [ ] [stream](../../concept/connector-v2-features.md)
15+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
16+
- [x] [schema projection](../../concept/connector-v2-features.md)
17+
- [x] [parallelism](../../concept/connector-v2-features.md)
18+
- [x] [support user-defined split](../../concept/connector-v2-features.md)
19+
20+
## Options
21+
22+
| name | type | required | default value |
23+
|-------------------------|--------|----------|-------------------|
24+
| node_urls | list | yes | - |
25+
| username | string | yes | - |
26+
| password | string | yes | - |
27+
| database | string | yes | - |
28+
| table | string | yes | - |
29+
| scan_filter | string | no | - |
30+
| schema | config | yes | - |
31+
| request_tablet_size | int | no | Integer.MAX_VALUE |
32+
| scan_connect_timeout_ms | int | no | 30000 |
33+
| scan_query_timeout_sec | int | no | 3600 |
34+
| scan_keep_alive_min | int | no | 10 |
35+
| scan_batch_rows | int | no | 1024 |
36+
| scan_mem_limit | long | no | 2147483648 |
37+
| max_retries | int | no | 3 |
38+
39+
### node_urls [list]
40+
41+
The addresses of the `StarRocks` cluster FE nodes, in the format `["fe_ip:fe_http_port", ...]`
42+
43+
### username [string]
44+
45+
The username of the `StarRocks` user
46+
47+
### password [string]
48+
49+
The password of the `StarRocks` user
50+
51+
### database [string]
52+
53+
The name of the StarRocks database
54+
55+
### table [string]
56+
57+
The name of the StarRocks table
58+
59+
### scan_filter [string]
60+
61+
The filter expression of the query, which is passed through to StarRocks unchanged. StarRocks uses this expression to filter data on the source side.
62+
63+
e.g.
64+
65+
```
66+
"tinyint_1 = 100"
67+
```
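
As a rough illustration of how `scan_filter` could end up in the statement sent to StarRocks, the sketch below appends it as a `where` clause. The exact statement the connector builds is not shown in this document, so the statement shape and the `buildSql` helper are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; not part of the connector.
class ScanFilterSqlSketch {

    // ASSUMPTION: the projected columns, database, table, and scan_filter
    // are combined into a plain "select ... from ... where ..." statement.
    static String buildSql(String database, String table,
                           List<String> columns, String scanFilter) {
        StringBuilder sql = new StringBuilder("select ")
                .append(String.join(", ", columns))
                .append(" from ")
                .append(database).append('.').append(table);
        // The filter is appended verbatim; an empty or missing filter adds nothing.
        if (scanFilter != null && !scanFilter.trim().isEmpty()) {
            sql.append(" where ").append(scanFilter);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("tinyint_1", "name");
        System.out.println(buildSql("test", "e2e_table_source", columns, "tinyint_1 = 100"));
        System.out.println(buildSql("test", "e2e_table_source", columns, null));
    }
}
```

Because the expression is forwarded as-is, it has to be valid in the StarRocks SQL dialect.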
68+
69+
### schema [config]
70+
71+
#### fields [config]
72+
73+
The schema (field names and types) of the StarRocks table that you want to read
74+
75+
e.g.
76+
77+
```
78+
schema {
79+
fields {
80+
name = string
81+
age = int
82+
}
83+
}
84+
```
85+
86+
### request_tablet_size [int]
87+
88+
The maximum number of StarRocks tablets assigned to a single partition. The smaller this value, the more partitions are generated. This increases parallelism on the engine side but also puts more pressure on StarRocks.
89+
90+
The following example shows how request_tablet_size controls the generation of partitions:
91+
92+
```
93+
The tablet distribution of a StarRocks table in the cluster is as follows:
94+
95+
be_node_1 tablet[1, 2, 3, 4, 5]
96+
be_node_2 tablet[6, 7, 8, 9, 10]
97+
be_node_3 tablet[11, 12, 13, 14, 15]
98+
99+
1. If request_tablet_size is not set, there is no limit on the number of tablets in a single partition. The partitions are generated as follows:
100+
101+
partition[0] read data of tablet[1, 2, 3, 4, 5] from be_node_1
102+
partition[1] read data of tablet[6, 7, 8, 9, 10] from be_node_2
103+
partition[2] read data of tablet[11, 12, 13, 14, 15] from be_node_3
104+
105+
2. If request_tablet_size=3, a single partition holds at most 3 tablets. The partitions are generated as follows:
106+
107+
partition[0] read data of tablet[1, 2, 3] from be_node_1
108+
partition[1] read data of tablet[4, 5] from be_node_1
109+
partition[2] read data of tablet[6, 7, 8] from be_node_2
110+
partition[3] read data of tablet[9, 10] from be_node_2
111+
partition[4] read data of tablet[11, 12, 13] from be_node_3
112+
partition[5] read data of tablet[14, 15] from be_node_3
113+
```
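
The grouping shown above can be reproduced with a short sketch. It assumes the tablet-to-BE mapping is already known and simply cuts each BE node's tablet list into chunks of at most `request_tablet_size`; the class and method names are hypothetical and this is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the partitioning rule described above.
class TabletPartitionSketch {

    /** One partition: a group of tablets that all live on the same BE node. */
    static final class Partition {
        final String beNode;
        final List<Integer> tablets;

        Partition(String beNode, List<Integer> tablets) {
            this.beNode = beNode;
            this.tablets = tablets;
        }
    }

    static List<Partition> split(Map<String, List<Integer>> tabletsByBe, int requestTabletSize) {
        if (requestTabletSize <= 0) {
            throw new IllegalArgumentException("requestTabletSize must be positive");
        }
        List<Partition> partitions = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : tabletsByBe.entrySet()) {
            List<Integer> tablets = entry.getValue();
            // Never step by more than the list size, so Integer.MAX_VALUE is safe.
            int chunk = Math.min(requestTabletSize, tablets.size());
            for (int from = 0; from < tablets.size(); from += chunk) {
                int to = Math.min(from + chunk, tablets.size());
                partitions.add(new Partition(entry.getKey(),
                        new ArrayList<>(tablets.subList(from, to))));
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> tabletsByBe = new LinkedHashMap<>();
        tabletsByBe.put("be_node_1", Arrays.asList(1, 2, 3, 4, 5));
        tabletsByBe.put("be_node_2", Arrays.asList(6, 7, 8, 9, 10));
        tabletsByBe.put("be_node_3", Arrays.asList(11, 12, 13, 14, 15));

        List<Partition> partitions = split(tabletsByBe, 3);
        for (int i = 0; i < partitions.size(); i++) {
            Partition p = partitions.get(i);
            System.out.println("partition[" + i + "] reads tablet" + p.tablets + " from " + p.beNode);
        }
    }
}
```

With the distribution from the example, a limit of 3 yields six partitions, and `Integer.MAX_VALUE` (the default) yields one partition per BE node.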
114+
115+
### scan_connect_timeout_ms [int]
116+
117+
The connection timeout for requests sent to StarRocks, in milliseconds
118+
119+
### scan_query_timeout_sec [int]
120+
121+
The timeout for queries sent to StarRocks, in seconds. The default value is 3600 (1 hour); -1 means no timeout limit
122+
123+
### scan_keep_alive_min [int]
124+
125+
The keep-alive duration of the query task, in minutes. The default value is 10. We recommend that you set this parameter to a value greater than or equal to 5.
126+
127+
### scan_batch_rows [int]
128+
129+
The maximum number of data rows to read from BE at a time. Increasing this value reduces the number of connections established between the engine and StarRocks and therefore mitigates overhead caused by network latency.
130+
131+
### scan_mem_limit [long]
132+
133+
The maximum memory space allowed for a single query in the BE node, in bytes. The default value is 2147483648 (2 GB).
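
The default is one more than `Integer.MAX_VALUE`, which is why this option is typed `long` rather than `int`. A quick check of the arithmetic (the class and constant names are hypothetical):

```java
// Hypothetical illustration of the default value's arithmetic.
class ScanMemLimitSketch {

    // 2 * 1024^3 bytes; the 2L forces long arithmetic and avoids int overflow.
    static final long DEFAULT_SCAN_MEM_LIMIT = 2L * 1024 * 1024 * 1024;

    public static void main(String[] args) {
        System.out.println(DEFAULT_SCAN_MEM_LIMIT);
        System.out.println(DEFAULT_SCAN_MEM_LIMIT == (long) Integer.MAX_VALUE + 1);
    }
}
```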
134+
135+
### max_retries [int]
136+
137+
The maximum number of retries for requests sent to StarRocks
138+
139+
## Example
140+
141+
```
142+
source {
143+
StarRocks {
144+
nodeUrls = ["starrocks_e2e:8030"]
145+
username = root
146+
password = ""
147+
database = "test"
148+
table = "e2e_table_source"
149+
scan_batch_rows = 10
150+
max_retries = 3
151+
schema.fields {
152+
BIGINT_COL = BIGINT
153+
LARGEINT_COL = STRING
154+
SMALLINT_COL = SMALLINT
155+
TINYINT_COL = TINYINT
156+
BOOLEAN_COL = BOOLEAN
157+
DECIMAL_COL = "DECIMAL(20, 1)"
158+
DOUBLE_COL = DOUBLE
159+
FLOAT_COL = FLOAT
160+
INT_COL = INT
161+
CHAR_COL = STRING
162+
VARCHAR_11_COL = STRING
163+
STRING_COL = STRING
164+
DATETIME_COL = TIMESTAMP
165+
DATE_COL = DATE
166+
}
167+
}
168+
}
169+
```
170+
171+
## Changelog
172+
173+
### next version
174+
175+
- Add StarRocks Source Connector
176+

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
@@ -172,4 +172,5 @@ seatunnel.sink.TDengine = connector-tdengine
172172
seatunnel.source.Persistiq = connector-http-persistiq
173173
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
174174
seatunnel.sink.Hbase = connector-hbase
175+
seatunnel.source.StarRocks = connector-starrocks
175176

seatunnel-connectors-v2/connector-starrocks/pom.xml

Lines changed: 60 additions & 0 deletions
@@ -30,8 +30,11 @@
3030
<name>SeaTunnel : Connectors V2 : StarRocks</name>
3131

3232
<properties>
33+
<connector.name>connector.starrocks</connector.name>
3334
<httpclient.version>4.5.13</httpclient.version>
3435
<httpcore.version>4.4.4</httpcore.version>
36+
<starrocks.thrift.sdk.version>1.0.1</starrocks.thrift.sdk.version>
37+
<arrow.version>5.0.0</arrow.version>
3538
</properties>
3639

3740
<dependencies>
@@ -45,6 +48,11 @@
4548
<artifactId>connector-common</artifactId>
4649
<version>${project.version}</version>
4750
</dependency>
51+
<dependency>
52+
<groupId>org.apache.seatunnel</groupId>
53+
<artifactId>seatunnel-common</artifactId>
54+
<version>${project.version}</version>
55+
</dependency>
4856
<dependency>
4957
<groupId>org.apache.httpcomponents</groupId>
5058
<artifactId>httpclient</artifactId>
@@ -55,5 +63,57 @@
5563
<artifactId>httpcore</artifactId>
5664
<version>${httpcore.version}</version>
5765
</dependency>
66+
<dependency>
67+
<groupId>com.starrocks</groupId>
68+
<artifactId>starrocks-thrift-sdk</artifactId>
69+
<version>${starrocks.thrift.sdk.version}</version>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.apache.arrow</groupId>
73+
<artifactId>arrow-vector</artifactId>
74+
<version>${arrow.version}</version>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.apache.arrow</groupId>
78+
<artifactId>arrow-memory-netty</artifactId>
79+
<version>${arrow.version}</version>
80+
</dependency>
5881
</dependencies>
82+
83+
<build>
84+
<plugins>
85+
<plugin>
86+
<groupId>org.apache.maven.plugins</groupId>
87+
<artifactId>maven-shade-plugin</artifactId>
88+
<configuration>
89+
<relocations>
90+
<relocation>
91+
<pattern>org.apache.arrow</pattern>
92+
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.arrow</shadedPattern>
93+
</relocation>
94+
<relocation>
95+
<pattern>io.netty</pattern>
96+
<shadedPattern>${seatunnel.shade.package}.${connector.name}.io.netty</shadedPattern>
97+
</relocation>
98+
<relocation>
99+
<pattern>com.google.flatbuffers</pattern>
100+
<shadedPattern>${seatunnel.shade.package}.${connector.name}.com.google.flatbuffers</shadedPattern>
101+
</relocation>
102+
<relocation>
103+
<pattern>com.fasterxml.jackson</pattern>
104+
<shadedPattern>${seatunnel.shade.package}.${connector.name}.com.fasterxml.jackson</shadedPattern>
105+
</relocation>
106+
</relocations>
107+
</configuration>
108+
<executions>
109+
<execution>
110+
<goals>
111+
<goal>shade</goal>
112+
</goals>
113+
<phase>package</phase>
114+
</execution>
115+
</executions>
116+
</plugin>
117+
</plugins>
118+
</build>
59119
</project>

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java

Lines changed: 19 additions & 0 deletions
@@ -24,6 +24,7 @@
2424
import org.apache.http.client.config.RequestConfig;
2525
import org.apache.http.client.methods.CloseableHttpResponse;
2626
import org.apache.http.client.methods.HttpGet;
27+
import org.apache.http.client.methods.HttpPost;
2728
import org.apache.http.client.methods.HttpPut;
2829
import org.apache.http.entity.ByteArrayEntity;
2930
import org.apache.http.impl.client.CloseableHttpClient;
@@ -58,6 +59,24 @@ public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
5859
return respEntity;
5960
}
6061

62+
public String doHttpPost(String postUrl, Map<String, String> header, String postBody)
63+
throws IOException {
64+
log.info("Executing POST to {}.", postUrl);
65+
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
66+
HttpPost httpPost = new HttpPost(postUrl);
67+
if (null != header) {
68+
for (Map.Entry<String, String> entry : header.entrySet()) {
69+
httpPost.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
70+
}
71+
}
72+
httpPost.setEntity(new ByteArrayEntity(postBody.getBytes("UTF-8")));
73+
try (CloseableHttpResponse resp = httpClient.execute(httpPost)) {
74+
HttpEntity respEntity = getHttpEntity(resp);
75+
return respEntity != null ? EntityUtils.toString(respEntity, "UTF-8") : null;
76+
}
77+
}
78+
}
79+
6180
public String doHttpGet(String getUrl) throws IOException {
6281
log.info("Executing GET from {}.", getUrl);
6382
try (CloseableHttpClient httpclient = buildHttpClient()) {
