Skip to content

Commit

Permalink
Improve threadpool usage and error handling for API key validation (#…
Browse files Browse the repository at this point in the history
…58090) (#59047)

The PR introduces following two changes:

Move API key validation into a new separate threadpool. The new threadpool is created separately with half of the available processors and 1000 in queue size. We could combine it with the existing TokenService's threadpool. Technically it is straightforward, but I am not sure whether it could be a rushed optimization since I am not clear about potential impact on the token service.

On threadpoool saturation, it now fails with EsRejectedExecutionException which in turns gives back a 429, instead of 401 status code to users.
  • Loading branch information
ywangd committed Jul 6, 2020
1 parent 4a791e8 commit 66c0231
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -289,6 +290,8 @@
public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin,
DiscoveryPlugin, MapperPlugin, ExtensiblePlugin {

public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto";

private static final Logger logger = LogManager.getLogger(Security.class);

private final Settings settings;
Expand Down Expand Up @@ -1027,8 +1030,14 @@ public UnaryOperator<RestHandler> getRestHandlerWrapper(ThreadContext threadCont
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
if (enabled && transportClientMode == false) {
return Collections.singletonList(
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool"));
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
return org.elasticsearch.common.collect.List.of(
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000,
"xpack.security.authc.token.thread_pool", false),
new FixedExecutorBuilder(settings, SECURITY_CRYPTO_THREAD_POOL_NAME,
(allocatedProcessors + 1) / 2, 1000,
"xpack.security.crypto.thread_pool", false)
);
}
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -106,6 +108,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationType;
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;

public class ApiKeyService {

Expand Down Expand Up @@ -336,14 +339,26 @@ private void loadApiKeyAndValidateCredentials(ThreadContext ctx, ApiKeyCredentia
executeAsyncWithOrigin(ctx, SECURITY_ORIGIN, getRequest, ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
final Map<String, Object> source = response.getSource();
validateApiKeyCredentials(docId, source, credentials, clock, listener);
validateApiKeyCredentials(docId, source, credentials, clock, ActionListener.delegateResponse(listener, (l, e) -> {
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
} else {
listener.onFailure(e);
}
}));
} else {
listener.onResponse(
AuthenticationResult.unsuccessful("unable to find apikey with id " + credentials.getId(), null));
}
},
e -> listener.onResponse(AuthenticationResult.unsuccessful(
"apikey authentication for id " + credentials.getId() + " encountered a failure", e))),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
} else {
listener.onResponse(AuthenticationResult.unsuccessful(
"apikey authentication for id " + credentials.getId() + " encountered a failure",e));
}
}),
client::get);
}

Expand Down Expand Up @@ -476,23 +491,31 @@ void validateApiKeyCredentials(String docId, Map<String, Object> source, ApiKeyC
}, listener::onFailure),
threadPool.generic(), threadPool.getThreadContext());
} else {
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
if (verified) {
// move on
validateApiKeyExpiration(source, credentials, clock, listener);
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
verified -> {
listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
if (verified) {
// move on
validateApiKeyExpiration(source, credentials, clock, listener);
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
}, listener::onFailure
));
}
} else {
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
if (verified) {
// move on
validateApiKeyExpiration(source, credentials, clock, listener);
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
verified -> {
if (verified) {
// move on
validateApiKeyExpiration(source, credentials, clock, listener);
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
},
listener::onFailure
));

}
}
}
Expand Down Expand Up @@ -560,14 +583,16 @@ static ApiKeyCredentials getCredentialsFromHeader(ThreadContext threadContext) {
}

// Protected instance method so this can be mocked
protected boolean verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials) {
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
try {
protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener<Boolean> listener) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
return hasher.verify(credentials.getKey(), apiKeyHashChars);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
try {
return hasher.verify(credentials.getKey(), apiKeyHashChars);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
}));
}

private Instant getApiKeyExpiration(Instant now, CreateApiKeyRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.SecurityIntegTestCase;
Expand All @@ -43,6 +49,7 @@
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand All @@ -53,13 +60,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -82,6 +93,11 @@ public Settings nodeSettings(int nodeOrdinal) {
.build();
}

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Before
public void waitForSecurityIndexWritable() throws Exception {
assertSecurityIndexActive();
Expand Down Expand Up @@ -839,6 +855,64 @@ public void testDerivedKeys() throws ExecutionException, InterruptedException {
assertApiKeyNotCreated(client,"key-5");
}

public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException {
final String nodeName = randomFrom(internalCluster().getNodeNames());
final Settings settings = internalCluster().getInstance(Settings.class, nodeName);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);

final RoleDescriptor descriptor = new RoleDescriptor("auth_only", new String[] { }, null, null);
final Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
SecurityClient securityClient = new SecurityClient(client);
final CreateApiKeyResponse createApiKeyResponse = securityClient.prepareCreateApiKey()
.setName("auth only key")
.setRoleDescriptors(Collections.singletonList(descriptor))
.get();

assertNotNull(createApiKeyResponse.getId());
assertNotNull(createApiKeyResponse.getKey());

final List<NodeInfo> nodeInfos = client().admin().cluster().prepareNodesInfo().get().getNodes().stream()
.filter(nodeInfo -> nodeInfo.getNode().getName().equals(nodeName))
.collect(Collectors.toList());

final ExecutorService executorService = threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME);
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final int numberOfThreads = (allocatedProcessors + 1) / 2;
final CountDownLatch blockingLatch = new CountDownLatch(1);
final CountDownLatch readyLatch = new CountDownLatch(numberOfThreads);

for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(() -> {
readyLatch.countDown();
try {
blockingLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
// Fill the whole queue for the crypto thread pool
final int queueSize = 1000;
IntStream.range(0, queueSize).forEach(i -> executorService.submit(() -> {}));
readyLatch.await();

try (RestClient restClient = createRestClient(nodeInfos, null, "http")) {
final String base64ApiKeyKeyValue = Base64.getEncoder().encodeToString(
(createApiKeyResponse.getId() + ":" + createApiKeyResponse.getKey().toString()).getBytes(StandardCharsets.UTF_8));

final Request authRequest = new Request("GET", "_security/_authenticate");
authRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(
"Authorization", "ApiKey " + base64ApiKeyKeyValue).build());
final ResponseException responseException = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest));
assertThat(responseException.getMessage(), containsString("429 Too Many Requests"));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429));
} finally {
blockingLatch.countDown();
}
}

private void assertApiKeyNotCreated(Client client, String keyName) throws ExecutionException, InterruptedException {
new RefreshRequestBuilder(client, RefreshAction.INSTANCE).setIndices(SECURITY_MAIN_ALIAS).execute().get();
PlainActionFuture<GetApiKeyResponse> getApiKeyResponseListener = new PlainActionFuture<>();
Expand Down
Loading

0 comments on commit 66c0231

Please sign in to comment.