Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased]
- Allow config control of log HTTP request, response and header logging content

## [0.23.0] - 2025-11-07

Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,26 @@ an example of a customised grant type token request. The supplied `token request
a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will
be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`.

## Logging the http content
Debug level logging has been added for class `com.getindata.connectors.http.internal.HttpLogger`. To enable this, alter the log4j properties.
This logging puts out log entries for the HTTP requests and responses. This can be useful for diagnostics to confirm that HTTP requests have been issued and what
that HTTP responses or an exception has occurred (for example connection Refused).

Logging HTTP may not be appropriate for production systems; where sensitive information is not allowed into the logs. But in development environments it is useful
to be able to see HTTP content. Sensitive information can occur in the headers for example authentication tokens and passwords. Also the HTTP request and response bodies
could sensitive. The default minimal logging should be used in production. For development, you can specify config option `gid.connector.http.logging.level`.
This dictates the amount of content that debug logging will show around HTTP calls; the valid values are:

| log level | Request method | URI | HTTP Body | Response status code | Headers |
|-------------|----------------|-----|-----------|----------------------|---------|
| MIN | Y | Y | N | Y | N |
| REQRESPONSE | Y | Y | Y | Y | N |
| MAX | Y | Y | Y | Y | Y |

Notes:
- you can customize what is traced for lookups using the `gid.connector.http.source.lookup.request-callback`.
- where there is an N in the table the output is obfuscated.

### Restrictions at this time
* No authentication is applied to the token request.
* The processing does not use the refresh token if it present.
Expand All @@ -562,6 +582,7 @@ be requested if the current time is later than the cached token expiry time minu
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
Expand Down Expand Up @@ -609,6 +630,7 @@ be requested if the current time is later than the cached token expiry time minu
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
| format | required | Specify what format to use. |
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
Expand Down
115 changes: 115 additions & 0 deletions src/main/java/com/getindata/connectors/http/internal/HttpLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.getindata.connectors.http.internal;

import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;

import lombok.extern.slf4j.Slf4j;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;

@Slf4j
public class HttpLogger {

private final HttpLoggingLevelType httpLoggingLevelType;

public static HttpLogger getHttpLogger(Properties properties) {
return new HttpLogger(properties);
}

public void logRequest(HttpRequest httpRequest) {
log.debug(createStringForRequest(httpRequest));
}

public void logResponse(HttpResponse<String> response) {
log.debug(createStringForResponse(response));
}

public void logRequestBody(String body) {
log.debug(createStringForBody(body));
}

public void logExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
log.debug(createStringForExceptionResponse(request, e));
}

private HttpLogger(Properties properties) {
String code = (String) properties.get(HTTP_LOGGING_LEVEL);
this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code);
}

String createStringForRequest(HttpRequest httpRequest) {
String headersForLog = getHeadersForLog(httpRequest.headers());
return String.format("HTTP %s Request: URL: %s, Headers: %s",
httpRequest.method(),
httpRequest.uri().toString(),
headersForLog
);
}

private String getHeadersForLog(HttpHeaders httpHeaders) {
if (httpHeaders == null) return "None";
Map<String, List<String>> headersMap = httpHeaders.map();
if (headersMap.isEmpty()) return "None";
if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) {
StringJoiner headers = new StringJoiner(";");
for (Map.Entry<String, List<String>> reqHeaders : headersMap.entrySet()) {
StringJoiner values = new StringJoiner(";");
for (String value : reqHeaders.getValue()) {
values.add(value);
}
String header = reqHeaders.getKey() + ":[" + values + "]";
headers.add(header);
}
return headers.toString();
}
return "***";
}

String createStringForResponse(HttpResponse<String> response) {
String headersForLog = getHeadersForLog(response.headers());

String bodyForLog = "***";
if (response.body() == null || response.body().isEmpty()) {
bodyForLog = "None";
} else {
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
bodyForLog = response.body().toString();
}
}
return String.format("HTTP %s Response: URL: %s,"
+ " Response Headers: %s, status code: %s, Response Body: %s",
response.request().method(),
response.uri(),
headersForLog,
response.statusCode(),
bodyForLog
);
}

