Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK][FLINK][JAVA] http timeout fix #2475

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Changelog

## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.9.1...HEAD)
### Added
* **Flink: improve Cassandra lineage metadata** (https://github.com/OpenLineage/OpenLineage/pull/2479) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Use Cassandra cluster info as dataset namespace, and combine keyspace with table name as dataset name.*
* **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*

### Fixed
* **Spark: fix `HttpTransport` timeout.** [`#2475`](https://github.com/OpenLineage/OpenLineage/pull/2475) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Existing `timeout` config parameter is ambiguous: implementation treats value as double in seconds, although documentation claims it's milliseconds. New config param `timeoutInMillis` is added. Existing `timeout` gets removed from docs and will be deprecated in 1.13.*


## [1.9.1](https://github.com/OpenLineage/OpenLineage/compare/1.8.0...1.9.1) - 2024-02-26
### Added
* **Airflow: add support for `JobTypeJobFacet` properties** [`#2412`](https://github.com/OpenLineage/OpenLineage/pull/2412) [@mattiabertorello](https://github.com/mattiabertorello)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
public final class HttpConfig implements TransportConfig {
@Getter @Setter private URI url;
@Getter @Setter private @Nullable String endpoint;
@Getter @Setter private @Nullable Double timeout;

@Getter @Setter
private @Nullable Double
timeout; // deprecated, will be removed in 1.13, assumes timeout is in seconds

@Getter @Setter private @Nullable Integer timeoutInMillis;
@Getter @Setter private @Nullable TokenProvider auth;
@Getter @Setter private @Nullable Map<String, String> urlParams;
@Getter @Setter private @Nullable Map<String, String> headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,19 @@ public final class HttpTransport extends Transport implements Closeable {
private final Map<String, String> headers;

public HttpTransport(@NonNull final HttpConfig httpConfig) {
this(withTimeout(httpConfig.getTimeout()), httpConfig);
this(withTimeout(httpConfig), httpConfig);
}

private static CloseableHttpClient withTimeout(Double timeout) {
private static CloseableHttpClient withTimeout(HttpConfig httpConfig) {
int timeoutMs;
if (timeout == null) {
timeoutMs = 5000;
if (httpConfig.getTimeout() != null) {
// deprecated approach, value in seconds as double provided
timeoutMs = (int) (httpConfig.getTimeout() * 1000);
} else if (httpConfig.getTimeoutInMillis() != null) {
timeoutMs = httpConfig.getTimeoutInMillis();
} else {
timeoutMs = (int) (timeout * 1000);
// default one
timeoutMs = 5000;
}

RequestConfig config =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -34,12 +35,15 @@
import java.util.stream.Collectors;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

class HttpTransportTest {

Expand Down Expand Up @@ -295,4 +299,38 @@ void clientEmitsJobEventHttpTransport() throws IOException {

verify(http, times(1)).execute(any());
}

@Test
void testTimeout() {
HttpConfig config = new HttpConfig();
config.setUrl(URI.create("https://localhost:1500/api/v1/lineage"));
config.setTimeout(2.5d); // 2.5 seconds

Builder builder = mock(Builder.class);
try (MockedStatic mocked = mockStatic(RequestConfig.class)) {
when(RequestConfig.custom()).thenReturn(builder);
when(builder.setConnectTimeout(2500)).thenReturn(builder);
when(builder.setConnectionRequestTimeout(2500)).thenReturn(builder);
when(builder.setSocketTimeout(2500)).thenReturn(builder);

new HttpTransport(config);
}
}

@Test
void testTimeoutInMillis() {
HttpConfig config = new HttpConfig();
config.setUrl(URI.create("https://localhost:1500/api/v1/lineage"));
config.setTimeoutInMillis(3000); // 3 seconds

Builder builder = mock(Builder.class);
try (MockedStatic mocked = mockStatic(RequestConfig.class)) {
when(RequestConfig.custom()).thenReturn(builder);
when(builder.setConnectTimeout(3000)).thenReturn(builder);
when(builder.setConnectionRequestTimeout(3000)).thenReturn(builder);
when(builder.setSocketTimeout(3000)).thenReturn(builder);

new HttpTransport(config);
}
}
}