Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: close response body on DataSource closure #3851

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.0.2, Apache-2.0, approved, #9239

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/iron-ed25519-cryptosuite-2020/0.8.1, Apache-2.0, approved, #11157
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
Expand Down Expand Up @@ -81,7 +81,7 @@
maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.37.3, Apache-2.0, approved, #11701
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.12.3, LGPL-2.1+, restricted, clearlydefined
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.12.3, LGPL-2.1-only AND Apache-2.0 AND LGPL-2.1-or-later AND ANTLR-PD AND GPL-2.0-or-later AND LGPL-2.0-or-later AND (Apache-2.0 AND LGPL-2.1-or-later) AND LicenseRef-scancode-proprietary-license, restricted, #13190
maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined
maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.12.0, Apache-2.0, approved, #11159
maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #11156
Expand Down Expand Up @@ -190,7 +190,7 @@
maven/mavencentral/net.minidev/accessors-smart/2.4.7, Apache-2.0, approved, #7515
maven/mavencentral/net.minidev/json-smart/2.4.7, Apache-2.0, approved, #3288
maven/mavencentral/net.sf.jopt-simple/jopt-simple/5.0.4, MIT, approved, CQ13174
maven/mavencentral/net.sf.saxon/Saxon-HE/12.3, NOASSERTION, restricted, clearlydefined
maven/mavencentral/net.sf.saxon/Saxon-HE/12.3, MPL-2.0-no-copyleft-exception AND (LicenseRef-scancode-proprietary-license AND MPL-2.0-no-copyleft-exception) AND (MPL-2.0-no-copyleft-exception AND X11) AND (MIT AND MPL-2.0-no-copyleft-exception) AND (MPL-1.0 AND MPL-2.0-no-copyleft-exception) AND (Apache-2.0 AND MPL-2.0-no-copyleft-exception) AND MPL-1.0, restricted, #13191
maven/mavencentral/org.antlr/antlr4-runtime/4.11.1, BSD-3-Clause, approved, clearlydefined
maven/mavencentral/org.apache.commons/commons-compress/1.25.0, Apache-2.0, approved, #11600
maven/mavencentral/org.apache.commons/commons-digester3/3.2, Apache-2.0, approved, clearlydefined
Expand Down Expand Up @@ -242,8 +242,8 @@
maven/mavencentral/org.codehaus.plexus/plexus-utils/3.1.1, , approved, CQ16492
maven/mavencentral/org.codehaus.plexus/plexus-utils/3.3.0, , approved, CQ21066
maven/mavencentral/org.eclipse.angus/angus-activation/1.0.0, EPL-2.0 OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.angus
maven/mavencentral/org.eclipse.edc/autodoc-processor/0.5.1-SNAPSHOT, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/runtime-metamodel/0.5.1-SNAPSHOT, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/autodoc-processor/0.5.2-SNAPSHOT, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/runtime-metamodel/0.5.2-SNAPSHOT, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.jetty.toolchain/jetty-jakarta-servlet-api/5.0.2, EPL-2.0 OR Apache-2.0, approved, rt.jetty
maven/mavencentral/org.eclipse.jetty.toolchain/jetty-jakarta-websocket-api/2.0.0, EPL-2.0 OR Apache-2.0, approved, rt.jetty
maven/mavencentral/org.eclipse.jetty.websocket/websocket-core-client/11.0.20, EPL-2.0 OR Apache-2.0, approved, rt.jetty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


