Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Starrocks sink connector #3164

Merged
merged 15 commits into from
Nov 7, 2022
128 changes: 128 additions & 0 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# StarRocks

> StarRocks sink connector
## Description
Used to send data to StarRocks. Both support streaming and batch mode.
The internal implementation of StarRocks sink connector is cached and imported by stream load in batches.
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|------------------------------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | no | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| sink.properties.* | starrocks stream load config | no | - |

### node_urls [list]

`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`

### username [string]

`StarRocks` user username

### password [string]

`StarRocks` user password

### database [string]

The name of StarRocks database

### table [string]

The name of StarRocks table

### labelPrefix [string]

the prefix of StarRocks stream load label
531651225 marked this conversation as resolved.
Show resolved Hide resolved

### batch_max_rows [string]

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks

### batch_max_bytes [string]

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks

### batch_interval_ms [string]

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks

### max_retries [string]

The number of retries to flush failed

### retry_backoff_multiplier_ms [string]

Using as a multiplier for generating the next delay for backoff

### max_retry_backoff_ms [string]

The amount of time to wait before attempting to retry a request to `StarRocks`

### sink.properties.* [starrocks stream load config]
531651225 marked this conversation as resolved.
Show resolved Hide resolved

the parameter of the stream load `data_desc`
The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name.
For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`.

#### Supported import data formats
531651225 marked this conversation as resolved.
Show resolved Hide resolved

The supported formats include CSV and JSON. Default value: CSV

## Example
Use JSON format to import data
```
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
sink.properties.format = "JSON"
sink.properties.strip_outer_array = true
}
}
```

Use CSV format to import data
```
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 10
sink.properties.format = "CSV"
sink.properties.column_separator = "\\x01",
sink.properties.row_delimiter = "\\x02"
}
}
```

## Changelog

### next version

- Add StarRocks Sink Connector
3 changes: 2 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,5 @@ seatunnel.sink.MongoDB = connector-mongodb
seatunnel.source.Iceberg = connector-iceberg
seatunnel.source.InfluxDB = connector-influxdb
seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.sink.StarRocks = connector-starrocks
58 changes: 58 additions & 0 deletions seatunnel-connectors-v2/connector-starrocks/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-starrocks</artifactId>

<properties>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import org.apache.seatunnel.common.utils.JsonUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class HttpHelper {
private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;

public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
log.warn("Request failed with code:{}", code);
return null;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return respEntity;
}

public String doHttpGet(String getUrl) throws IOException {
log.info("Executing GET from {}.", getUrl);
try (CloseableHttpClient httpclient = buildHttpClient()) {
HttpGet httpGet = new HttpGet(getUrl);
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return EntityUtils.toString(respEntity);
}
}
}

public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header) throws IOException {
log.info("Executing GET from {}.", getUrl);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(getUrl);
if (null != header) {
for (Map.Entry<String, String> entry : header.entrySet()) {
httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = getHttpEntity(resp);
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
}
}
}

@SuppressWarnings("unchecked")
public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header) throws IOException {
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(url);
if (null != header) {
for (Map.Entry<String, String> entry : header.entrySet()) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
String errorText;
try {
HttpEntity respEntity = resp.getEntity();
errorText = EntityUtils.toString(respEntity);
} catch (Exception err) {
errorText = "find errorText failed: " + err.getMessage();
}
log.warn("Request failed with code:{}, err:{}", code, errorText);
Map<String, Object> errorMap = new HashMap<>();
errorMap.put("Status", "Fail");
errorMap.put("Message", errorText);
return errorMap;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
log.warn("Request failed with empty response.");
return null;
}
return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
}
}
}

private CloseableHttpClient buildHttpClient() {
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
return httpClientBuilder.build();
}

public boolean tryHttpConnection(String host) {
try {
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
co.connect();
co.disconnect();
return true;
} catch (Exception e1) {
log.warn("Failed to connect to address:{}", host, e1);
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@AllArgsConstructor
@Getter
@Setter
public class StarRocksFlushTuple {
531651225 marked this conversation as resolved.
Show resolved Hide resolved
private String label;
private Long bytes;
private List<byte[]> rows;
}
Loading