From 89459220d8631b4ed54046afdecade0f237ac1ac Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 14:22:12 +0100 Subject: [PATCH 1/6] Make EC2 Discovery Plugin Retry Requests Use the default retry condition instead of never retrying in the discovery plugin causing hot retries upstream and add a test that verifies retrying works. Closes #50462 --- .../discovery/ec2/AwsEc2ServiceImpl.java | 4 +- .../discovery/ec2/EC2RetriesTests.java | 269 ++++++++++++++++++ 2 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java index 739b964925c3e..52f5ee9b6c90c 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; class AwsEc2ServiceImpl implements AwsEc2Service { - + private static final Logger logger = LogManager.getLogger(AwsEc2ServiceImpl.class); private final AtomicReference> lazyClientReference = @@ -79,7 +79,7 @@ static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings c // Increase the number of retries in case of 5xx API responses final Random rand = Randomness.get(); final RetryPolicy retryPolicy = new RetryPolicy( - RetryPolicy.RetryCondition.NO_RETRY_CONDITION, + null, (originalRequest, exception, retriesAttempted) -> { // with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000) logger.warn("EC2 API request failed, retry again. Reason was:", exception); diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java new file mode 100644 index 0000000000000..d8b1d6b3aa44c --- /dev/null +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -0,0 +1,269 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.ec2; + +import com.amazonaws.http.HttpMethodName; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpStatus; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.elasticsearch.Version; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.discovery.SeedHostsProvider; +import org.elasticsearch.discovery.SeedHostsResolver; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.mocksocket.MockHttpServer; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.MockNioTransport; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; + +import javax.xml.XMLConstants; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamWriter; + +import java.io.IOException; +import java.io.StringWriter; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +@SuppressForbidden(reason = "use a http server") +public class EC2RetriesTests extends ESTestCase { + + private HttpServer httpServer; + + private ThreadPool threadPool; + + private MockTransportService transportService; + + @Before + public void setUp() throws Exception { + httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.start(); + threadPool = new TestThreadPool(EC2RetriesTests.class.getName()); + final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9300) + ); + } + }; + transportService = + new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + super.setUp(); + } + + @After + public void tearDown() throws Exception { + try { + IOUtils.close(transportService, () -> terminate(threadPool), () -> httpServer.stop(0)); + } finally { + super.tearDown(); + } + } + + public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { + final String accessKey = "ec2_access"; + final List hosts = List.of("127.0.0.1:9000"); + final Set failedRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); + httpServer.createContext("/", exchange -> { + if (exchange.getRequestMethod().equals(HttpMethodName.POST.name())) { + final String request = new String(exchange.getRequestBody().readAllBytes(), UTF_8); + final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent"); + if (userAgent != null && userAgent.startsWith("aws-sdk-java")) { + final String auth = exchange.getRequestHeaders().getFirst("Authorization"); + if (auth == null || auth.contains(accessKey) == false) { + throw new IllegalArgumentException("wrong access key: " + auth); + } + if (failedRequests.add(exchange.getRequestHeaders().getFirst("Amz-sdk-invocation-id"))) { + exchange.sendResponseHeaders(HttpStatus.SC_SERVICE_UNAVAILABLE, -1); + return; + } + // Simulate an EC2 DescribeInstancesResponse + byte[] responseBody = null; + for (NameValuePair parse : URLEncodedUtils.parse(request, UTF_8)) { + if ("Action".equals(parse.getName())) { + responseBody = generateDescribeInstancesResponse(hosts); + break; + } + } + responseBody = responseBody == null ? new byte[0] : responseBody; + exchange.getResponseHeaders().set("Content-Type", "text/xml; charset=UTF-8"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, responseBody.length); + exchange.getResponseBody().write(responseBody); + } + } + }); + + final InetSocketAddress address = httpServer.getAddress(); + final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); + final MockSecureSettings mockSecure = new MockSecureSettings(); + mockSecure.setString(Ec2ClientSettings.ACCESS_KEY_SETTING.getKey(), accessKey); + mockSecure.setString(Ec2ClientSettings.SECRET_KEY_SETTING.getKey(), "ec2_secret"); + try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin( + Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), endpoint).setSecureSettings(mockSecure).build())) { + final SeedHostsProvider seedHostsProvider = + plugin.getSeedHostProviders(transportService, new NetworkService(Collections.emptyList())).get("ec2").get(); + final SeedHostsResolver resolver = new SeedHostsResolver("test", Settings.EMPTY, transportService, seedHostsProvider); + resolver.start(); + final List addressList = seedHostsProvider.getSeedAddresses(resolver); + assertThat(addressList, Matchers.hasSize(1)); + assertThat(addressList.get(0).toString(), is(hosts.get(0))); + assertThat(failedRequests, hasSize(1)); + } + } + + /** + * Generates a XML response that describe the EC2 instances + * TODO: org.elasticsearch.discovery.ec2.AmazonEC2Fixture uses pretty much the same code. We should dry up that test fixture. + */ + private byte[] generateDescribeInstancesResponse(List nodes) { + final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory(); + xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true); + + final StringWriter out = new StringWriter(); + XMLStreamWriter sw; + try { + sw = xmlOutputFactory.createXMLStreamWriter(out); + sw.writeStartDocument(); + + String namespace = "http://ec2.amazonaws.com/doc/2013-02-01/"; + sw.setDefaultNamespace(namespace); + sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", namespace); + { + sw.writeStartElement("requestId"); + sw.writeCharacters(UUID.randomUUID().toString()); + sw.writeEndElement(); + + sw.writeStartElement("reservationSet"); + { + for (String address : nodes) { + sw.writeStartElement("item"); + { + sw.writeStartElement("reservationId"); + sw.writeCharacters(UUID.randomUUID().toString()); + sw.writeEndElement(); + + sw.writeStartElement("instancesSet"); + { + sw.writeStartElement("item"); + { + sw.writeStartElement("instanceId"); + sw.writeCharacters(UUID.randomUUID().toString()); + sw.writeEndElement(); + + sw.writeStartElement("imageId"); + sw.writeCharacters(UUID.randomUUID().toString()); + sw.writeEndElement(); + + sw.writeStartElement("instanceState"); + { + sw.writeStartElement("code"); + sw.writeCharacters("16"); + sw.writeEndElement(); + + sw.writeStartElement("name"); + sw.writeCharacters("running"); + sw.writeEndElement(); + } + sw.writeEndElement(); + + sw.writeStartElement("privateDnsName"); + sw.writeCharacters(address); + sw.writeEndElement(); + + sw.writeStartElement("dnsName"); + sw.writeCharacters(address); + sw.writeEndElement(); + + sw.writeStartElement("instanceType"); + sw.writeCharacters("m1.medium"); + sw.writeEndElement(); + + sw.writeStartElement("placement"); + { + sw.writeStartElement("availabilityZone"); + sw.writeCharacters("use-east-1e"); + sw.writeEndElement(); + + sw.writeEmptyElement("groupName"); + + sw.writeStartElement("tenancy"); + sw.writeCharacters("default"); + sw.writeEndElement(); + } + sw.writeEndElement(); + + sw.writeStartElement("privateIpAddress"); + sw.writeCharacters(address); + sw.writeEndElement(); + + sw.writeStartElement("ipAddress"); + sw.writeCharacters(address); + sw.writeEndElement(); + } + sw.writeEndElement(); + } + sw.writeEndElement(); + } + sw.writeEndElement(); + } + sw.writeEndElement(); + } + sw.writeEndElement(); + + sw.writeEndDocument(); + sw.flush(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return out.toString().getBytes(UTF_8); + } +} From 7a5b21f5d243037d0a747ca3ab0502b84b1862fc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 14:26:28 +0100 Subject: [PATCH 2/6] nicer --- .../elasticsearch/discovery/ec2/EC2RetriesTests.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java index d8b1d6b3aa44c..ba208d946f84c 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.internal.io.IOUtils; @@ -85,15 +84,7 @@ public void setUp() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService()) { - @Override - public BoundTransportAddress boundAddress() { - return new BoundTransportAddress( - new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)}, - new TransportAddress(InetAddress.getLoopbackAddress(), 9300) - ); - } - }; + new NoneCircuitBreakerService()); transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); super.setUp(); From 9c8c589deb08e8ab67ba96ea0eca023756db5dbd Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 14:28:17 +0100 Subject: [PATCH 3/6] nicer --- .../org/elasticsearch/discovery/ec2/EC2RetriesTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java index ba208d946f84c..c4cc202b36a61 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -75,13 +75,15 @@ public class EC2RetriesTests extends ESTestCase { private MockTransportService transportService; + private NetworkService networkService = new NetworkService(Collections.emptyList()); + @Before public void setUp() throws Exception { httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); threadPool = new TestThreadPool(EC2RetriesTests.class.getName()); final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, - new NetworkService(Collections.emptyList()), + networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); @@ -139,8 +141,7 @@ public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { mockSecure.setString(Ec2ClientSettings.SECRET_KEY_SETTING.getKey(), "ec2_secret"); try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin( Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), endpoint).setSecureSettings(mockSecure).build())) { - final SeedHostsProvider seedHostsProvider = - plugin.getSeedHostProviders(transportService, new NetworkService(Collections.emptyList())).get("ec2").get(); + final SeedHostsProvider seedHostsProvider = plugin.getSeedHostProviders(transportService, networkService).get("ec2").get(); final SeedHostsResolver resolver = new SeedHostsResolver("test", Settings.EMPTY, transportService, seedHostsProvider); resolver.start(); final List addressList = seedHostsProvider.getSeedAddresses(resolver); From 2d835a8cda8e04832b9e06633d389ab0145f60dd Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 14:29:26 +0100 Subject: [PATCH 4/6] nicer --- .../org/elasticsearch/discovery/ec2/EC2RetriesTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java index c4cc202b36a61..60fcc2a50aa7f 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -82,10 +82,8 @@ public void setUp() throws Exception { httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); threadPool = new TestThreadPool(EC2RetriesTests.class.getName()); - final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, - networkService, - PageCacheRecycler.NON_RECYCLING_INSTANCE, - new NamedWriteableRegistry(Collections.emptyList()), + final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); From d015576900c80123733e36ce2a8f5a4b8c897b41 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 14:53:16 +0100 Subject: [PATCH 5/6] default retry backoff --- .../discovery/ec2/AwsEc2ServiceImpl.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java index 52f5ee9b6c90c..d86b7e5471a5b 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java @@ -31,11 +31,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.LazyInitializable; -import java.util.Random; import java.util.concurrent.atomic.AtomicReference; class AwsEc2ServiceImpl implements AwsEc2Service { @@ -77,16 +75,7 @@ static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings c clientConfiguration.setProxyPassword(clientSettings.proxyPassword); } // Increase the number of retries in case of 5xx API responses - final Random rand = Randomness.get(); - final RetryPolicy retryPolicy = new RetryPolicy( - null, - (originalRequest, exception, retriesAttempted) -> { - // with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000) - logger.warn("EC2 API request failed, retry again. Reason was:", exception); - return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble())); - }, - 10, - false); + final RetryPolicy retryPolicy = new RetryPolicy(null, null, 10, false); clientConfiguration.setRetryPolicy(retryPolicy); clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis); return clientConfiguration; From ec56d958a0c424e61668a16e0e4fa7b16d342e1e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 2 Jan 2020 15:44:21 +0100 Subject: [PATCH 6/6] CR: comments --- .../discovery/ec2/AwsEc2ServiceImpl.java | 4 +--- .../discovery/ec2/EC2RetriesTests.java | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java index d86b7e5471a5b..546634c88cf45 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2ServiceImpl.java @@ -25,7 +25,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.http.IdleConnectionReaper; -import com.amazonaws.retry.RetryPolicy; import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.AmazonEC2Client; import org.apache.logging.log4j.LogManager; @@ -75,8 +74,7 @@ static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings c clientConfiguration.setProxyPassword(clientSettings.proxyPassword); } // Increase the number of retries in case of 5xx API responses - final RetryPolicy retryPolicy = new RetryPolicy(null, null, 10, false); - clientConfiguration.setRetryPolicy(retryPolicy); + clientConfiguration.setMaxErrorRetry(10); clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis); return clientConfiguration; } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java index 60fcc2a50aa7f..df8effd9548b6 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/EC2RetriesTests.java @@ -58,12 +58,13 @@ import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.is; @SuppressForbidden(reason = "use a http server") @@ -102,7 +103,9 @@ public void tearDown() throws Exception { public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { final String accessKey = "ec2_access"; final List hosts = List.of("127.0.0.1:9000"); - final Set failedRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final Map failedRequests = new ConcurrentHashMap<>(); + // retry the same request 5 times at most + final int maxRetries = randomIntBetween(1, 5); httpServer.createContext("/", exchange -> { if (exchange.getRequestMethod().equals(HttpMethodName.POST.name())) { final String request = new String(exchange.getRequestBody().readAllBytes(), UTF_8); @@ -112,7 +115,8 @@ public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { if (auth == null || auth.contains(accessKey) == false) { throw new IllegalArgumentException("wrong access key: " + auth); } - if (failedRequests.add(exchange.getRequestHeaders().getFirst("Amz-sdk-invocation-id"))) { + if (failedRequests.compute(exchange.getRequestHeaders().getFirst("Amz-sdk-invocation-id"), + (requestId, count) -> Objects.requireNonNullElse(count, 0) + 1) < maxRetries) { exchange.sendResponseHeaders(HttpStatus.SC_SERVICE_UNAVAILABLE, -1); return; } @@ -128,8 +132,10 @@ public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { exchange.getResponseHeaders().set("Content-Type", "text/xml; charset=UTF-8"); exchange.sendResponseHeaders(HttpStatus.SC_OK, responseBody.length); exchange.getResponseBody().write(responseBody); + return; } } + fail("did not send response"); }); final InetSocketAddress address = httpServer.getAddress(); @@ -145,7 +151,8 @@ public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException { final List addressList = seedHostsProvider.getSeedAddresses(resolver); assertThat(addressList, Matchers.hasSize(1)); assertThat(addressList.get(0).toString(), is(hosts.get(0))); - assertThat(failedRequests, hasSize(1)); + assertThat(failedRequests, aMapWithSize(1)); + assertThat(failedRequests.values().iterator().next(), is(maxRetries)); } }