private String createStringForExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
HttpRequest httpRequest = request.getHttpRequest();
return String.format("HTTP %s Exception Response: URL: %s Exception %s",
httpRequest.method(),
httpRequest.uri(),
e
);
}

String createStringForBody(String body) {
String bodyForLog = "***";
if (body == null || body.isEmpty()) {
bodyForLog = "None";
} else {
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
bodyForLog = body.toString();
}
}
return bodyForLog;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.getindata.connectors.http.internal;

public enum HttpLoggingLevelType {
MIN,
REQRESPONSE,
MAX;

public static HttpLoggingLevelType valueOfStr(String code) {
if (code == null) {
return MIN;
} else {
return valueOf(code);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public final class HttpConnectorConfigConstants {
public static final String SOURCE_PROXY_PASSWORD =
SOURCE_LOOKUP_PREFIX + "proxy.password";

public static final String HTTP_LOGGING_LEVEL =
GID_CONNECTOR_HTTP + "logging.level";

public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";

Expand All @@ -118,6 +121,7 @@ public final class HttpConnectorConfigConstants {
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";


// -----------------------------------------------------


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
}

var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
String previousReqeustMethod = requestsToSubmit.get(0).method;
String previousRequestMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> requestBatch = new ArrayList<>(httpRequestBatchSize);

for (var entry : requestsToSubmit) {
if (requestBatch.size() == httpRequestBatchSize
|| !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
|| !previousRequestMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, requestBatch));
requestBatch.clear();
}
requestBatch.add(entry);
previousReqeustMethod = entry.method;
previousRequestMethod = entry.method;
}

// submit anything that left
Expand All @@ -84,9 +84,9 @@ int getBatchSize() {

private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {
List<HttpSinkRequestEntry> requestBatch) {

HttpRequest httpRequest = buildHttpRequest(reqeustBatch, URI.create(endpointUrl));
HttpRequest httpRequest = buildHttpRequest(requestBatch, URI.create(endpointUrl));
return httpClient
.sendAsync(
httpRequest.getHttpRequest(),
Expand All @@ -102,19 +102,19 @@ private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
);
}

private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, URI endpointUri) {
private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> requestBatch, URI endpointUri) {

try {
var method = reqeustBatch.get(0).method;
List<byte[]> elements = new ArrayList<>(reqeustBatch.size());
var method = requestBatch.get(0).method;
List<byte[]> elements = new ArrayList<>(requestBatch.size());

BodyPublisher publisher;
// By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
// into the HTTP body without any context.
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add(BATCH_START_BYTES);
for (HttpSinkRequestEntry entry : reqeustBatch) {
for (HttpSinkRequestEntry entry : requestBatch) {
elements.add(entry.element);
elements.add(BATCH_ELEMENT_DELIM_BYTES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.HttpLogger;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
Expand All @@ -40,6 +41,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {

private final RequestSubmitter requestSubmitter;

private final Properties properties;

public JavaNetSinkHttpClient(
Properties properties,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
Expand Down Expand Up @@ -69,6 +72,7 @@ public JavaNetSinkHttpClient(
properties,
headersAndValues
);
this.properties = properties;
}

@Override
Expand Down Expand Up @@ -98,10 +102,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();

HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get());
httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);

// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public PerRequestSubmitter(
HttpClient httpClient) {

super(properties, headersAndValues, httpClient);

}

@Override
Expand All @@ -40,8 +41,7 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(

for (var entry : requestToSubmit) {
HttpRequest httpRequest = buildHttpRequest(entry, endpointUri);
var response = httpClient
.sendAsync(
var response = httpClient.sendAsync(
httpRequest.getHttpRequest(),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
Expand Down
Loading
Loading