import okhttp3.MediaType;
import okhttp3.ResponseBody;
import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory;
import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
Expand All @@ -25,8 +26,10 @@
import org.eclipse.edc.spi.monitor.Monitor;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.lang.String.format;
Expand All @@ -45,6 +48,10 @@ public class HttpDataSource implements DataSource {
private Monitor monitor;
private EdcHttpClient httpClient;
private HttpRequestFactory requestFactory;
private final AtomicReference<ResponseBodyStream> responseBodyStream = new AtomicReference<>();

private HttpDataSource() {
}

@Override
public StreamResult<Stream<Part>> openPartStream() {
Expand All @@ -58,8 +65,10 @@ public StreamResult<Stream<Part>> openPartStream() {
if (body == null) {
throw new EdcException(format("Received empty response body transferring HTTP data for request %s: %s", requestId, response.code()));
}
var stream = body.byteStream();
responseBodyStream.set(new ResponseBodyStream(body, stream));
var mediaType = Optional.ofNullable(body.contentType()).map(MediaType::toString).orElse(OCTET_STREAM);
return success(Stream.of(new HttpPart(name, body.byteStream(), mediaType)));
return success(Stream.of(new HttpPart(name, stream, mediaType)));
} else {
try {
if (NOT_AUTHORIZED == response.code() || FORBIDDEN == response.code()) {
Expand All @@ -83,11 +92,20 @@ public StreamResult<Stream<Part>> openPartStream() {

}

private HttpDataSource() {
}

@Override
public void close() {
var bodyStream = responseBodyStream.get();
if (bodyStream != null) {
bodyStream.responseBody().close();
try {
bodyStream.stream().close();
} catch (IOException e) {
// do nothing
}
}
}

private record ResponseBodyStream(ResponseBody responseBody, InputStream stream) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,104 +14,115 @@

package org.eclipse.edc.connector.dataplane.http.pipeline;

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory;
import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailureArgument;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.TypeManager;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

import static okhttp3.Protocol.HTTP_1_1;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.NOT_AUTHORIZED;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.junit.testfixtures.TestUtils.testHttpClient;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class HttpDataSourceTest {

private static final ObjectMapper MAPPER = new TypeManager().getMapper();

private String requestId;
private String url;
private final HttpRequestFactory requestFactory = mock(HttpRequestFactory.class);

@BeforeEach
public void setUp() {
requestId = UUID.randomUUID().toString();
url = "http://some.test.url/";
}
private final HttpRequestFactory requestFactory = mock();

@Test
void verifyCallSuccess() throws IOException {
var json = MAPPER.writeValueAsString(Map.of("key1", "Value1"));
var responseBody = ResponseBody.create(json, MediaType.parse("application/json"));
void verifyCallSuccess() {
var responseBody = ResponseBody.create("{}", MediaType.parse("application/json"));

var interceptor = new CustomInterceptor(200, responseBody, "Test message");
var params = mock(HttpRequestParams.class);
var request = new Request.Builder().url(url).get().build();
var request = dummyRequest();
var source = defaultBuilder(interceptor).params(params).requestFactory(requestFactory).build();

when(requestFactory.toRequest(any())).thenReturn(request);

var parts = source.openPartStream().getContent().toList();

var interceptedRequest = interceptor.getInterceptedRequest();
assertThat(interceptedRequest).isEqualTo(request);
assertThat(parts).hasSize(1);
var part = parts.get(0);
assertThat(part.mediaType()).startsWith("application/json");
try (var is = part.openStream()) {
assertThat(new String(is.readAllBytes())).isEqualTo(json);
}
assertThat(parts).hasSize(1).first().satisfies(part -> {
assertThat(part.mediaType()).startsWith("application/json");
assertThat(part.openStream()).hasContent("{}");
});

verify(requestFactory).toRequest(any());
}

@ParameterizedTest
@MethodSource
void verifyCallFailed(StreamFailureArgument argument) {
var message = "Test message";
var body = "Test body";
var interceptor = new CustomInterceptor(argument.getCode(), ResponseBody.create(body, MediaType.parse("text/plain")), message);
var params = mock(HttpRequestParams.class);
var request = new Request.Builder().url(url).get().build();
var source = defaultBuilder(interceptor).params(params).requestFactory(requestFactory).build();

when(requestFactory.toRequest(any())).thenReturn(request);
@ArgumentsSource(StreamFailureArguments.class)
void verifyCallFailed(int code, StreamFailure.Reason reason) {
var responseBody = ResponseBody.create("Test body", MediaType.parse("text/plain"));
var interceptor = new CustomInterceptor(code, responseBody, "Test message");
var source = defaultBuilder(interceptor).params(mock()).requestFactory(requestFactory).build();
when(requestFactory.toRequest(any())).thenReturn(dummyRequest());

var result = source.openPartStream();
assertThat(result.failed()).isTrue();
assertThat(result.reason()).isEqualTo(argument.getReason());

assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(reason);
verify(requestFactory).toRequest(any());
}

static Stream<StreamFailureArgument> verifyCallFailed() {
return Stream.of(
new StreamFailureArgument(400, GENERAL_ERROR),
new StreamFailureArgument(401, NOT_AUTHORIZED),
new StreamFailureArgument(403, NOT_AUTHORIZED),
new StreamFailureArgument(500, GENERAL_ERROR));
@Test
void close_shouldCloseResponseBodyAndStream() throws IOException {
InputStream stream = mock();
var responseBody = spy(ResponseBody.create("{}", MediaType.parse("application/json")));
when(responseBody.byteStream()).thenReturn(stream);
var interceptor = new CustomInterceptor(200, responseBody, "Test message");
var source = defaultBuilder(interceptor).params(mock()).requestFactory(requestFactory).build();
when(requestFactory.toRequest(any())).thenReturn(dummyRequest());

source.openPartStream();
source.close();

verify(responseBody).close();
verify(stream).close();
}

@NotNull
private Request dummyRequest() {
return new Request.Builder().url("http://some.test.url/").get().build();
}

private static class StreamFailureArguments implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
return Stream.of(
arguments(400, GENERAL_ERROR),
arguments(401, NOT_AUTHORIZED),
arguments(403, NOT_AUTHORIZED),
arguments(500, GENERAL_ERROR)
);
}
}

private HttpDataSource.Builder defaultBuilder(Interceptor interceptor) {
Expand All @@ -120,7 +131,7 @@ private HttpDataSource.Builder defaultBuilder(Interceptor interceptor) {
.httpClient(httpClient)
.name("test-name")
.monitor(mock(Monitor.class))
.requestId(requestId);
.requestId(UUID.randomUUID().toString());
}

static final class CustomInterceptor implements Interceptor {
Expand Down

This file was deleted.

Loading