Skip to content

Commit

Permalink
Move hashing on API key creation to crypto thread pool (#74165) (#74417)
Browse files Browse the repository at this point in the history
The changes in #74106 make API keys cached on creation time. It helps avoid the
expensive hashing operation on initial authentication when a request using the
key hits the same node that creates the key. Since the more expensive hashing
on authentication time is handled by a dedicated "crypto" thread pool (#58090),
it is expected that usage of the "crypto" thread pool to be reduced.

This PR moves the hashing on creation time to the "crypto" thread pool so that
a similar (before #74106) usage level of "crypto" thread pool is maintained. It
also has the benefit to avoid costly operations in the transport_worker thread,
which is generally preferred.

Relates: #74106
  • Loading branch information
ywangd committed Jun 22, 2021
1 parent 80a6f56 commit 03fbc35
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ public void testDerivedKeys() throws ExecutionException, InterruptedException {
assertApiKeyNotCreated(client,"key-5");
}

public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException, ExecutionException {
public void testCreationAndAuthenticationReturns429WhenThreadPoolIsSaturated() throws Exception {
final String nodeName = randomFrom(internalCluster().getNodeNames());
final Settings settings = internalCluster().getInstance(Settings.class, nodeName);
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
Expand Down Expand Up @@ -1059,9 +1059,17 @@ public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOExc
final Request authRequest = new Request("GET", "_security/_authenticate");
authRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(
"Authorization", "ApiKey " + base64ApiKeyKeyValue).build());
final ResponseException responseException = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest));
assertThat(responseException.getMessage(), containsString("429 Too Many Requests"));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429));
final ResponseException e1 = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest));
assertThat(e1.getMessage(), containsString("429 Too Many Requests"));
assertThat(e1.getResponse().getStatusLine().getStatusCode(), is(429));

final Request createApiKeyRequest = new Request("POST", "_security/api_key");
createApiKeyRequest.setJsonEntity("{\"name\":\"key\"}");
createApiKeyRequest.setOptions(createApiKeyRequest.getOptions().toBuilder()
.addHeader("Authorization", basicAuthHeaderValue(TEST_SUPERUSER, TEST_PASSWORD_SECURE_STRING)));
final ResponseException e2 = expectThrows(ResponseException.class, () -> restClient.performRequest(createApiKeyRequest));
assertThat(e2.getMessage(), containsString("429 Too Many Requests"));
assertThat(e2.getResponse().getStatusLine().getStatusCode(), is(429));
} finally {
blockingLatch.countDown();
if (lastTaskFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,37 +286,41 @@ private void createApiKeyAndIndexIt(Authentication authentication, CreateApiKeyR
Version.V_6_7_0);
}

try (XContentBuilder builder = newDocument(apiKey, request.getName(), authentication,
roleDescriptorSet, created, expiration,
request.getRoleDescriptors(), version, request.getMetadata())) {

final IndexRequest indexRequest =
client.prepareIndex(SECURITY_MAIN_ALIAS, SINGLE_MAPPING_NAME)
.setSource(builder)
.setRefreshPolicy(request.getRefreshPolicy())
.request();
final BulkRequest bulkRequest = toSingleItemBulkRequest(indexRequest);

securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client, SECURITY_ORIGIN, BulkAction.INSTANCE, bulkRequest,
TransportSingleItemBulkWriteAction.<IndexResponse>wrapBulkResponse(ActionListener.wrap(
indexResponse -> {
final ListenableFuture<CachedApiKeyHashResult> listenableFuture = new ListenableFuture<>();
listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey));
apiKeyAuthCache.put(indexResponse.getId(), listenableFuture);
listener.onResponse(
new CreateApiKeyResponse(request.getName(), indexResponse.getId(), apiKey, expiration));
},
listener::onFailure))));
} catch (IOException e) {
listener.onFailure(e);
}
computeHashForApiKey(apiKey, listener.delegateFailure((l, apiKeyHashChars) -> {
try (XContentBuilder builder = newDocument(apiKeyHashChars, request.getName(), authentication,
roleDescriptorSet, created, expiration,
request.getRoleDescriptors(), version, request.getMetadata())) {

final IndexRequest indexRequest =
client.prepareIndex(SECURITY_MAIN_ALIAS, SINGLE_MAPPING_NAME)
.setSource(builder)
.setRefreshPolicy(request.getRefreshPolicy())
.request();
final BulkRequest bulkRequest = toSingleItemBulkRequest(indexRequest);

securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client, SECURITY_ORIGIN, BulkAction.INSTANCE, bulkRequest,
TransportSingleItemBulkWriteAction.<IndexResponse>wrapBulkResponse(ActionListener.wrap(
indexResponse -> {
final ListenableFuture<CachedApiKeyHashResult> listenableFuture = new ListenableFuture<>();
listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey));
apiKeyAuthCache.put(indexResponse.getId(), listenableFuture);
listener.onResponse(
new CreateApiKeyResponse(request.getName(), indexResponse.getId(), apiKey, expiration));
},
listener::onFailure))));
} catch (IOException e) {
listener.onFailure(e);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
}));
}

