Skip to content

Commit 5b3957b

Browse files
authored
[Improve][Connector-V2] Improve http connector (#2833)
[format][json] Support read json ARRAY & MAP [hotfix][build] connector-http-base can't skip maven-shade-plugin [connector][http] Improve http connector * Support `SimpleTextSchema` * Support retry for request http server * Support request http interval in stream mode
1 parent 4aacbcd commit 5b3957b

File tree

17 files changed

+605
-77
lines changed

17 files changed

+605
-77
lines changed

docs/en/connector-v2/sink/Http.md

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,40 @@ Used to launch web hooks using data.
1717

1818
## Options
1919

20-
| name | type | required | default value |
21-
| --- |--------| --- | --- |
22-
| url | String | Yes | - |
23-
| headers | Map | No | - |
20+
| name | type | required | default value |
21+
|------------------------------------|--------|----------|---------------|
22+
| url | String | Yes | - |
23+
| headers | Map | No | - |
24+
| params | Map | No | - |
25+
| retry | int | No | - |
26+
| retry_backoff_multiplier_ms | int | No | 100 |
27+
| retry_backoff_max_ms | int | No | 10000 |
2428

25-
### url [string]
29+
30+
### url [String]
2631

2732
http request url
2833

2934
### headers [Map]
3035

3136
http headers
3237

38+
### params [Map]
39+
40+
http params
41+
42+
### retry [int]
43+
44+
The max retry times if request http return to `IOException`
45+
46+
### retry_backoff_multiplier_ms [int]
47+
48+
The retry-backoff times(millis) multiplier if request http failed
49+
50+
### retry_backoff_max_ms [int]
51+
52+
The maximum retry-backoff times(millis) if request http failed
53+
3354
## Example
3455

3556
simple:

docs/en/connector-v2/source/Http.md

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,26 @@ Used to read data from Http.
1717

1818
## Options
1919

20-
| name | type | required | default value |
21-
|---------------|--------|----------|---------------|
22-
| url | String | Yes | - |
23-
| schema | config | No | - |
24-
| schema.fields | config | No | - |
25-
| format | string | No | json |
26-
| method | String | No | get |
27-
| headers | Map | No | - |
28-
| params | Map | No | - |
29-
| body | String | No | - |
30-
31-
### url [string]
20+
| name | type | required | default value |
21+
|------------------------------------|--------|----------|---------------|
22+
| url | String | Yes | - |
23+
| schema | Config | No | - |
24+
| schema.fields | Config | No | - |
25+
| format | String | No | json |
26+
| method | String | No | get |
27+
| headers | Map | No | - |
28+
| params | Map | No | - |
29+
| body | String | No | - |
30+
| poll_interval_ms | int | No | - |
31+
| retry | int | No | - |
32+
| retry_backoff_multiplier_ms | int | No | 100 |
33+
| retry_backoff_max_ms | int | No | 10000 |
34+
35+
### url [String]
3236

3337
http request url
3438

35-
### method [string]
39+
### method [String]
3640

3741
http request method, only supports GET, POST method.
3842

@@ -48,6 +52,22 @@ http params
4852

4953
http body
5054

55+
### poll_interval_ms [int]
56+
57+
request http api interval(millis) in stream mode
58+
59+
### retry [int]
60+
61+
The max retry times if request http return to `IOException`
62+
63+
### retry_backoff_multiplier_ms [int]
64+
65+
The retry-backoff times(millis) multiplier if request http failed
66+
67+
### retry_backoff_max_ms [int]
68+
69+
The maximum retry-backoff times(millis) if request http failed
70+
5171
### format [String]
5272

5373
the format of upstream data, now only support `json` `text`, default `json`.

seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
<properties>
3333
<httpclient.version>4.5.13</httpclient.version>
3434
<httpcore.version>4.4.4</httpcore.version>
35+
<guava-retrying.version>2.0.0</guava-retrying.version>
3536
</properties>
3637

3738
<dependencies>
@@ -58,31 +59,10 @@
5859
<artifactId>httpcore</artifactId>
5960
<version>${httpcore.version}</version>
6061
</dependency>
62+
<dependency>
63+
<groupId>com.github.rholder</groupId>
64+
<artifactId>guava-retrying</artifactId>
65+
<version>${guava-retrying.version}</version>
66+
</dependency>
6167
</dependencies>
62-
63-
<build>
64-
<plugins>
65-
<plugin>
66-
<groupId>org.apache.maven.plugins</groupId>
67-
<artifactId>maven-shade-plugin</artifactId>
68-
<executions>
69-
<execution>
70-
<phase>package</phase>
71-
<goals>
72-
<goal>shade</goal>
73-
</goals>
74-
<!-- base module need skip shading -->
75-
<configuration>
76-
<skip>true</skip>
77-
</configuration>
78-
</execution>
79-
</executions>
80-
</plugin>
81-
<!-- make sure that flatten runs after maven-shade-plugin -->
82-
<plugin>
83-
<groupId>org.codehaus.mojo</groupId>
84-
<artifactId>flatten-maven-plugin</artifactId>
85-
</plugin>
86-
</plugins>
87-
</build>
8868
</project>

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.http.client;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
21+
22+
import com.github.rholder.retry.Attempt;
23+
import com.github.rholder.retry.RetryListener;
24+
import com.github.rholder.retry.Retryer;
25+
import com.github.rholder.retry.RetryerBuilder;
26+
import com.github.rholder.retry.StopStrategies;
27+
import com.github.rholder.retry.WaitStrategies;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.commons.lang3.exception.ExceptionUtils;
2030
import org.apache.http.HttpStatus;
2131
import org.apache.http.NameValuePair;
2232
import org.apache.http.client.config.RequestConfig;
@@ -48,9 +58,10 @@
4858
import java.util.Map;
4959
import java.util.Objects;
5060
import java.util.Set;
61+
import java.util.concurrent.TimeUnit;
5162

63+
@Slf4j
5264
public class HttpClientProvider implements AutoCloseable {
53-
private final CloseableHttpClient httpClient;
5465
private static final String ENCODING = "UTF-8";
5566
private static final String APPLICATION_JSON = "application/json";
5667
private static final int CONNECT_TIMEOUT = 6000 * 2;
@@ -60,13 +71,33 @@ public class HttpClientProvider implements AutoCloseable {
6071
.setConnectTimeout(CONNECT_TIMEOUT)
6172
.setSocketTimeout(SOCKET_TIMEOUT)
6273
.build();
74+
private final CloseableHttpClient httpClient;
75+
private final Retryer<CloseableHttpResponse> retryer;
6376

64-
private HttpClientProvider() {
65-
httpClient = HttpClients.createDefault();
77+
public HttpClientProvider(HttpParameter httpParameter) {
78+
this.httpClient = HttpClients.createDefault();
79+
this.retryer = buildRetryer(httpParameter);
6680
}
6781

68-
public static HttpClientProvider getInstance() {
69-
return Singleton.INSTANCE;
82+
private Retryer<CloseableHttpResponse> buildRetryer(HttpParameter httpParameter) {
83+
if (httpParameter.getRetry() < 1) {
84+
return RetryerBuilder.<CloseableHttpResponse>newBuilder().build();
85+
}
86+
return RetryerBuilder.<CloseableHttpResponse>newBuilder()
87+
.retryIfException(ex -> ExceptionUtils.indexOfType(ex, IOException.class) != -1)
88+
.withStopStrategy(StopStrategies.stopAfterAttempt(httpParameter.getRetry()))
89+
.withWaitStrategy(WaitStrategies.fibonacciWait(httpParameter.getRetryBackoffMultiplierMillis(),
90+
httpParameter.getRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS))
91+
.withRetryListener(new RetryListener() {
92+
@Override
93+
public <V> void onRetry(Attempt<V> attempt) {
94+
if (attempt.hasException()) {
95+
log.warn(String.format("[%d] request http failed",
96+
attempt.getAttemptNumber()), attempt.getExceptionCause());
97+
}
98+
}
99+
})
100+
.build();
70101
}
71102

72103
public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
@@ -275,9 +306,9 @@ public HttpResponse doDelete(String url, Map<String, String> params) throws Exce
275306
return doPost(url, params);
276307
}
277308

278-
private HttpResponse getResponse(HttpRequestBase request) throws IOException {
309+
private HttpResponse getResponse(HttpRequestBase request) throws Exception {
279310
// execute request
280-
try (CloseableHttpResponse httpResponse = httpClient.execute(request)) {
311+
try (CloseableHttpResponse httpResponse = retryWithException(request)) {
281312
// get return result
282313
if (httpResponse != null && httpResponse.getStatusLine() != null) {
283314
String content = "";
@@ -290,6 +321,10 @@ private HttpResponse getResponse(HttpRequestBase request) throws IOException {
290321
return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
291322
}
292323

324+
private CloseableHttpResponse retryWithException(HttpRequestBase request) throws Exception {
325+
return retryer.call(() -> httpClient.execute(request));
326+
}
327+
293328
private void addParameters(URIBuilder builder, Map<String, String> params) {
294329
if (Objects.isNull(params) || params.isEmpty()) {
295330
return;
@@ -333,8 +368,4 @@ public void close() throws IOException {
333368
httpClient.close();
334369
}
335370
}
336-
337-
private static class Singleton {
338-
private static final HttpClientProvider INSTANCE = new HttpClientProvider();
339-
}
340371
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@ public class HttpConfig {
2727
public static final String SCHEMA = "schema";
2828
public static final String FORMAT = "format";
2929
public static final String DEFAULT_FORMAT = "json";
30+
public static final String POLL_INTERVAL_MILLS = "poll_interval_ms";
31+
public static final String RETRY = "retry";
32+
public static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
33+
public static final String RETRY_BACKOFF_MAX_MS = "retry_backoff_max_ms";
3034
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,18 @@
2626
import java.util.stream.Collectors;
2727

2828
@Data
29+
@SuppressWarnings("MagicNumber")
2930
public class HttpParameter implements Serializable {
3031
private String url;
3132
private String method;
3233
private Map<String, String> headers;
3334
private Map<String, String> params;
3435
private String body;
36+
private int pollIntervalMillis;
37+
private int retry;
38+
private int retryBackoffMultiplierMillis = 100;
39+
private int retryBackoffMaxMillis = 10000;
40+
3541
public void buildWithConfig(Config pluginConfig) {
3642
// set url
3743
this.setUrl(pluginConfig.getString(HttpConfig.URL));
@@ -53,5 +59,17 @@ public void buildWithConfig(Config pluginConfig) {
5359
if (pluginConfig.hasPath(HttpConfig.BODY)) {
5460
this.setBody(pluginConfig.getString(HttpConfig.BODY));
5561
}
62+
if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) {
63+
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS));
64+
}
65+
if (pluginConfig.hasPath(HttpConfig.RETRY)) {
66+
this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
67+
if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
68+
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
69+
}
70+
if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
71+
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
72+
}
73+
}
5674
}
5775
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@
3434

3535
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
3636
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkWriter.class);
37-
protected final HttpClientProvider httpClient = HttpClientProvider.getInstance();
37+
protected final HttpClientProvider httpClient;
3838
protected final SeaTunnelRowType seaTunnelRowType;
3939
protected final HttpParameter httpParameter;
4040
protected final SerializationSchema serializationSchema;
4141

4242
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
4343
this.seaTunnelRowType = seaTunnelRowType;
4444
this.httpParameter = httpParameter;
45+
this.httpClient = new HttpClientProvider(httpParameter);
4546
this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
4647
}
4748

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.http.source;
19+
20+
import org.apache.seatunnel.api.serialization.DeserializationSchema;
21+
import org.apache.seatunnel.api.source.Collector;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
23+
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
24+
25+
import com.fasterxml.jackson.databind.JsonNode;
26+
import com.fasterxml.jackson.databind.node.ArrayNode;
27+
import lombok.AllArgsConstructor;
28+
29+
import java.io.IOException;
30+
31+
@AllArgsConstructor
32+
public class DeserializationCollector {
33+
34+
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
35+
36+
public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
37+
if (deserializationSchema instanceof JsonDeserializationSchema) {
38+
collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
39+
} else {
40+
SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
41+
out.collect(deserialize);
42+
}
43+
}
44+
45+
private void collectJson(byte[] message,
46+
JsonDeserializationSchema jsonDeserializationSchema,
47+
Collector<SeaTunnelRow> out) throws IOException {
48+
JsonNode jsonNode = jsonDeserializationSchema.convertBytes(message);
49+
if (jsonNode.isArray()) {
50+
ArrayNode arrayNode = (ArrayNode) jsonNode;
51+
for (int i = 0; i < arrayNode.size(); i++) {
52+
SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(arrayNode.get(i));
53+
out.collect(deserialize);
54+
}
55+
} else {
56+
SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(jsonNode);
57+
out.collect(deserialize);
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)