From 41c56e22ad482255bf4aac61432bdf43a2f146d0 Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Tue, 31 Aug 2021 13:36:28 -0700 Subject: [PATCH 1/5] Attempt to create a test that replicates leaking buffers when response isn't read --- .../core/http/netty/NettyAsyncHttpClient.java | 18 +-- .../netty/NettyAsyncHttpResponseTests.java | 2 + .../HttpResponseDrainsBufferTests.java | 142 ++++++++++++++++++ 3 files changed, 153 insertions(+), 9 deletions(-) create mode 100644 sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java index a177fc0db63cc..29e72c9593cff 100644 --- a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java @@ -177,11 +177,11 @@ private static BiFunction { /* - * If we are eagerly reading the response into memory we can ignore the disable buffer copy flag as we - * MUST deeply copy the buffer to ensure it can safely be used downstream. + * If the response is being eagerly read into memory the flag for buffer copying can be ignored as the + * response MUST be deeply copied to ensure it can safely be used downstream. */ if (eagerlyReadResponse) { - // Setup the body flux and dispose the connection once it has been received. + // Set up the body flux and dispose the connection once it has been received. Flux body = reactorNettyConnection.inbound().receive().asByteBuffer() .doFinally(ignored -> closeConnection(reactorNettyConnection)); @@ -197,15 +197,15 @@ private static BiFunction + * These tests are isolated from other {@link HttpResponse} tests as they require running the garbage collector to force + * the JVM to destroy buffers that no longer have pointers to them. + */ +@Isolated +@Execution(ExecutionMode.SAME_THREAD) +public class HttpResponseDrainsBufferTests { + private static ResourceLeakDetector.Level originalLevel; + private static WireMockServer wireMockServer; + + private static final String LONG_BODY_PATH = "/long"; + private static final byte[] LONG_BODY = new byte[16 * 1024 * 1024]; // 16 MB + + static { + new SecureRandom().nextBytes(LONG_BODY); + } + + @BeforeAll + public static void setupMockServer() { + originalLevel = ResourceLeakDetector.getLevel(); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + + wireMockServer = new WireMockServer(wireMockConfig() + .dynamicPort() + .disableRequestJournal() + .gzipDisabled(true)); + + wireMockServer.stubFor(get(LONG_BODY_PATH).willReturn(aResponse().withBody(LONG_BODY))); + wireMockServer.start(); + } + + @AfterAll + public static void tearDownMockServer() { + ResourceLeakDetector.setLevel(originalLevel); + if (wireMockServer != null) { + wireMockServer.shutdown(); + } + } + + @Test + @SuppressWarnings("deprecation") + public void closeHttpResponseBeforeConsumingBody() throws InterruptedException { + Collection> leakDetectors = new ConcurrentLinkedDeque<>(); + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new ResourceLeakDetectorFactory() { + @Override + public ResourceLeakDetector newResourceLeakDetector(Class resource, int samplingInterval, + long maxActive) { + TestResourceLeakDetector leakDetector = new TestResourceLeakDetector<>(resource, samplingInterval, + maxActive); + leakDetectors.add(leakDetector); + return leakDetector; + } + }); + + HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); + + StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) + .flatMap(response -> { + assertNotNull(response.getHeaders()); + return Mono.fromRunnable(response::close); + })) + .verifyComplete(); + + // GC twice to ensure full cleanup. + Thread.sleep(2000); + Runtime.getRuntime().gc(); + + Thread.sleep(2000); + Runtime.getRuntime().gc(); + + assertEquals(0, leakDetectors.stream().mapToInt(TestResourceLeakDetector::getReportedLeakCount).sum()); + } + + @SuppressWarnings("deprecation") + private static final class TestResourceLeakDetector extends ResourceLeakDetector { + private final AtomicInteger reportTracedLeakCount = new AtomicInteger(); + private final AtomicInteger reportUntracedLeakCount = new AtomicInteger(); + + TestResourceLeakDetector(Class resource, int samplingInterval, long maxActive) { + super(resource, samplingInterval, maxActive); + } + + @Override + protected void reportTracedLeak(String resourceType, String records) { + super.reportTracedLeak(resourceType, records); + } + + @Override + protected void reportUntracedLeak(String resourceType) { + super.reportUntracedLeak(resourceType); + } + + public int getReportedLeakCount() { + return reportTracedLeakCount.get() + reportUntracedLeakCount.get(); + } + } + + private static URL url(WireMockServer server) { + try { + return new URL("http://localhost:" + server.port() + LONG_BODY_PATH); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } +} From 0311f3bc5c7a89355aeace8efc5b913dc5cb4474 Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Tue, 31 Aug 2021 16:20:55 -0700 Subject: [PATCH 2/5] Add tests to ensure response is drained when HttpResponse is closed --- .../http/netty/implementation/Utility.java | 4 +- .../HttpResponseDrainsBufferTests.java | 79 ++++++++++++++----- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java index aeea5f869089d..56381a7da6ae0 100644 --- a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java @@ -34,9 +34,7 @@ public static ByteBuffer deepCopyBuffer(ByteBuf byteBuf) { * @param reactorNettyConnection The connection to close. */ public static void closeConnection(Connection reactorNettyConnection) { - if (!reactorNettyConnection.isDisposed()) { - reactorNettyConnection.channel().eventLoop().execute(reactorNettyConnection::dispose); - } + reactorNettyConnection.channel().eventLoop().execute(reactorNettyConnection::dispose); } private Utility() { diff --git a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java index 81c6f9be02938..c10133b5d3e38 100644 --- a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java @@ -12,7 +12,9 @@ import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetectorFactory; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; @@ -26,12 +28,12 @@ import java.util.Collection; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; /** * Tests that closing the {@link HttpResponse} drains the network buffers. @@ -52,6 +54,10 @@ public class HttpResponseDrainsBufferTests { new SecureRandom().nextBytes(LONG_BODY); } + private ResourceLeakDetectorFactory originalLeakDetectorFactory; + private final TestResourceLeakDetectorFactory testResourceLeakDetectorFactory = + new TestResourceLeakDetectorFactory(); + @BeforeAll public static void setupMockServer() { originalLevel = ResourceLeakDetector.getLevel(); @@ -66,6 +72,17 @@ public static void setupMockServer() { wireMockServer.start(); } + @BeforeEach + public void setupLeakDetectorFactory() { + originalLeakDetectorFactory = ResourceLeakDetectorFactory.instance(); + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(testResourceLeakDetectorFactory); + } + + @AfterEach + public void resetLeakDetectorFactory() { + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(originalLeakDetectorFactory); + } + @AfterAll public static void tearDownMockServer() { ResourceLeakDetector.setLevel(originalLevel); @@ -75,28 +92,28 @@ public static void tearDownMockServer() { } @Test - @SuppressWarnings("deprecation") - public void closeHttpResponseBeforeConsumingBody() throws InterruptedException { - Collection> leakDetectors = new ConcurrentLinkedDeque<>(); - ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new ResourceLeakDetectorFactory() { - @Override - public ResourceLeakDetector newResourceLeakDetector(Class resource, int samplingInterval, - long maxActive) { - TestResourceLeakDetector leakDetector = new TestResourceLeakDetector<>(resource, samplingInterval, - maxActive); - leakDetectors.add(leakDetector); - return leakDetector; - } - }); + public void closeHttpResponseWithoutConsumingBody() throws InterruptedException { + runScenario(response -> Mono.fromRunnable(response::close)); + } + + @Test + public void closeHttpResponseWithConsumingPartialBody() throws InterruptedException { + runScenario(response -> response.getBody().next().flatMap(ignored -> Mono.fromRunnable(response::close))); + } + @Test + public void closeHttpResponseWithConsumingFullBody() throws InterruptedException { + runScenario(response -> response.getBodyAsByteArray().flatMap(ignored -> Mono.fromRunnable(response::close))); + } + + private void runScenario(Function> responseConsumer) throws InterruptedException { HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); + Mono requestMaker = httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) + .flatMap(responseConsumer) + .repeat(100) + .then(); - StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) - .flatMap(response -> { - assertNotNull(response.getHeaders()); - return Mono.fromRunnable(response::close); - })) - .verifyComplete(); + StepVerifier.create(requestMaker).verifyComplete(); // GC twice to ensure full cleanup. Thread.sleep(2000); @@ -105,7 +122,25 @@ public ResourceLeakDetector newResourceLeakDetector(Class resource, in Thread.sleep(2000); Runtime.getRuntime().gc(); - assertEquals(0, leakDetectors.stream().mapToInt(TestResourceLeakDetector::getReportedLeakCount).sum()); + assertEquals(0, testResourceLeakDetectorFactory.getTotalReportedLeakCount()); + } + + private static final class TestResourceLeakDetectorFactory extends ResourceLeakDetectorFactory { + private final Collection> createdDetectors = new ConcurrentLinkedDeque<>(); + + @Override + @SuppressWarnings("deprecation") // API is deprecated but abstract + public ResourceLeakDetector newResourceLeakDetector(Class resource, int samplingInterval, + long maxActive) { + TestResourceLeakDetector leakDetector = new TestResourceLeakDetector<>(resource, samplingInterval, + maxActive); + createdDetectors.add(leakDetector); + return leakDetector; + } + + public int getTotalReportedLeakCount() { + return createdDetectors.stream().mapToInt(TestResourceLeakDetector::getReportedLeakCount).sum(); + } } @SuppressWarnings("deprecation") @@ -119,11 +154,13 @@ private static final class TestResourceLeakDetector extends ResourceLeakDetec @Override protected void reportTracedLeak(String resourceType, String records) { + reportTracedLeakCount.incrementAndGet(); super.reportTracedLeak(resourceType, records); } @Override protected void reportUntracedLeak(String resourceType) { + reportUntracedLeakCount.incrementAndGet(); super.reportUntracedLeak(resourceType); } From 65397a85e50aab643ee75d77bfbf4e565863c6c4 Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Tue, 31 Aug 2021 16:25:10 -0700 Subject: [PATCH 3/5] Add test to ensure closing is idempotent --- .../HttpResponseDrainsBufferTests.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java index c10133b5d3e38..5d3bd4946186a 100644 --- a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java @@ -25,6 +25,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.security.SecureRandom; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; @@ -106,6 +107,18 @@ public void closeHttpResponseWithConsumingFullBody() throws InterruptedException runScenario(response -> response.getBodyAsByteArray().flatMap(ignored -> Mono.fromRunnable(response::close))); } + @Test + public void closingHttpResponseIsIdempotent() { + HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); + StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) + .flatMap(response -> Mono.fromRunnable(response::close).thenReturn(response)) + .delayElement(Duration.ofSeconds(2)) + .flatMap(response -> Mono.fromRunnable(response::close)) + .delayElement(Duration.ofSeconds(2)) + .then()) + .verifyComplete(); + } + private void runScenario(Function> responseConsumer) throws InterruptedException { HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); Mono requestMaker = httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) From 180106883d0099d915729c1a41ac55a4ba231d31 Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Wed, 1 Sep 2021 15:39:18 -0700 Subject: [PATCH 4/5] Update NettyAsyncHttpResponse.close logic and fixed failing tests --- .../core/http/netty/NettyAsyncHttpClient.java | 43 +++--- .../http/netty/implementation/Utility.java | 18 ++- .../http/netty/NettyAsyncHttpClientTests.java | 31 +---- .../netty/NettyAsyncHttpResponseTests.java | 14 -- .../HttpResponseDrainsBufferTests.java | 84 ++++++------ .../test/implementation/RestProxyTests.java | 127 +++++++----------- 6 files changed, 136 insertions(+), 181 deletions(-) diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java index 29e72c9593cff..c2b4cf163071f 100644 --- a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/NettyAsyncHttpClient.java @@ -31,11 +31,8 @@ import reactor.util.retry.Retry; import javax.net.ssl.SSLException; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import static com.azure.core.http.netty.implementation.Utility.closeConnection; @@ -51,6 +48,7 @@ * @see NettyAsyncHttpClientBuilder */ class NettyAsyncHttpClient implements HttpClient { + private static final String AZURE_EAGERLY_READ_RESPONSE = "azure-eagerly-read-response"; private static final String AZURE_RESPONSE_TIMEOUT = "azure-response-timeout"; final boolean disableBufferCopy; @@ -67,7 +65,7 @@ class NettyAsyncHttpClient implements HttpClient { * @param disableBufferCopy Determines whether deep cloning of response buffers should be disabled. */ NettyAsyncHttpClient(reactor.netty.http.client.HttpClient nettyClient, boolean disableBufferCopy, - long readTimeout, long writeTimeout, long responseTimeout) { + long readTimeout, long writeTimeout, long responseTimeout) { this.nettyClient = nettyClient; this.disableBufferCopy = disableBufferCopy; this.readTimeout = readTimeout; @@ -89,23 +87,21 @@ public Mono send(HttpRequest request, Context context) { Objects.requireNonNull(request.getUrl(), "'request.getUrl()' cannot be null."); Objects.requireNonNull(request.getUrl().getProtocol(), "'request.getUrl().getProtocol()' cannot be null."); - boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false); - - Optional requestResponseTimeout = context.getData(AZURE_RESPONSE_TIMEOUT); - long effectiveResponseTimeout = requestResponseTimeout - .map(timeoutDuration -> ((Duration) timeoutDuration).toMillis()) - .orElse(this.responseTimeout); + boolean effectiveEagerlyReadResponse = (boolean) context.getData(AZURE_EAGERLY_READ_RESPONSE).orElse(false); + long effectiveResponseTimeout = context.getData(AZURE_RESPONSE_TIMEOUT) + .filter(timeoutDuration -> timeoutDuration instanceof Duration) + .map(timeoutDuration -> ((Duration) timeoutDuration).toMillis()) + .orElse(this.responseTimeout); return nettyClient .doOnRequest((r, connection) -> addWriteTimeoutHandler(connection, writeTimeout)) - .doAfterRequest((r, connection) -> - addResponseTimeoutHandler(connection, effectiveResponseTimeout)) + .doAfterRequest((r, connection) -> addResponseTimeoutHandler(connection, effectiveResponseTimeout)) .doOnResponse((response, connection) -> addReadTimeoutHandler(connection, readTimeout)) .doAfterResponseSuccess((response, connection) -> removeReadTimeoutHandler(connection)) .request(HttpMethod.valueOf(request.getHttpMethod().toString())) .uri(request.getUrl().toString()) .send(bodySendDelegate(request)) - .responseConnection(responseDelegate(request, disableBufferCopy, eagerlyReadResponse)) + .responseConnection(responseDelegate(request, disableBufferCopy, effectiveEagerlyReadResponse)) .single() .onErrorMap(throwable -> { // The exception was an SSLException that was caused by a failure to connect to a proxy. @@ -144,14 +140,15 @@ private static BiFunction> bod // adding a header twice that isn't allowed, such as User-Agent, check against the initial request // header names. If our request header already exists in the Netty request we overwrite it initially // then append our additional values if it is a multi-value header. - final AtomicBoolean first = new AtomicBoolean(true); - hdr.getValuesList().forEach(value -> { - if (first.compareAndSet(true, false)) { + boolean first = true; + for (String value : hdr.getValuesList()) { + if (first) { + first = false; reactorNettyRequest.header(hdr.getName(), value); } else { reactorNettyRequest.addHeader(hdr.getName(), value); } - }); + } } else { hdr.getValuesList().forEach(value -> reactorNettyRequest.addHeader(hdr.getName(), value)); } @@ -182,13 +179,11 @@ private static BiFunction body = reactorNettyConnection.inbound().receive().asByteBuffer() - .doFinally(ignored -> closeConnection(reactorNettyConnection)); - - return FluxUtil.collectBytesFromNetworkResponse(body, + return FluxUtil.collectBytesFromNetworkResponse( + reactorNettyConnection.inbound().receive().asByteBuffer(), new NettyToAzureCoreHttpHeadersWrapper(reactorNettyResponse.responseHeaders())) + .doFinally(ignored -> closeConnection(reactorNettyConnection)) .map(bytes -> new NettyAsyncHttpBufferedResponse(reactorNettyResponse, restRequest, bytes)); - } else { return Mono.just(new NettyAsyncHttpResponse(reactorNettyResponse, reactorNettyConnection, restRequest, disableBufferCopy)); @@ -209,7 +204,7 @@ private static void addWriteTimeoutHandler(Connection connection, long timeoutMi */ private static void addResponseTimeoutHandler(Connection connection, long timeoutMillis) { connection.removeHandler(WriteTimeoutHandler.HANDLER_NAME) - .addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(timeoutMillis)); + .addHandlerLast(ResponseTimeoutHandler.HANDLER_NAME, new ResponseTimeoutHandler(timeoutMillis)); } /* @@ -218,7 +213,7 @@ private static void addResponseTimeoutHandler(Connection connection, long timeou */ private static void addReadTimeoutHandler(Connection connection, long timeoutMillis) { connection.removeHandler(ResponseTimeoutHandler.HANDLER_NAME) - .addHandlerLast(ReadTimeoutHandler.HANDLER_NAME, new ReadTimeoutHandler(timeoutMillis)); + .addHandlerLast(ReadTimeoutHandler.HANDLER_NAME, new ReadTimeoutHandler(timeoutMillis)); } /* diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java index 56381a7da6ae0..49e70acaf3cad 100644 --- a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java @@ -5,6 +5,7 @@ import io.netty.buffer.ByteBuf; import reactor.netty.Connection; +import reactor.netty.channel.ChannelOperations; import java.nio.ByteBuffer; @@ -34,7 +35,22 @@ public static ByteBuffer deepCopyBuffer(ByteBuf byteBuf) { * @param reactorNettyConnection The connection to close. */ public static void closeConnection(Connection reactorNettyConnection) { - reactorNettyConnection.channel().eventLoop().execute(reactorNettyConnection::dispose); + // ChannelOperations is generally the default implementation of Connection used. + // + // Using the specific subclass allows for a finer grain handling. + if (reactorNettyConnection instanceof ChannelOperations) { + ChannelOperations channelOperations = (ChannelOperations) reactorNettyConnection; + + // Given that this is an HttpResponse the only time this will be called is when the outbound has completed. + // + // From there the only thing that needs to be checked is whether the inbound has been disposed (completed), + // and if not disposed it (aka drain it). + if (!channelOperations.isInboundDisposed()) { + channelOperations.channel().eventLoop().execute(channelOperations::discard); + } + } else if (!reactorNettyConnection.isDisposed()) { + reactorNettyConnection.channel().eventLoop().execute(reactorNettyConnection::dispose); + } } private Utility() { diff --git a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpClientTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpClientTests.java index 3651b7c5e3e8e..d1793d61166c3 100644 --- a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpClientTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpClientTests.java @@ -40,8 +40,6 @@ import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -255,21 +253,14 @@ public void testConcurrentRequests() { HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); int numberOfRequests = 100; // 100 = 100MB of data - byte[] expectedDigest = digest(); - - Mono numberOfBytesMono; - numberOfBytesMono = Flux.range(1, numberOfRequests) + Mono numberOfBytesMono = Flux.range(1, numberOfRequests) .parallel(25) .runOn(Schedulers.boundedElastic()) .flatMap(ignored -> httpClient.send(new HttpRequest(HttpMethod.GET, url(server, LONG_BODY_PATH))) - .flatMapMany(response -> { - MessageDigest md = md5Digest(); - return response.getBody() - .doOnNext(buffer -> md.update(buffer.duplicate())) - .doOnComplete(() -> assertArrayEquals(expectedDigest, md.digest())); - })) + .flatMap(HttpResponse::getBodyAsByteArray) + .doOnNext(bytes -> assertArrayEquals(LONG_BODY, bytes))) .sequential() - .map(ByteBuffer::remaining) + .map(bytes -> (long) bytes.length) .reduce(0L, Long::sum); StepVerifier.create(numberOfBytesMono) @@ -278,20 +269,6 @@ public void testConcurrentRequests() { .verify(Duration.ofSeconds(60)); } - private static MessageDigest md5Digest() { - try { - return MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } - - private static byte[] digest() { - MessageDigest md = md5Digest(); - md.update(NettyAsyncHttpClientTests.LONG_BODY); - return md.digest(); - } - /** * Tests that deep copying the buffers returned by Netty will make the stream returned to the customer resilient to * Netty reclaiming them once the 'onNext' operator chain has completed. diff --git a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpResponseTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpResponseTests.java index 4d14cb75960e2..c3426e3bd47cd 100644 --- a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpResponseTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/NettyAsyncHttpResponseTests.java @@ -14,8 +14,6 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.Isolated; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -160,16 +158,6 @@ public void getBodyAsStringWithCharset() { .verifyComplete(); } - @Test - public void close() { - Connection connection = mock(Connection.class); - when(connection.isDisposed()).thenReturn(true); - - new NettyAsyncHttpResponse(null, connection, REQUEST, false).close(); - - verify(connection, times(1)).isDisposed(); - } - @ParameterizedTest @MethodSource("verifyDisposalSupplier") public void verifyDisposal(String methodName, Class[] argumentTypes, Object[] argumentValues) @@ -195,7 +183,6 @@ public void verifyDisposal(String methodName, Class[] argumentTypes, Object[] Connection connection = mock(Connection.class); when(connection.inbound()).thenReturn(nettyInbound); - when(connection.isDisposed()).thenReturn(false); when(connection.channel()).thenReturn(channel); NettyAsyncHttpResponse response = new NettyAsyncHttpResponse(reactorNettyResponse, connection, REQUEST, @@ -208,7 +195,6 @@ public void verifyDisposal(String methodName, Class[] argumentTypes, Object[] ((Flux) object).blockLast(); } - verify(connection, times(1)).isDisposed(); verify(eventLoop, times(1)).execute(any()); } diff --git a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java index 5d3bd4946186a..56da0ba2d6ff7 100644 --- a/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java +++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/implementation/HttpResponseDrainsBufferTests.java @@ -19,11 +19,11 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.Isolated; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; -import java.net.MalformedURLException; -import java.net.URL; import java.security.SecureRandom; import java.time.Duration; import java.util.Collection; @@ -45,11 +45,12 @@ @Isolated @Execution(ExecutionMode.SAME_THREAD) public class HttpResponseDrainsBufferTests { + private static final String LONG_BODY_PATH = "/long"; + private static final byte[] LONG_BODY = new byte[4 * 1024 * 1024]; // 4 MB + private static ResourceLeakDetector.Level originalLevel; private static WireMockServer wireMockServer; - - private static final String LONG_BODY_PATH = "/long"; - private static final byte[] LONG_BODY = new byte[16 * 1024 * 1024]; // 16 MB + private static String url; static { new SecureRandom().nextBytes(LONG_BODY); @@ -67,10 +68,13 @@ public static void setupMockServer() { wireMockServer = new WireMockServer(wireMockConfig() .dynamicPort() .disableRequestJournal() + .asynchronousResponseEnabled(true) .gzipDisabled(true)); wireMockServer.stubFor(get(LONG_BODY_PATH).willReturn(aResponse().withBody(LONG_BODY))); wireMockServer.start(); + + url = wireMockServer.baseUrl() + LONG_BODY_PATH; } @BeforeEach @@ -93,51 +97,63 @@ public static void tearDownMockServer() { } @Test - public void closeHttpResponseWithoutConsumingBody() throws InterruptedException { + public void closeHttpResponseWithoutConsumingBody() { runScenario(response -> Mono.fromRunnable(response::close)); } @Test - public void closeHttpResponseWithConsumingPartialBody() throws InterruptedException { + public void closeHttpResponseWithConsumingPartialBody() { runScenario(response -> response.getBody().next().flatMap(ignored -> Mono.fromRunnable(response::close))); } @Test - public void closeHttpResponseWithConsumingFullBody() throws InterruptedException { + public void closeHttpResponseWithConsumingFullBody() { runScenario(response -> response.getBodyAsByteArray().flatMap(ignored -> Mono.fromRunnable(response::close))); } - @Test - public void closingHttpResponseIsIdempotent() { - HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); - StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) - .flatMap(response -> Mono.fromRunnable(response::close).thenReturn(response)) - .delayElement(Duration.ofSeconds(2)) - .flatMap(response -> Mono.fromRunnable(response::close)) - .delayElement(Duration.ofSeconds(2)) - .then()) - .verifyComplete(); - } - - private void runScenario(Function> responseConsumer) throws InterruptedException { + private void runScenario(Function> responseConsumer) { HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); - Mono requestMaker = httpClient.send(new HttpRequest(HttpMethod.GET, url(wireMockServer))) - .flatMap(responseConsumer) - .repeat(100) + Mono requestMaker = Flux.generate(() -> 0, (callCount, sink) -> { + if (callCount == 100) { + sink.complete(); + return callCount; + } + + sink.next(callCount); + return callCount + 1; + }).concatMap(ignored -> httpClient.send(new HttpRequest(HttpMethod.GET, url)).flatMap(responseConsumer)) + .parallel(10) + .runOn(Schedulers.boundedElastic()) .then(); StepVerifier.create(requestMaker).verifyComplete(); - // GC twice to ensure full cleanup. - Thread.sleep(2000); - Runtime.getRuntime().gc(); - - Thread.sleep(2000); - Runtime.getRuntime().gc(); + try { + // GC twice to ensure full cleanup. + Thread.sleep(1000); + Runtime.getRuntime().gc(); + + Thread.sleep(1000); + Runtime.getRuntime().gc(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } assertEquals(0, testResourceLeakDetectorFactory.getTotalReportedLeakCount()); } + @Test + public void closingHttpResponseIsIdempotent() { + HttpClient httpClient = new NettyAsyncHttpClientProvider().createInstance(); + StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, url)) + .flatMap(response -> Mono.fromRunnable(response::close).thenReturn(response)) + .delayElement(Duration.ofSeconds(1)) + .flatMap(response -> Mono.fromRunnable(response::close)) + .delayElement(Duration.ofSeconds(1)) + .then()) + .verifyComplete(); + } + private static final class TestResourceLeakDetectorFactory extends ResourceLeakDetectorFactory { private final Collection> createdDetectors = new ConcurrentLinkedDeque<>(); @@ -181,12 +197,4 @@ public int getReportedLeakCount() { return reportTracedLeakCount.get() + reportUntracedLeakCount.get(); } } - - private static URL url(WireMockServer server) { - try { - return new URL("http://localhost:" + server.port() + LONG_BODY_PATH); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } } diff --git a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java index f6959384722ad..274996dfe88ac 100644 --- a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java +++ b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java @@ -41,8 +41,6 @@ import com.azure.core.test.implementation.entities.HttpBinHeaders; import com.azure.core.test.implementation.entities.HttpBinJSON; import com.azure.core.util.FluxUtil; -import com.azure.core.util.serializer.JacksonAdapter; -import com.azure.core.util.serializer.SerializerAdapter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -63,6 +61,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -156,7 +155,7 @@ public void asyncRequestWithByteArrayReturnTypeAndParameterizedHostAndPath() { public void syncRequestWithEmptyByteArrayReturnTypeAndParameterizedHostAndPath() { final byte[] result = createService(Service2.class).getByteArray("localhost", 0); - // If no body then for async returns Mono.empty() for sync return null. + // If there isn't a body then for async returns Mono.empty() for sync return null. assertNull(result); } @@ -174,11 +173,7 @@ private interface Service3 { @Test public void syncGetRequestWithNoReturn() { - try { - createService(Service3.class).getNothing(); - } catch (Throwable throwable) { - fail("Received unexpected exception."); - } + assertDoesNotThrow(() -> createService(Service3.class).getNothing()); } @Test @@ -593,16 +588,14 @@ public void asyncPutRequestWithBodyAndMoreThanContentLength() { @Test public void syncPutRequestWithUnexpectedResponse() { - try { - createService(Service9.class).putWithUnexpectedResponse("I'm the body!"); - fail("Expected HttpResponseException would be thrown."); - } catch (HttpResponseException e) { - assertNotNull(e.getValue()); - assertTrue(e.getValue() instanceof LinkedHashMap); - - @SuppressWarnings("unchecked") final LinkedHashMap expectedBody = (LinkedHashMap) e.getValue(); - assertEquals("I'm the body!", expectedBody.get("data")); - } + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> createService(Service9.class).putWithUnexpectedResponse("I'm the body!")); + + assertNotNull(e.getValue()); + assertTrue(e.getValue() instanceof LinkedHashMap); + + @SuppressWarnings("unchecked") final LinkedHashMap expectedBody = (LinkedHashMap) e.getValue(); + assertEquals("I'm the body!", expectedBody.get("data")); } @Test @@ -621,15 +614,11 @@ public void asyncPutRequestWithUnexpectedResponse() { @Test public void syncPutRequestWithUnexpectedResponseAndExceptionType() { - try { - createService(Service9.class).putWithUnexpectedResponseAndExceptionType("I'm the body!"); - fail("Expected HttpResponseException would be thrown."); - } catch (MyRestException e) { - assertNotNull(e.getValue()); - assertEquals("I'm the body!", e.getValue().data()); - } catch (Throwable e) { - fail("Expected MyRestException would be thrown. Instead got " + e.getClass().getSimpleName()); - } + MyRestException e = assertThrows(MyRestException.class, () -> + createService(Service9.class).putWithUnexpectedResponseAndExceptionType("I'm the body!")); + + assertNotNull(e.getValue()); + assertEquals("I'm the body!", e.getValue().data()); } @Test @@ -645,15 +634,11 @@ public void asyncPutRequestWithUnexpectedResponseAndExceptionType() { @Test public void syncPutRequestWithUnexpectedResponseAndDeterminedExceptionType() { - try { - createService(Service9.class).putWithUnexpectedResponseAndDeterminedExceptionType("I'm the body!"); - fail("Expected HttpResponseException would be thrown."); - } catch (MyRestException e) { - assertNotNull(e.getValue()); - assertEquals("I'm the body!", e.getValue().data()); - } catch (Throwable e) { - fail("Expected MyRestException would be thrown. Instead got " + e.getClass().getSimpleName()); - } + MyRestException e = assertThrows(MyRestException.class, + () -> createService(Service9.class).putWithUnexpectedResponseAndDeterminedExceptionType("I'm the body!")); + + assertNotNull(e.getValue()); + assertEquals("I'm the body!", e.getValue().data()); } @Test @@ -669,15 +654,11 @@ public void asyncPutRequestWithUnexpectedResponseAndDeterminedExceptionType() { @Test public void syncPutRequestWithUnexpectedResponseAndFallthroughExceptionType() { - try { - createService(Service9.class).putWithUnexpectedResponseAndFallthroughExceptionType("I'm the body!"); - fail("Expected HttpResponseException would be thrown."); - } catch (MyRestException e) { - assertNotNull(e.getValue()); - assertEquals("I'm the body!", e.getValue().data()); - } catch (Throwable e) { - fail("Expected MyRestException would be thrown. Instead got " + e.getClass().getSimpleName()); - } + MyRestException e = assertThrows(MyRestException.class, + () -> createService(Service9.class).putWithUnexpectedResponseAndFallthroughExceptionType("I'm the body!")); + + assertNotNull(e.getValue()); + assertEquals("I'm the body!", e.getValue().data()); } @Test @@ -693,18 +674,14 @@ public void asyncPutRequestWithUnexpectedResponseAndFallthroughExceptionType() { @Test public void syncPutRequestWithUnexpectedResponseAndNoFallthroughExceptionType() { - try { - createService(Service9.class).putWithUnexpectedResponseAndNoFallthroughExceptionType("I'm the body!"); - fail("Expected HttpResponseException would be thrown."); - } catch (HttpResponseException e) { - assertNotNull(e.getValue()); - assertTrue(e.getValue() instanceof LinkedHashMap); - - @SuppressWarnings("unchecked") final LinkedHashMap expectedBody = (LinkedHashMap) e.getValue(); - assertEquals("I'm the body!", expectedBody.get("data")); - } catch (Throwable e) { - fail("Expected MyRestException would be thrown. Instead got " + e.getClass().getSimpleName()); - } + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> createService(Service9.class).putWithUnexpectedResponseAndNoFallthroughExceptionType("I'm the body!")); + + assertNotNull(e.getValue()); + assertTrue(e.getValue() instanceof LinkedHashMap); + + @SuppressWarnings("unchecked") final LinkedHashMap expectedBody = (LinkedHashMap) e.getValue(); + assertEquals("I'm the body!", expectedBody.get("data")); } @Test @@ -1011,7 +988,7 @@ public void service18GetStatus200() { @Test public void service18GetStatus200WithExpectedResponse200() { - createService(Service18.class).getStatus200WithExpectedResponse200(); + assertDoesNotThrow(() -> createService(Service18.class).getStatus200WithExpectedResponse200()); } @Test @@ -1021,7 +998,7 @@ public void service18GetStatus300() { @Test public void service18GetStatus300WithExpectedResponse300() { - createService(Service18.class).getStatus300WithExpectedResponse300(); + assertDoesNotThrow(() -> createService(Service18.class).getStatus300WithExpectedResponse300()); } @Test @@ -1031,7 +1008,7 @@ public void service18GetStatus400() { @Test public void service18GetStatus400WithExpectedResponse400() { - createService(Service18.class).getStatus400WithExpectedResponse400(); + assertDoesNotThrow(() -> createService(Service18.class).getStatus400WithExpectedResponse400()); } @Test @@ -1041,7 +1018,7 @@ public void service18GetStatus500() { @Test public void service18GetStatus500WithExpectedResponse500() { - createService(Service18.class).getStatus500WithExpectedResponse500(); + assertDoesNotThrow(() -> createService(Service18.class).getStatus500WithExpectedResponse500()); } @Host("http://localhost") @@ -1521,12 +1498,10 @@ interface UnexpectedOKService { @Test public void unexpectedHTTPOK() { - try { - createService(UnexpectedOKService.class).getBytes(); - fail(); - } catch (HttpResponseException e) { - assertEquals("Status code 200, (1024-byte body)", e.getMessage()); - } + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> createService(UnexpectedOKService.class).getBytes()); + + assertEquals("Status code 200, (1024-byte body)", e.getMessage()); } @Host("https://www.example.com") @@ -1557,17 +1532,17 @@ interface DownloadService { @Test public void simpleDownloadTest() { - try (StreamResponse response = createService(DownloadService.class).getBytes()) { - StepVerifier.create(response.getValue().map(ByteBuffer::remaining).reduce(0, Integer::sum)) - .assertNext(count -> assertEquals(30720, count)) - .verifyComplete(); - } + StepVerifier.create(Flux.using(() -> createService(DownloadService.class).getBytes(), + response -> response.getValue().map(ByteBuffer::remaining).reduce(0, Integer::sum), + StreamResponse::close)) + .assertNext(count -> assertEquals(30720, count)) + .verifyComplete(); } @Test public void rawFluxDownloadTest() { StepVerifier.create(createService(DownloadService.class).getBytesFlux() - .map(ByteBuffer::remaining).reduce(0, Integer::sum)) + .map(ByteBuffer::remaining).reduce(0, Integer::sum)) .assertNext(count -> assertEquals(30720, count)) .verifyComplete(); } @@ -1597,7 +1572,7 @@ public void fluxUploadTest() throws Exception { .build(); // Response response = RestProxy - .create(FluxUploadService.class, httpPipeline, SERIALIZER).put(stream, Files.size(filePath)); + .create(FluxUploadService.class, httpPipeline).put(stream, Files.size(filePath)); assertEquals("The quick brown fox jumps over the lazy dog", response.getValue().data()); } @@ -1721,7 +1696,7 @@ protected T createService(Class serviceClass, HttpClient httpClient) { .httpClient(httpClient) .build(); - return RestProxy.create(serviceClass, httpPipeline, SERIALIZER); + return RestProxy.create(serviceClass, httpPipeline); } private static void assertMatchWithHttpOrHttps(String url1, String url2) { @@ -1735,6 +1710,4 @@ private static void assertMatchWithHttpOrHttps(String url1, String url2) { } fail("'" + url2 + "' does not match with '" + s1 + "' or '" + s2 + "'."); } - - private static final SerializerAdapter SERIALIZER = new JacksonAdapter(); } From 89874f1d82d79bf9f9b122625c99bbabf51ecf35 Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Tue, 7 Sep 2021 15:12:04 -0700 Subject: [PATCH 5/5] Docs update --- .../java/com/azure/core/http/netty/implementation/Utility.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java index 49e70acaf3cad..8ae2668e8ce49 100644 --- a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/Utility.java @@ -44,7 +44,7 @@ public static void closeConnection(Connection reactorNettyConnection) { // Given that this is an HttpResponse the only time this will be called is when the outbound has completed. // // From there the only thing that needs to be checked is whether the inbound has been disposed (completed), - // and if not disposed it (aka drain it). + // and if not dispose it (aka drain it). if (!channelOperations.isInboundDisposed()) { channelOperations.channel().eventLoop().execute(channelOperations::discard); }