Skip to content

Commit 3e6caf7

Browse files
authored
[Feature][Connector-V2] Starrocks sink connector (#3164)
* [Feature][Connector-V2] Add starrocks connector sink * [Feature][Connector-V2] StarRocks sink connector (StarRocks stream load API) * improve StarRocksFlushTuple * add Changelog * delete useless log4j file * change sr e2e port
1 parent d28d4d3 commit 3e6caf7

File tree

22 files changed

+1796
-0
lines changed

22 files changed

+1796
-0
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# StarRocks
2+
3+
> StarRocks sink connector
4+
5+
## Description
6+
Used to send data to StarRocks. Both support streaming and batch mode.
7+
The internal implementation of StarRocks sink connector is cached and imported by stream load in batches.
8+
## Key features
9+
10+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
11+
- [ ] [schema projection](../../concept/connector-v2-features.md)
12+
13+
## Options
14+
15+
| name | type | required | default value |
16+
|-----------------------------|------------------------------|----------|-----------------|
17+
| node_urls | list | yes | - |
18+
| username | string | yes | - |
19+
| password | string | yes | - |
20+
| database | string | yes | - |
21+
| table | string | no | - |
22+
| labelPrefix | string | no | - |
23+
| batch_max_rows | long | no | 1024 |
24+
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
25+
| batch_interval_ms | int | no | - |
26+
| max_retries | int | no | - |
27+
| retry_backoff_multiplier_ms | int | no | - |
28+
| max_retry_backoff_ms | int | no | - |
29+
| sink.properties.* | starrocks stream load config | no | - |
30+
31+
### node_urls [list]
32+
33+
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`
34+
35+
### username [string]
36+
37+
`StarRocks` user username
38+
39+
### password [string]
40+
41+
`StarRocks` user password
42+
43+
### database [string]
44+
45+
The name of StarRocks database
46+
47+
### table [string]
48+
49+
The name of StarRocks table
50+
51+
### labelPrefix [string]
52+
53+
the prefix of StarRocks stream load label
54+
55+
### batch_max_rows [string]
56+
57+
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks
58+
59+
### batch_max_bytes [string]
60+
61+
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks
62+
63+
### batch_interval_ms [string]
64+
65+
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks
66+
67+
### max_retries [string]
68+
69+
The number of retries to flush failed
70+
71+
### retry_backoff_multiplier_ms [string]
72+
73+
Using as a multiplier for generating the next delay for backoff
74+
75+
### max_retry_backoff_ms [string]
76+
77+
The amount of time to wait before attempting to retry a request to `StarRocks`
78+
79+
### sink.properties.* [starrocks stream load config]
80+
81+
the parameter of the stream load `data_desc`
82+
The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name.
83+
For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`.
84+
85+
#### Supported import data formats
86+
87+
The supported formats include CSV and JSON. Default value: CSV
88+
89+
## Example
90+
Use JSON format to import data
91+
```
92+
sink {
93+
StarRocks {
94+
nodeUrls = ["e2e_starRocksdb:8030"]
95+
username = root
96+
password = ""
97+
database = "test"
98+
table = "e2e_table_sink"
99+
batch_max_rows = 10
100+
sink.properties.format = "JSON"
101+
sink.properties.strip_outer_array = true
102+
}
103+
}
104+
105+
```
106+
107+
Use CSV format to import data
108+
```
109+
sink {
110+
StarRocks {
111+
nodeUrls = ["e2e_starRocksdb:8030"]
112+
username = root
113+
password = ""
114+
database = "test"
115+
table = "e2e_table_sink"
116+
batch_max_rows = 10
117+
sink.properties.format = "CSV"
118+
sink.properties.column_separator = "\\x01",
119+
sink.properties.row_delimiter = "\\x02"
120+
}
121+
}
122+
```
123+
124+
## Changelog
125+
126+
### next version
127+
128+
- Add StarRocks Sink Connector

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,4 @@ seatunnel.source.S3File = connector-file-s3
139139
seatunnel.sink.S3File = connector-file-s3
140140
seatunnel.source.Amazondynamodb = connector-amazondynamodb
141141
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
142+
seatunnel.sink.StarRocks = connector-starrocks
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
22+
<parent>
23+
<artifactId>seatunnel-connectors-v2</artifactId>
24+
<groupId>org.apache.seatunnel</groupId>
25+
<version>${revision}</version>
26+
</parent>
27+
<modelVersion>4.0.0</modelVersion>
28+
29+
<artifactId>connector-starrocks</artifactId>
30+
31+
<properties>
32+
<httpclient.version>4.5.13</httpclient.version>
33+
<httpcore.version>4.4.4</httpcore.version>
34+
</properties>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.seatunnel</groupId>
39+
<artifactId>seatunnel-api</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.apache.seatunnel</groupId>
44+
<artifactId>connector-common</artifactId>
45+
<version>${project.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>org.apache.httpcomponents</groupId>
49+
<artifactId>httpclient</artifactId>
50+
<version>${httpclient.version}</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.apache.httpcomponents</groupId>
54+
<artifactId>httpcore</artifactId>
55+
<version>${httpcore.version}</version>
56+
</dependency>
57+
</dependencies>
58+
</project>
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
19+
20+
import org.apache.seatunnel.common.utils.JsonUtils;
21+
22+
import lombok.extern.slf4j.Slf4j;
23+
import org.apache.http.HttpEntity;
24+
import org.apache.http.HttpStatus;
25+
import org.apache.http.client.config.RequestConfig;
26+
import org.apache.http.client.methods.CloseableHttpResponse;
27+
import org.apache.http.client.methods.HttpGet;
28+
import org.apache.http.client.methods.HttpPut;
29+
import org.apache.http.entity.ByteArrayEntity;
30+
import org.apache.http.impl.client.CloseableHttpClient;
31+
import org.apache.http.impl.client.DefaultRedirectStrategy;
32+
import org.apache.http.impl.client.HttpClientBuilder;
33+
import org.apache.http.impl.client.HttpClients;
34+
import org.apache.http.util.EntityUtils;
35+
36+
import java.io.IOException;
37+
import java.net.HttpURLConnection;
38+
import java.net.URL;
39+
import java.util.HashMap;
40+
import java.util.Map;
41+
42+
@Slf4j
43+
public class HttpHelper {
44+
private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
45+
46+
public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
47+
int code = resp.getStatusLine().getStatusCode();
48+
if (HttpStatus.SC_OK != code) {
49+
log.warn("Request failed with code:{}", code);
50+
return null;
51+
}
52+
HttpEntity respEntity = resp.getEntity();
53+
if (null == respEntity) {
54+
log.warn("Request failed with empty response.");
55+
return null;
56+
}
57+
return respEntity;
58+
}
59+
60+
public String doHttpGet(String getUrl) throws IOException {
61+
log.info("Executing GET from {}.", getUrl);
62+
try (CloseableHttpClient httpclient = buildHttpClient()) {
63+
HttpGet httpGet = new HttpGet(getUrl);
64+
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
65+
HttpEntity respEntity = resp.getEntity();
66+
if (null == respEntity) {
67+
log.warn("Request failed with empty response.");
68+
return null;
69+
}
70+
return EntityUtils.toString(respEntity);
71+
}
72+
}
73+
}
74+
75+
public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException {
76+
log.info("Executing GET from {}.", getUrl);
77+
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
78+
HttpGet httpGet = new HttpGet(getUrl);
79+
if (null != header) {
80+
for (Map.Entry<String, String> entry : header.entrySet()) {
81+
httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
82+
}
83+
}
84+
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
85+
HttpEntity respEntity = getHttpEntity(resp);
86+
if (null == respEntity) {
87+
log.warn("Request failed with empty response.");
88+
return null;
89+
}
90+
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
91+
}
92+
}
93+
}
94+
95+
@SuppressWarnings("unchecked")
96+
public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) throws IOException {
97+
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
98+
.setRedirectStrategy(new DefaultRedirectStrategy() {
99+
@Override
100+
protected boolean isRedirectable(String method) {
101+
return true;
102+
}
103+
});
104+
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
105+
HttpPut httpPut = new HttpPut(url);
106+
if (null != header) {
107+
for (Map.Entry<String, String> entry : header.entrySet()) {
108+
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
109+
}
110+
}
111+
httpPut.setEntity(new ByteArrayEntity(data));
112+
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
113+
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
114+
int code = resp.getStatusLine().getStatusCode();
115+
if (HttpStatus.SC_OK != code) {
116+
String errorText;
117+
try {
118+
HttpEntity respEntity = resp.getEntity();
119+
errorText = EntityUtils.toString(respEntity);
120+
} catch (Exception err) {
121+
errorText = "find errorText failed: " + err.getMessage();
122+
}
123+
log.warn("Request failed with code:{}, err:{}", code, errorText);
124+
Map<String, Object> errorMap = new HashMap<>();
125+
errorMap.put("Status", "Fail");
126+
errorMap.put("Message", errorText);
127+
return errorMap;
128+
}
129+
HttpEntity respEntity = resp.getEntity();
130+
if (null == respEntity) {
131+
log.warn("Request failed with empty response.");
132+
return null;
133+
}
134+
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
135+
}
136+
}
137+
}
138+
139+
private CloseableHttpClient buildHttpClient() {
140+
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
141+
.setRedirectStrategy(new DefaultRedirectStrategy() {
142+
@Override
143+
protected boolean isRedirectable(String method) {
144+
return true;
145+
}
146+
});
147+
return httpClientBuilder.build();
148+
}
149+
150+
public boolean tryHttpConnection(String host) {
151+
try {
152+
URL url = new URL(host);
153+
HttpURLConnection co = (HttpURLConnection) url.openConnection();
154+
co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
155+
co.connect();
156+
co.disconnect();
157+
return true;
158+
} catch (Exception e1) {
159+
log.warn("Failed to connect to address:{}", host, e1);
160+
return false;
161+
}
162+
}
163+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Getter;
22+
import lombok.Setter;
23+
24+
import java.util.List;
25+
26+
@AllArgsConstructor
27+
@Getter
28+
@Setter
29+
public class StarRocksFlushTuple {
30+
private String label;
31+
private Long bytes;
32+
private List<byte[]> rows;
33+
}

0 commit comments

Comments
 (0)