Skip to content

Commit

Permalink
Make EC2 Discovery Plugin Retry Requests (#50550) (#50558)
Browse files Browse the repository at this point in the history
Use the default retry condition instead of never retrying in the discovery plugin causing hot retries upstream and add a test that verifies retrying works.

Closes #50462
  • Loading branch information
original-brownbear committed Jan 2, 2020
1 parent b36a8ab commit 8092a49
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,18 @@
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.LazyInitializable;

import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

class AwsEc2ServiceImpl implements AwsEc2Service {

private static final Logger logger = LogManager.getLogger(AwsEc2ServiceImpl.class);

private final AtomicReference<LazyInitializable<AmazonEc2Reference, ElasticsearchException>> lazyClientReference =
Expand Down Expand Up @@ -77,17 +74,7 @@ static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings c
clientConfiguration.setProxyPassword(clientSettings.proxyPassword);
}
// Increase the number of retries in case of 5xx API responses
final Random rand = Randomness.get();
final RetryPolicy retryPolicy = new RetryPolicy(
RetryPolicy.RetryCondition.NO_RETRY_CONDITION,
(originalRequest, exception, retriesAttempted) -> {
// with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000)
logger.warn("EC2 API request failed, retry again. Reason was:", exception);
return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble()));
},
10,
false);
clientConfiguration.setRetryPolicy(retryPolicy);
clientConfiguration.setMaxErrorRetry(10);
clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis);
return clientConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.ec2;

import com.amazonaws.http.HttpMethodName;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;

import javax.xml.XMLConstants;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;

import java.io.IOException;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.is;

@SuppressForbidden(reason = "use a http server")
public class EC2RetriesTests extends ESTestCase {

private HttpServer httpServer;

private ThreadPool threadPool;

private MockTransportService transportService;

private NetworkService networkService = new NetworkService(Collections.emptyList());

@Before
public void setUp() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
threadPool = new TestThreadPool(EC2RetriesTests.class.getName());
final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService());
transportService =
new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
super.setUp();
}

@After
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, () -> terminate(threadPool), () -> httpServer.stop(0));
} finally {
super.tearDown();
}
}

public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException {
final String accessKey = "ec2_access";
final List<String> hosts = Collections.singletonList("127.0.0.1:9000");
final Map<String, Integer> failedRequests = new ConcurrentHashMap<>();
// retry the same request 5 times at most
final int maxRetries = randomIntBetween(1, 5);
httpServer.createContext("/", exchange -> {
if (exchange.getRequestMethod().equals(HttpMethodName.POST.name())) {
final String request = Streams.readFully(exchange.getRequestBody()).utf8ToString();
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("aws-sdk-java")) {
final String auth = exchange.getRequestHeaders().getFirst("Authorization");
if (auth == null || auth.contains(accessKey) == false) {
throw new IllegalArgumentException("wrong access key: " + auth);
}
if (failedRequests.compute(exchange.getRequestHeaders().getFirst("Amz-sdk-invocation-id"),
(requestId, count) -> (count == null ? 0 : count) + 1) < maxRetries) {
exchange.sendResponseHeaders(HttpStatus.SC_SERVICE_UNAVAILABLE, -1);
return;
}
// Simulate an EC2 DescribeInstancesResponse
byte[] responseBody = null;
for (NameValuePair parse : URLEncodedUtils.parse(request, UTF_8)) {
if ("Action".equals(parse.getName())) {
responseBody = generateDescribeInstancesResponse(hosts);
break;
}
}
responseBody = responseBody == null ? new byte[0] : responseBody;
exchange.getResponseHeaders().set("Content-Type", "text/xml; charset=UTF-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, responseBody.length);
exchange.getResponseBody().write(responseBody);
return;
}
}
fail("did not send response");
});

final InetSocketAddress address = httpServer.getAddress();
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
final MockSecureSettings mockSecure = new MockSecureSettings();
mockSecure.setString(Ec2ClientSettings.ACCESS_KEY_SETTING.getKey(), accessKey);
mockSecure.setString(Ec2ClientSettings.SECRET_KEY_SETTING.getKey(), "ec2_secret");
try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin(
Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), endpoint).setSecureSettings(mockSecure).build())) {
final SeedHostsProvider seedHostsProvider = plugin.getSeedHostProviders(transportService, networkService).get("ec2").get();
final SeedHostsResolver resolver = new SeedHostsResolver("test", Settings.EMPTY, transportService, seedHostsProvider);
resolver.start();
final List<TransportAddress> addressList = seedHostsProvider.getSeedAddresses(null);
assertThat(addressList, Matchers.hasSize(1));
assertThat(addressList.get(0).toString(), is(hosts.get(0)));
assertThat(failedRequests, aMapWithSize(1));
assertThat(failedRequests.values().iterator().next(), is(maxRetries));
}
}

/**
* Generates a XML response that describe the EC2 instances
* TODO: org.elasticsearch.discovery.ec2.AmazonEC2Fixture uses pretty much the same code. We should dry up that test fixture.
*/
private byte[] generateDescribeInstancesResponse(List<String> nodes) {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);

final StringWriter out = new StringWriter();
XMLStreamWriter sw;
try {
sw = xmlOutputFactory.createXMLStreamWriter(out);
sw.writeStartDocument();

String namespace = "http://ec2.amazonaws.com/doc/2013-02-01/";
sw.setDefaultNamespace(namespace);
sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", namespace);
{
sw.writeStartElement("requestId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();

sw.writeStartElement("reservationSet");
{
for (String address : nodes) {
sw.writeStartElement("item");
{
sw.writeStartElement("reservationId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();

sw.writeStartElement("instancesSet");
{
sw.writeStartElement("item");
{
sw.writeStartElement("instanceId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();

sw.writeStartElement("imageId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();

sw.writeStartElement("instanceState");
{
sw.writeStartElement("code");
sw.writeCharacters("16");
sw.writeEndElement();

sw.writeStartElement("name");
sw.writeCharacters("running");
sw.writeEndElement();
}
sw.writeEndElement();

sw.writeStartElement("privateDnsName");
sw.writeCharacters(address);
sw.writeEndElement();

sw.writeStartElement("dnsName");
sw.writeCharacters(address);
sw.writeEndElement();

sw.writeStartElement("instanceType");
sw.writeCharacters("m1.medium");
sw.writeEndElement();

sw.writeStartElement("placement");
{
sw.writeStartElement("availabilityZone");
sw.writeCharacters("use-east-1e");
sw.writeEndElement();

sw.writeEmptyElement("groupName");

sw.writeStartElement("tenancy");
sw.writeCharacters("default");
sw.writeEndElement();
}
sw.writeEndElement();

sw.writeStartElement("privateIpAddress");
sw.writeCharacters(address);
sw.writeEndElement();

sw.writeStartElement("ipAddress");
sw.writeCharacters(address);
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();

sw.writeEndDocument();
sw.flush();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return out.toString().getBytes(UTF_8);
}
}

0 comments on commit 8092a49

Please sign in to comment.