Skip to content

Commit 04ee8ac

Browse files
authored
[Feature][http-Sink] Implementing http batch writes (#9292)
1 parent 68b0504 commit 04ee8ac

File tree

8 files changed

+376
-3
lines changed

8 files changed

+376
-3
lines changed

docs/en/connector-v2/sink/Http.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
4444
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
4545
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
4646
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
47+
| array_mode | Boolean| No | false | Send data as a JSON array when true, or as a single JSON object when false (default) |
48+
| batch_size | Int | No | 1 | The batch size of records to send in one HTTP request. Only works when array_mode is true. |
49+
| request_interval_ms | Int | No | 0 | The interval milliseconds between two HTTP requests, to avoid sending requests too frequently. |
4750
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |
4851

4952
## Example
@@ -59,6 +62,21 @@ Http {
5962
}
6063
```
6164

65+
### With Batch Processing
66+
67+
```hocon
68+
Http {
69+
url = "http://localhost/test/webhook"
70+
headers {
71+
token = "9e32e859ef044462a257e1fc76730066"
72+
Content-Type = "application/json"
73+
}
74+
array_mode = true
75+
batch_size = 50
76+
request_interval_ms = 500
77+
}
78+
```
79+
6280
### Multiple table
6381

6482
#### example1

docs/zh/connector-v2/sink/Http.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ import ChangeLog from '../changelog/connector-http.md';
4242
| retry_backoff_max_ms | Int || 10000 | http请求失败,最大重试回退时间(毫秒) |
4343
| connect_timeout_ms | Int || 12000 | 连接超时设置,默认12s |
4444
| socket_timeout_ms | Int || 60000 | 套接字超时设置,默认为60s |
45+
| array_mode | Boolean|| false | 为true时将数据作为JSON数组发送,为false时作为单个JSON对象发送(默认) |
46+
| batch_size | Int || 1 | 在一个HTTP请求中发送的记录批量大小。仅在array_mode为true时有效 |
47+
| request_interval_ms | Int || 0 | 两次HTTP请求之间的间隔毫秒数,以避免请求过于频繁 |
4548
| common-options | || - | Sink插件常用参数,请参考 [Sink常用选项 ](../sink-common-options.md) 了解详情 |
4649

4750
## 示例
@@ -57,6 +60,21 @@ Http {
5760
}
5861
```
5962

63+
### 带批处理的示例
64+
65+
```hocon
66+
Http {
67+
url = "http://localhost/test/webhook"
68+
headers {
69+
token = "9e32e859ef044462a257e1fc76730066"
70+
Content-Type = "application/json"
71+
}
72+
array_mode = true
73+
batch_size = 50
74+
request_interval_ms = 500
75+
}
76+
```
77+
6078
## 变更日志
6179

6280
<ChangeLog />

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class HttpParameter implements Serializable {
4242
protected boolean enableMultilines;
4343
protected int connectTimeoutMs;
4444
protected int socketTimeoutMs;
45+
protected boolean arrayMode = false;
46+
protected int batchSize = 1;
47+
protected int requestIntervalMs = 0;
4548

4649
public void buildWithConfig(ReadonlyConfig pluginConfig) {
4750
// set url

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,27 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.http.config;
1919

20-
public class HttpSinkOptions extends HttpCommonOptions {}
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
public class HttpSinkOptions extends HttpCommonOptions {
24+
public static final Option<Boolean> ARRAY_MODE =
25+
Options.key("array_mode")
26+
.booleanType()
27+
.defaultValue(false)
28+
.withDescription(
29+
"Send data as a JSON array when true, or as a single JSON object when false (default)");
30+
31+
public static final Option<Integer> BATCH_SIZE =
32+
Options.key("batch_size")
33+
.intType()
34+
.defaultValue(1)
35+
.withDescription(
36+
"The batch size of records to send in one HTTP request. Only works when array_mode is true");
37+
38+
public static final Option<Integer> REQUEST_INTERVAL_MS =
39+
Options.key("request_interval_ms")
40+
.intType()
41+
.defaultValue(0)
42+
.withDescription("The interval milliseconds between two HTTP requests");
43+
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ public HttpSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
4747
if (pluginConfig.getOptional(HttpSinkOptions.PARAMS).isPresent()) {
4848
httpParameter.setHeaders(pluginConfig.get(HttpSinkOptions.PARAMS));
4949
}
50+
if (pluginConfig.getOptional(HttpSinkOptions.ARRAY_MODE).isPresent()) {
51+
httpParameter.setArrayMode(pluginConfig.get(HttpSinkOptions.ARRAY_MODE));
52+
}
53+
if (pluginConfig.getOptional(HttpSinkOptions.BATCH_SIZE).isPresent()) {
54+
httpParameter.setBatchSize(pluginConfig.get(HttpSinkOptions.BATCH_SIZE));
55+
}
56+
if (pluginConfig.getOptional(HttpSinkOptions.REQUEST_INTERVAL_MS).isPresent()) {
57+
httpParameter.setRequestIntervalMs(
58+
pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS));
59+
}
5060
this.catalogTable = catalogTable;
5161
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
5262
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public OptionRule optionRule() {
4848
.optional(HttpSinkOptions.RETRY)
4949
.optional(HttpSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
5050
.optional(HttpSinkOptions.RETRY_BACKOFF_MAX_MS)
51+
.optional(HttpSinkOptions.ARRAY_MODE)
52+
.optional(HttpSinkOptions.BATCH_SIZE)
53+
.optional(HttpSinkOptions.REQUEST_INTERVAL_MS)
5154
.optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
5255
.build();
5356
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.http.sink;
1919

20+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
21+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
22+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
23+
2024
import org.apache.seatunnel.api.serialization.SerializationSchema;
2125
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
2226
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,7 +34,10 @@
3034
import lombok.extern.slf4j.Slf4j;
3135

3236
import java.io.IOException;
37+
import java.util.ArrayList;
38+
import java.util.List;
3339
import java.util.Objects;
40+
import java.util.Optional;
3441

3542
@Slf4j
3643
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
@@ -40,6 +47,13 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
4047
protected final HttpParameter httpParameter;
4148
protected final SerializationSchema serializationSchema;
4249

50+
// Batch related fields
51+
private final boolean arrayMode;
52+
private final int batchSize;
53+
private final int requestIntervalMs;
54+
private final List<SeaTunnelRow> batchBuffer;
55+
private long lastRequestTime;
56+
4357
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
4458
this(seaTunnelRowType, httpParameter, new JsonSerializationSchema(seaTunnelRowType));
4559
}
@@ -48,18 +62,81 @@ public HttpSinkWriter(
4862
SeaTunnelRowType seaTunnelRowType,
4963
HttpParameter httpParameter,
5064
SerializationSchema serializationSchema) {
65+
this(
66+
seaTunnelRowType,
67+
httpParameter,
68+
serializationSchema,
69+
httpParameter.isArrayMode(),
70+
httpParameter.getBatchSize(),
71+
httpParameter.getRequestIntervalMs());
72+
}
73+
74+
public HttpSinkWriter(
75+
SeaTunnelRowType seaTunnelRowType,
76+
HttpParameter httpParameter,
77+
SerializationSchema serializationSchema,
78+
boolean arrayMode,
79+
int batchSize,
80+
int requestIntervalMs) {
5181
this.seaTunnelRowType = seaTunnelRowType;
5282
this.httpParameter = httpParameter;
53-
this.httpClient = new HttpClientProvider(httpParameter);
83+
this.httpClient = createHttpClient(httpParameter);
5484
this.serializationSchema = serializationSchema;
85+
this.arrayMode = arrayMode;
86+
this.batchSize = batchSize;
87+
this.requestIntervalMs = requestIntervalMs;
88+
this.batchBuffer = new ArrayList<>(batchSize);
89+
this.lastRequestTime = System.currentTimeMillis();
5590
}
5691

5792
@Override
5893
public void write(SeaTunnelRow element) throws IOException {
94+
if (!arrayMode) {
95+
writeSingleRecord(element);
96+
} else {
97+
batchBuffer.add(element);
98+
if (batchBuffer.size() >= batchSize) {
99+
flush();
100+
}
101+
}
102+
}
103+
104+
private void writeSingleRecord(SeaTunnelRow element) throws IOException {
59105
byte[] serialize = serializationSchema.serialize(element);
60106
String body = new String(serialize);
107+
doHttpRequest(body);
108+
}
109+
110+
private void flush() throws IOException {
111+
if (batchBuffer.isEmpty()) {
112+
return;
113+
}
114+
long currentTime = System.currentTimeMillis();
115+
long timeSinceLastRequest = currentTime - lastRequestTime;
116+
if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) {
117+
try {
118+
Thread.sleep(requestIntervalMs - timeSinceLastRequest);
119+
} catch (InterruptedException e) {
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
124+
// Array mode: serialize batch data as JSON
125+
ObjectMapper mapper = new ObjectMapper();
126+
ArrayNode arrayNode = mapper.createArrayNode();
127+
for (SeaTunnelRow row : batchBuffer) {
128+
byte[] serialize = serializationSchema.serialize(row);
129+
arrayNode.add(new String(serialize));
130+
}
131+
String body = mapper.writeValueAsString(arrayNode);
132+
doHttpRequest(body);
133+
134+
batchBuffer.clear();
135+
lastRequestTime = System.currentTimeMillis();
136+
}
137+
138+
private void doHttpRequest(String body) {
61139
try {
62-
// only support post web hook
63140
HttpResponse response =
64141
httpClient.doPost(httpParameter.getUrl(), httpParameter.getHeaders(), body);
65142
if (HttpResponse.STATUS_OK == response.getCode()) {
@@ -76,8 +153,28 @@ public void write(SeaTunnelRow element) throws IOException {
76153

77154
@Override
78155
public void close() throws IOException {
156+
if (arrayMode) {
157+
flush();
158+
}
79159
if (Objects.nonNull(httpClient)) {
80160
httpClient.close();
81161
}
82162
}
163+
164+
@Override
165+
public Optional<Void> prepareCommit() {
166+
if (arrayMode) {
167+
try {
168+
flush();
169+
} catch (IOException e) {
170+
throw new RuntimeException("Failed to flush data in prepareCommit", e);
171+
}
172+
}
173+
return Optional.empty();
174+
}
175+
176+
@VisibleForTesting
177+
protected HttpClientProvider createHttpClient(HttpParameter httpParameter) {
178+
return new HttpClientProvider(httpParameter);
179+
}
83180
}

0 commit comments

Comments
 (0)