Skip to content

Commit cc95000

Browse files
[Improve][Connector-V2] Http client provider improve (#2312)
* [Improve][Connector-V2] Fix 'Singleton' word error * [Improve][Connector-V2]: Improve http client provider code structure 1. Remove hard code 2. Http post method support request with json body
1 parent e99e6ee commit cc95000

File tree

2 files changed

+93
-34
lines changed

2 files changed

+93
-34
lines changed

seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java

Lines changed: 90 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,37 @@
2929
import org.apache.http.client.methods.HttpPut;
3030
import org.apache.http.client.methods.HttpRequestBase;
3131
import org.apache.http.client.utils.URIBuilder;
32+
import org.apache.http.entity.ContentType;
33+
import org.apache.http.entity.StringEntity;
3234
import org.apache.http.impl.client.CloseableHttpClient;
3335
import org.apache.http.impl.client.HttpClients;
36+
import org.apache.http.message.BasicHeader;
3437
import org.apache.http.message.BasicNameValuePair;
38+
import org.apache.http.protocol.HTTP;
3539
import org.apache.http.util.EntityUtils;
3640

3741
import java.io.IOException;
3842
import java.io.UnsupportedEncodingException;
3943
import java.util.ArrayList;
44+
import java.util.Collections;
4045
import java.util.HashMap;
4146
import java.util.List;
47+
import java.util.Locale;
4248
import java.util.Map;
4349
import java.util.Objects;
4450
import java.util.Set;
4551

4652
public class HttpClientProvider implements AutoCloseable {
4753
private final CloseableHttpClient httpClient;
4854
private static final String ENCODING = "UTF-8";
55+
private static final String APPLICATION_JSON = "application/json";
4956
private static final int CONNECT_TIMEOUT = 6000 * 2;
5057
private static final int SOCKET_TIMEOUT = 6000 * 10;
5158
private static final int INITIAL_CAPACITY = 16;
59+
private static final RequestConfig REQUEST_CONFIG = RequestConfig.custom()
60+
.setConnectTimeout(CONNECT_TIMEOUT)
61+
.setSocketTimeout(SOCKET_TIMEOUT)
62+
.build();
5263

5364
private HttpClientProvider() {
5465
httpClient = HttpClients.createDefault();
@@ -59,9 +70,21 @@ public static HttpClientProvider getInstance() {
5970
}
6071

6172
public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
62-
if ("POST".equals(method)) {
73+
// convert method option to uppercase
74+
method = method.toUpperCase(Locale.ROOT);
75+
if (HttpPost.METHOD_NAME.equals(method)) {
6376
return doPost(url, headers, params);
6477
}
78+
if (HttpGet.METHOD_NAME.equals(method)) {
79+
return doGet(url, headers, params);
80+
}
81+
if (HttpPut.METHOD_NAME.equals(method)) {
82+
return doPut(url, params);
83+
}
84+
if (HttpDelete.METHOD_NAME.equals(method)) {
85+
return doDelete(url, params);
86+
}
87+
// if http method that user assigned is not support by http provider, default do get
6588
return doGet(url, headers, params);
6689
}
6790

@@ -73,7 +96,7 @@ public HttpResponse execute(String url, String method, Map<String, String> heade
7396
* @throws Exception information
7497
*/
7598
public HttpResponse doGet(String url) throws Exception {
76-
return doGet(url, null, null);
99+
return doGet(url, Collections.emptyMap(), Collections.emptyMap());
77100
}
78101

79102
/**
@@ -85,7 +108,7 @@ public HttpResponse doGet(String url) throws Exception {
85108
* @throws Exception information
86109
*/
87110
public HttpResponse doGet(String url, Map<String, String> params) throws Exception {
88-
return doGet(url, null, params);
111+
return doGet(url, Collections.emptyMap(), params);
89112
}
90113

91114
/**
@@ -100,18 +123,15 @@ public HttpResponse doGet(String url, Map<String, String> params) throws Excepti
100123
public HttpResponse doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
101124
// Create access address
102125
URIBuilder uriBuilder = new URIBuilder(url);
126+
// add parameter to uri
103127
addParameters(uriBuilder, params);
104-
105-
/**
106-
* setConnectTimeout:Set the connection timeout, in milliseconds.
107-
* setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
108-
* If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
109-
*/
110-
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
128+
// create a new http get
111129
HttpGet httpGet = new HttpGet(uriBuilder.build());
112-
httpGet.setConfig(requestConfig);
113-
130+
// set default request config
131+
httpGet.setConfig(REQUEST_CONFIG);
132+
// set request header
114133
addHeaders(httpGet, headers);
134+
// return http response
115135
return getResponse(httpGet);
116136
}
117137

@@ -123,7 +143,7 @@ public HttpResponse doGet(String url, Map<String, String> headers, Map<String, S
123143
* @throws Exception information
124144
*/
125145
public HttpResponse doPost(String url) throws Exception {
126-
return doPost(url, null, null);
146+
return doPost(url, Collections.emptyMap(), Collections.emptyMap());
127147
}
128148

129149
/**
@@ -135,7 +155,7 @@ public HttpResponse doPost(String url) throws Exception {
135155
* @throws Exception information
136156
*/
137157
public HttpResponse doPost(String url, Map<String, String> params) throws Exception {
138-
return doPost(url, null, params);
158+
return doPost(url, Collections.emptyMap(), params);
139159
}
140160

141161
/**
@@ -148,19 +168,47 @@ public HttpResponse doPost(String url, Map<String, String> params) throws Except
148168
* @throws Exception information
149169
*/
150170
public HttpResponse doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
171+
// create a new http get
151172
HttpPost httpPost = new HttpPost(url);
152-
/**
153-
* setConnectTimeout:Set the connection timeout, in milliseconds.
154-
* setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
155-
* If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
156-
*/
157-
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
158-
httpPost.setConfig(requestConfig);
173+
// set default request config
174+
httpPost.setConfig(REQUEST_CONFIG);
159175
// set request header
160176
addHeaders(httpPost, headers);
161-
162-
// Encapsulate request parameters
177+
// set request params
163178
addParameters(httpPost, params);
179+
// return http response
180+
return getResponse(httpPost);
181+
}
182+
183+
/**
184+
* Send a post request with request body and without headers
185+
* @param url request address
186+
* @param body request body conetent
187+
* @return http response result
188+
* @throws Exception information
189+
*/
190+
public HttpResponse doPost(String url, String body) throws Exception {
191+
return doPost(url, Collections.emptyMap(), body);
192+
}
193+
194+
/**
195+
* Send a post request with request headers and request body
196+
* @param url request address
197+
* @param headers request header map
198+
* @param body request body content
199+
* @return http response result
200+
* @throws Exception information
201+
*/
202+
public HttpResponse doPost(String url, Map<String, String> headers, String body) throws Exception {
203+
// create a new http post
204+
HttpPost httpPost = new HttpPost(url);
205+
// set default request config
206+
httpPost.setConfig(REQUEST_CONFIG);
207+
// set request header
208+
addHeaders(httpPost, headers);
209+
// add body in request
210+
addBody(httpPost, body);
211+
// return http response
164212
return getResponse(httpPost);
165213
}
166214

@@ -172,7 +220,7 @@ public HttpResponse doPost(String url, Map<String, String> headers, Map<String,
172220
* @throws Exception information
173221
*/
174222
public HttpResponse doPut(String url) throws Exception {
175-
return doPut(url, null);
223+
return doPut(url, Collections.emptyMap());
176224
}
177225

178226
/**
@@ -184,12 +232,13 @@ public HttpResponse doPut(String url) throws Exception {
184232
* @throws Exception information
185233
*/
186234
public HttpResponse doPut(String url, Map<String, String> params) throws Exception {
187-
235+
// create a new http put
188236
HttpPut httpPut = new HttpPut(url);
189-
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
190-
httpPut.setConfig(requestConfig);
191-
237+
// set default request config
238+
httpPut.setConfig(REQUEST_CONFIG);
239+
// set request params
192240
addParameters(httpPut, params);
241+
// return http response
193242
return getResponse(httpPut);
194243
}
195244

@@ -201,10 +250,11 @@ public HttpResponse doPut(String url, Map<String, String> params) throws Excepti
201250
* @throws Exception information
202251
*/
203252
public HttpResponse doDelete(String url) throws Exception {
204-
253+
// create a new http delete
205254
HttpDelete httpDelete = new HttpDelete(url);
206-
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
207-
httpDelete.setConfig(requestConfig);
255+
// set default request config
256+
httpDelete.setConfig(REQUEST_CONFIG);
257+
// return http response
208258
return getResponse(httpDelete);
209259
}
210260

@@ -244,7 +294,7 @@ private void addParameters(URIBuilder builder, Map<String, String> params) {
244294
if (Objects.isNull(params) || params.isEmpty()) {
245295
return;
246296
}
247-
params.forEach((k, v) -> builder.setParameter(k, v));
297+
params.forEach(builder::setParameter);
248298
}
249299

250300
private void addParameters(HttpEntityEnclosingRequestBase request, Map<String, String> params) throws UnsupportedEncodingException {
@@ -267,7 +317,14 @@ private void addHeaders(HttpRequestBase request, Map<String, String> headers) {
267317
if (Objects.isNull(headers) || headers.isEmpty()) {
268318
return;
269319
}
270-
headers.forEach((k, v) -> request.addHeader(k, v));
320+
headers.forEach(request::addHeader);
321+
}
322+
323+
private void addBody(HttpEntityEnclosingRequestBase request, String body) {
324+
request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON);
325+
StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON);
326+
entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON));
327+
request.setEntity(entity);
271328
}
272329

273330
@Override

seatunnel-connectors-v2/connector-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpResponse.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.http.client;
1919

20+
import org.apache.http.HttpStatus;
21+
2022
import java.io.Serializable;
2123

2224
public class HttpResponse implements Serializable {
2325

2426
private static final long serialVersionUID = 2168152194164783950L;
2527

26-
public static final int STATUS_OK = 200;
28+
public static final int STATUS_OK = HttpStatus.SC_OK;
2729
/**
2830
* response status code
2931
*/

0 commit comments

Comments
 (0)