Skip to content

Commit

Permalink
[SPARK][FLINK][JAVA] http timeout fix
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Feb 28, 2024
1 parent 6b60a06 commit 3b114f9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Changelog

## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.9.1...HEAD)

### Added
* **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*

### Fixed
* **Spark: fix `HttpTransport` timeout.** [`#2433`](https://github.com/OpenLineage/OpenLineage/pull/2433) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Timeout for HTTP transport is provided in milliseconds as stated in documentation (This changes current behaviour).*

## [1.9.1](https://github.com/OpenLineage/OpenLineage/compare/1.8.0...1.9.1) - 2024-02-26
### Added
* **Airflow: add support for `JobTypeJobFacet` properties** [`#2412`](https://github.com/OpenLineage/OpenLineage/pull/2412) [@mattiabertorello](https://github.com/mattiabertorello)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private static CloseableHttpClient withTimeout(Double timeout) {
if (timeout == null) {
timeoutMs = 5000;
} else {
timeoutMs = (int) (timeout * 1000);
timeoutMs = (int) Math.round(timeout);
}

RequestConfig config =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -34,12 +35,15 @@
import java.util.stream.Collectors;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

class HttpTransportTest {

Expand Down Expand Up @@ -295,4 +299,21 @@ void clientEmitsJobEventHttpTransport() throws IOException {

verify(http, times(1)).execute(any());
}

@Test
void testTimeout() {
HttpConfig config = new HttpConfig();
config.setUrl(URI.create("https://localhost:1500/api/v1/lineage"));
config.setTimeout(3000d); // 3 seconds

Builder builder = mock(Builder.class);
try (MockedStatic mocked = mockStatic(RequestConfig.class)) {
when(RequestConfig.custom()).thenReturn(builder);
when(builder.setConnectTimeout(3000)).thenReturn(builder);
when(builder.setConnectionRequestTimeout(3000)).thenReturn(builder);
when(builder.setSocketTimeout(3000)).thenReturn(builder);

new HttpTransport(config);
}
}
}

0 comments on commit 3b114f9

Please sign in to comment.