/**
* package-private for testing
*/
XContentBuilder newDocument(SecureString apiKey, String name, Authentication authentication, Set<RoleDescriptor> userRoles,
XContentBuilder newDocument(char[] apiKeyHashChars, String name, Authentication authentication, Set<RoleDescriptor> userRoles,
Instant created, Instant expiration, List<RoleDescriptor> keyRoles,
Version version, @Nullable Map<String, Object> metadata) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
Expand All @@ -328,15 +332,13 @@ XContentBuilder newDocument(SecureString apiKey, String name, Authentication aut


byte[] utf8Bytes = null;
final char[] keyHash = hasher.hash(apiKey);
try {
utf8Bytes = CharArrays.toUtf8Bytes(keyHash);
utf8Bytes = CharArrays.toUtf8Bytes(apiKeyHashChars);
builder.field("api_key_hash").utf8Value(utf8Bytes, 0, utf8Bytes.length);
} finally {
if (utf8Bytes != null) {
Arrays.fill(utf8Bytes, (byte) 0);
}
Arrays.fill(keyHash, (char) 0);
}

// Save role_descriptors
Expand Down Expand Up @@ -772,6 +774,10 @@ public static boolean isApiKeyAuthentication(Authentication authentication) {
}
}

void computeHashForApiKey(SecureString apiKey, ActionListener<char[]> listener) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> hasher.hash(apiKey)));
}

// Protected instance method so this can be mocked
protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener<Boolean> listener) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ Version.CURRENT, randomFrom(AuthenticationType.REALM, AuthenticationType.TOKEN,
AuthenticationType.ANONYMOUS), Collections.emptyMap());
}
final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
XContentBuilder docSource = service.newDocument(new SecureString(key.toCharArray()), "test", authentication,
XContentBuilder docSource = service.newDocument(
getFastStoredHashAlgoForTests().hash(new SecureString(key.toCharArray())),"test", authentication,
Collections.singleton(SUPERUSER_ROLE_DESCRIPTOR), Instant.now(), Instant.now().plus(expiry), keyRoles,
Version.CURRENT, metadata);
if (invalidated) {
Expand Down Expand Up @@ -976,6 +977,25 @@ public void testAuthWillTerminateIfHashingThreadPoolIsSaturated() throws IOExcep
assertThat(authenticationResult.getMessage(), containsString("server is too busy to respond"));
}

public void testCreationWillFailIfHashingThreadPoolIsSaturated() {
final EsRejectedExecutionException rejectedExecutionException = new EsRejectedExecutionException("rejected");
final ExecutorService mockExecutorService = mock(ExecutorService.class);
when(threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME)).thenReturn(mockExecutorService);
Mockito.doAnswer(invocationOnMock -> {
final AbstractRunnable actionRunnable = (AbstractRunnable) invocationOnMock.getArguments()[0];
actionRunnable.onRejection(rejectedExecutionException);
return null;
}).when(mockExecutorService).execute(any(Runnable.class));

final Authentication authentication = mock(Authentication.class);
final CreateApiKeyRequest createApiKeyRequest = new CreateApiKeyRequest(randomAlphaOfLengthBetween(3, 8), null, null);
ApiKeyService service = createApiKeyService(Settings.EMPTY);
final PlainActionFuture<CreateApiKeyResponse> future = new PlainActionFuture<>();
service.createApiKey(authentication, createApiKeyRequest, org.elasticsearch.core.Set.of(), future);
final EsRejectedExecutionException e = expectThrows(EsRejectedExecutionException.class, future::actionGet);
assertThat(e, is(rejectedExecutionException));
}

public void testCachedApiKeyValidationWillNotBeBlockedByUnCachedApiKey() throws IOException, ExecutionException, InterruptedException {
final String apiKey1 = randomAlphaOfLength(16);
final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey1.toCharArray()));
Expand Down Expand Up @@ -1130,7 +1150,7 @@ public static Authentication createApiKeyAuthentication(ApiKeyService apiKeyServ
List<RoleDescriptor> keyRoles,
Version version) throws Exception {
XContentBuilder keyDocSource = apiKeyService.newDocument(
new SecureString(randomAlphaOfLength(16).toCharArray()), "test", authentication,
getFastStoredHashAlgoForTests().hash(new SecureString(randomAlphaOfLength(16).toCharArray())), "test", authentication,
userRoles, Instant.now(), Instant.now().plus(Duration.ofSeconds(3600)), keyRoles, Version.CURRENT,
randomBoolean() ? null : org.elasticsearch.core.Map.of(
randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)));
Expand Down

0 comments on commit 03fbc35

Please sign in to comment.