-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature][Connector-V2] Starrocks sink connector (#3164)
* [Feature][Connector-V2] Add starrocks connector sink * [Feature][Connector-V2] StarRocks sink connector (StarRocks stream load API) * improve StarRocksFlushTuple * add Changelog * delete useless log4j file * change sr e2e port
- Loading branch information
Showing
22 changed files
with
1,796 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
# StarRocks | ||
|
||
> StarRocks sink connector | ||
## Description | ||
Used to send data to StarRocks. Both support streaming and batch mode. | ||
The internal implementation of StarRocks sink connector is cached and imported by stream load in batches. | ||
## Key features | ||
|
||
- [ ] [exactly-once](../../concept/connector-v2-features.md) | ||
- [ ] [schema projection](../../concept/connector-v2-features.md) | ||
|
||
## Options | ||
|
||
| name | type | required | default value | | ||
|-----------------------------|------------------------------|----------|-----------------| | ||
| node_urls | list | yes | - | | ||
| username | string | yes | - | | ||
| password | string | yes | - | | ||
| database | string | yes | - | | ||
| table | string | no | - | | ||
| labelPrefix | string | no | - | | ||
| batch_max_rows | long | no | 1024 | | ||
| batch_max_bytes | int | no | 5 * 1024 * 1024 | | ||
| batch_interval_ms | int | no | - | | ||
| max_retries | int | no | - | | ||
| retry_backoff_multiplier_ms | int | no | - | | ||
| max_retry_backoff_ms | int | no | - | | ||
| sink.properties.* | starrocks stream load config | no | - | | ||
|
||
### node_urls [list] | ||
|
||
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` | ||
|
||
### username [string] | ||
|
||
`StarRocks` user username | ||
|
||
### password [string] | ||
|
||
`StarRocks` user password | ||
|
||
### database [string] | ||
|
||
The name of StarRocks database | ||
|
||
### table [string] | ||
|
||
The name of StarRocks table | ||
|
||
### labelPrefix [string] | ||
|
||
the prefix of StarRocks stream load label | ||
|
||
### batch_max_rows [string] | ||
|
||
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | ||
|
||
### batch_max_bytes [string] | ||
|
||
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | ||
|
||
### batch_interval_ms [string] | ||
|
||
For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | ||
|
||
### max_retries [string] | ||
|
||
The number of retries to flush failed | ||
|
||
### retry_backoff_multiplier_ms [string] | ||
|
||
Using as a multiplier for generating the next delay for backoff | ||
|
||
### max_retry_backoff_ms [string] | ||
|
||
The amount of time to wait before attempting to retry a request to `StarRocks` | ||
|
||
### sink.properties.* [starrocks stream load config] | ||
|
||
the parameter of the stream load `data_desc` | ||
The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name. | ||
For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`. | ||
|
||
#### Supported import data formats | ||
|
||
The supported formats include CSV and JSON. Default value: CSV | ||
|
||
## Example | ||
Use JSON format to import data | ||
``` | ||
sink { | ||
StarRocks { | ||
nodeUrls = ["e2e_starRocksdb:8030"] | ||
username = root | ||
password = "" | ||
database = "test" | ||
table = "e2e_table_sink" | ||
batch_max_rows = 10 | ||
sink.properties.format = "JSON" | ||
sink.properties.strip_outer_array = true | ||
} | ||
} | ||
``` | ||
|
||
Use CSV format to import data | ||
``` | ||
sink { | ||
StarRocks { | ||
nodeUrls = ["e2e_starRocksdb:8030"] | ||
username = root | ||
password = "" | ||
database = "test" | ||
table = "e2e_table_sink" | ||
batch_max_rows = 10 | ||
sink.properties.format = "CSV" | ||
sink.properties.column_separator = "\\x01", | ||
sink.properties.row_delimiter = "\\x02" | ||
} | ||
} | ||
``` | ||
|
||
## Changelog | ||
|
||
### next version | ||
|
||
- Add StarRocks Sink Connector |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
<parent> | ||
<artifactId>seatunnel-connectors-v2</artifactId> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<version>${revision}</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>connector-starrocks</artifactId> | ||
|
||
<properties> | ||
<httpclient.version>4.5.13</httpclient.version> | ||
<httpcore.version>4.4.4</httpcore.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>seatunnel-api</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.seatunnel</groupId> | ||
<artifactId>connector-common</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.httpcomponents</groupId> | ||
<artifactId>httpclient</artifactId> | ||
<version>${httpclient.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.httpcomponents</groupId> | ||
<artifactId>httpcore</artifactId> | ||
<version>${httpcore.version}</version> | ||
</dependency> | ||
</dependencies> | ||
</project> |
163 changes: 163 additions & 0 deletions
163
.../src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.starrocks.client; | ||
|
||
import org.apache.seatunnel.common.utils.JsonUtils; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.http.HttpEntity; | ||
import org.apache.http.HttpStatus; | ||
import org.apache.http.client.config.RequestConfig; | ||
import org.apache.http.client.methods.CloseableHttpResponse; | ||
import org.apache.http.client.methods.HttpGet; | ||
import org.apache.http.client.methods.HttpPut; | ||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.impl.client.CloseableHttpClient; | ||
import org.apache.http.impl.client.DefaultRedirectStrategy; | ||
import org.apache.http.impl.client.HttpClientBuilder; | ||
import org.apache.http.impl.client.HttpClients; | ||
import org.apache.http.util.EntityUtils; | ||
|
||
import java.io.IOException; | ||
import java.net.HttpURLConnection; | ||
import java.net.URL; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
@Slf4j | ||
public class HttpHelper { | ||
private static final int DEFAULT_CONNECT_TIMEOUT = 1000000; | ||
|
||
public HttpEntity getHttpEntity(CloseableHttpResponse resp) { | ||
int code = resp.getStatusLine().getStatusCode(); | ||
if (HttpStatus.SC_OK != code) { | ||
log.warn("Request failed with code:{}", code); | ||
return null; | ||
} | ||
HttpEntity respEntity = resp.getEntity(); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return respEntity; | ||
} | ||
|
||
public String doHttpGet(String getUrl) throws IOException { | ||
log.info("Executing GET from {}.", getUrl); | ||
try (CloseableHttpClient httpclient = buildHttpClient()) { | ||
HttpGet httpGet = new HttpGet(getUrl); | ||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { | ||
HttpEntity respEntity = resp.getEntity(); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return EntityUtils.toString(respEntity); | ||
} | ||
} | ||
} | ||
|
||
public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException { | ||
log.info("Executing GET from {}.", getUrl); | ||
try (CloseableHttpClient httpclient = HttpClients.createDefault()) { | ||
HttpGet httpGet = new HttpGet(getUrl); | ||
if (null != header) { | ||
for (Map.Entry<String, String> entry : header.entrySet()) { | ||
httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue())); | ||
} | ||
} | ||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { | ||
HttpEntity respEntity = getHttpEntity(resp); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) throws IOException { | ||
final HttpClientBuilder httpClientBuilder = HttpClients.custom() | ||
.setRedirectStrategy(new DefaultRedirectStrategy() { | ||
@Override | ||
protected boolean isRedirectable(String method) { | ||
return true; | ||
} | ||
}); | ||
try (CloseableHttpClient httpclient = httpClientBuilder.build()) { | ||
HttpPut httpPut = new HttpPut(url); | ||
if (null != header) { | ||
for (Map.Entry<String, String> entry : header.entrySet()) { | ||
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); | ||
} | ||
} | ||
httpPut.setEntity(new ByteArrayEntity(data)); | ||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); | ||
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { | ||
int code = resp.getStatusLine().getStatusCode(); | ||
if (HttpStatus.SC_OK != code) { | ||
String errorText; | ||
try { | ||
HttpEntity respEntity = resp.getEntity(); | ||
errorText = EntityUtils.toString(respEntity); | ||
} catch (Exception err) { | ||
errorText = "find errorText failed: " + err.getMessage(); | ||
} | ||
log.warn("Request failed with code:{}, err:{}", code, errorText); | ||
Map<String, Object> errorMap = new HashMap<>(); | ||
errorMap.put("Status", "Fail"); | ||
errorMap.put("Message", errorText); | ||
return errorMap; | ||
} | ||
HttpEntity respEntity = resp.getEntity(); | ||
if (null == respEntity) { | ||
log.warn("Request failed with empty response."); | ||
return null; | ||
} | ||
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class); | ||
} | ||
} | ||
} | ||
|
||
private CloseableHttpClient buildHttpClient() { | ||
final HttpClientBuilder httpClientBuilder = HttpClients.custom() | ||
.setRedirectStrategy(new DefaultRedirectStrategy() { | ||
@Override | ||
protected boolean isRedirectable(String method) { | ||
return true; | ||
} | ||
}); | ||
return httpClientBuilder.build(); | ||
} | ||
|
||
public boolean tryHttpConnection(String host) { | ||
try { | ||
URL url = new URL(host); | ||
HttpURLConnection co = (HttpURLConnection) url.openConnection(); | ||
co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); | ||
co.connect(); | ||
co.disconnect(); | ||
return true; | ||
} catch (Exception e1) { | ||
log.warn("Failed to connect to address:{}", host, e1); | ||
return false; | ||
} | ||
} | ||
} |
33 changes: 33 additions & 0 deletions
33
.../java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksFlushTuple.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.starrocks.client; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
|
||
import java.util.List; | ||
|
||
@AllArgsConstructor | ||
@Getter | ||
@Setter | ||
public class StarRocksFlushTuple { | ||
private String label; | ||
private Long bytes; | ||
private List<byte[]> rows; | ||
} |
Oops, something went wrong.