Skip to content

Commit

Permalink
Security: propagate auth result to listeners (#36900)
Browse files Browse the repository at this point in the history
After #30794, our caching realms limit each principal to a single auth
attempt at a time. This prevents hammering of external servers but can
cause a significant performance hit when requests need to go through a
realm that takes a long time to attempt to authenticate in order to get
to the realm that actually authenticates. In order to address this,
this change will propagate failed results to listeners if they use the
same set of credentials that the authentication attempt used. This does
prevent these stalled requests from retrying the authentication attempt
but the implementation does allow for new requests to retry the
attempt.
  • Loading branch information
jaymode committed Jan 8, 2019
1 parent dd69553 commit 1514bbc
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.security.authc.support;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.settings.SecureString;
Expand All @@ -29,7 +30,7 @@

public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm {

private final Cache<String, ListenableFuture<UserWithHash>> cache;
private final Cache<String, ListenableFuture<CachedResult>> cache;
private final ThreadPool threadPool;
private final boolean authenticationEnabled;
final Hasher cacheHasher;
Expand All @@ -40,7 +41,7 @@ protected CachingUsernamePasswordRealm(RealmConfig config, ThreadPool threadPool
this.threadPool = threadPool;
final TimeValue ttl = this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING);
if (ttl.getNanos() > 0) {
cache = CacheBuilder.<String, ListenableFuture<UserWithHash>>builder()
cache = CacheBuilder.<String, ListenableFuture<CachedResult>>builder()
.setExpireAfterWrite(ttl)
.setMaximumWeight(this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING))
.build();
Expand Down Expand Up @@ -122,58 +123,61 @@ private void authenticateWithCache(UsernamePasswordToken token, ActionListener<A
assert cache != null;
try {
final AtomicBoolean authenticationInCache = new AtomicBoolean(true);
final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> {
final ListenableFuture<CachedResult> listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> {
authenticationInCache.set(false);
return new ListenableFuture<>();
});
if (authenticationInCache.get()) {
// there is a cached or an inflight authenticate request
listenableCacheEntry.addListener(ActionListener.wrap(authenticatedUserWithHash -> {
if (authenticatedUserWithHash != null && authenticatedUserWithHash.verify(token.credentials())) {
// cached credential hash matches the credential hash for this forestalled request
handleCachedAuthentication(authenticatedUserWithHash.user, ActionListener.wrap(cacheResult -> {
if (cacheResult.isAuthenticated()) {
logger.debug("realm [{}] authenticated user [{}], with roles [{}]",
name(), token.principal(), cacheResult.getUser().roles());
} else {
logger.debug("realm [{}] authenticated user [{}] from cache, but then failed [{}]",
name(), token.principal(), cacheResult.getMessage());
}
listener.onResponse(cacheResult);
}, listener::onFailure));
listenableCacheEntry.addListener(ActionListener.wrap(cachedResult -> {
final boolean credsMatch = cachedResult.verify(token.credentials());
if (cachedResult.authenticationResult.isAuthenticated()) {
if (credsMatch) {
// cached credential hash matches the credential hash for this forestalled request
handleCachedAuthentication(cachedResult.user, ActionListener.wrap(cacheResult -> {
if (cacheResult.isAuthenticated()) {
logger.debug("realm [{}] authenticated user [{}], with roles [{}]",
name(), token.principal(), cacheResult.getUser().roles());
} else {
logger.debug("realm [{}] authenticated user [{}] from cache, but then failed [{}]",
name(), token.principal(), cacheResult.getMessage());
}
listener.onResponse(cacheResult);
}, listener::onFailure));
} else {
// its credential hash does not match the
// hash of the credential for this forestalled request.
// clear cache and try to reach the authentication source again because password
// might have changed there and the local cached hash got stale
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}
} else if (credsMatch) {
// not authenticated but instead of hammering reuse the result. a new
// request will trigger a retried auth
listener.onResponse(cachedResult.authenticationResult);
} else {
// The inflight request has failed or its credential hash does not match the
// hash of the credential for this forestalled request.
// clear cache and try to reach the authentication source again because password
// might have changed there and the local cached hash got stale
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}
}, e -> {
// the inflight request failed, so try again, but first (always) make sure cache
// is cleared of the failed authentication
cache.invalidate(token.principal(), listenableCacheEntry);
authenticateWithCache(token, listener);
}), threadPool.executor(ThreadPool.Names.GENERIC), threadPool.getThreadContext());
}, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC), threadPool.getThreadContext());
} else {
// attempt authentication against the authentication source
doAuthenticate(token, ActionListener.wrap(authResult -> {
if (authResult.isAuthenticated() && authResult.getUser().enabled()) {
// compute the credential hash of this successful authentication request
final UserWithHash userWithHash = new UserWithHash(authResult.getUser(), token.credentials(), cacheHasher);
// notify any forestalled request listeners; they will not reach to the
// authentication request and instead will use this hash for comparison
listenableCacheEntry.onResponse(userWithHash);
} else {
// notify any forestalled request listeners; they will retry the request
listenableCacheEntry.onResponse(null);
if (authResult.isAuthenticated() == false || authResult.getUser().enabled() == false) {
// a new request should trigger a new authentication
cache.invalidate(token.principal(), listenableCacheEntry);
}
// notify the listener of the inflight authentication request; this request is not retried
// notify any forestalled request listeners; they will not reach to the
// authentication request and instead will use this result if they contain
// the same credentials
listenableCacheEntry.onResponse(new CachedResult(authResult, cacheHasher, authResult.getUser(), token.credentials()));
listener.onResponse(authResult);
}, e -> {
// notify any staved off listeners; they will retry the request
cache.invalidate(token.principal(), listenableCacheEntry);
// notify any staved off listeners; they will propagate this error
listenableCacheEntry.onFailure(e);
// notify the listener of the inflight authentication request; this request is not retried
// notify the listener of the inflight authentication request
listener.onFailure(e);
}));
}
Expand Down Expand Up @@ -225,35 +229,31 @@ private void lookupWithCache(String username, ActionListener<User> listener) {
assert cache != null;
try {
final AtomicBoolean lookupInCache = new AtomicBoolean(true);
final ListenableFuture<UserWithHash> listenableCacheEntry = cache.computeIfAbsent(username, key -> {
final ListenableFuture<CachedResult> listenableCacheEntry = cache.computeIfAbsent(username, key -> {
lookupInCache.set(false);
return new ListenableFuture<>();
});
if (false == lookupInCache.get()) {
// attempt lookup against the user directory
doLookupUser(username, ActionListener.wrap(user -> {
if (user != null) {
// user found
final UserWithHash userWithHash = new UserWithHash(user, null, null);
// notify forestalled request listeners
listenableCacheEntry.onResponse(userWithHash);
} else {
final CachedResult result = new CachedResult(AuthenticationResult.notHandled(), cacheHasher, user, null);
if (user == null) {
// user not found, invalidate cache so that subsequent requests are forwarded to
// the user directory
cache.invalidate(username, listenableCacheEntry);
// notify forestalled request listeners
listenableCacheEntry.onResponse(null);
}
// notify forestalled request listeners
listenableCacheEntry.onResponse(result);
}, e -> {
// the next request should be forwarded, not halted by a failed lookup attempt
cache.invalidate(username, listenableCacheEntry);
// notify forestalled listeners
listenableCacheEntry.onFailure(e);
}));
}
listenableCacheEntry.addListener(ActionListener.wrap(userWithHash -> {
if (userWithHash != null) {
listener.onResponse(userWithHash.user);
listenableCacheEntry.addListener(ActionListener.wrap(cachedResult -> {
if (cachedResult.user != null) {
listener.onResponse(cachedResult.user);
} else {
listener.onResponse(null);
}
Expand All @@ -265,16 +265,21 @@ private void lookupWithCache(String username, ActionListener<User> listener) {

protected abstract void doLookupUser(String username, ActionListener<User> listener);

private static class UserWithHash {
final User user;
final char[] hash;
private static class CachedResult {
private final AuthenticationResult authenticationResult;
private final User user;
private final char[] hash;

UserWithHash(User user, SecureString password, Hasher hasher) {
this.user = Objects.requireNonNull(user);
private CachedResult(AuthenticationResult result, Hasher hasher, @Nullable User user, @Nullable SecureString password) {
this.authenticationResult = Objects.requireNonNull(result);
if (authenticationResult.isAuthenticated() && user == null) {
throw new IllegalArgumentException("authentication cannot be successful with a null user");
}
this.user = user;
this.hash = password == null ? null : hasher.hash(password);
}

boolean verify(SecureString password) {
private boolean verify(SecureString password) {
return hash != null && Hasher.verifyHash(password, hash);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public void setup() {
}

@After
public void stop() throws InterruptedException {
public void stop() {
if (threadPool != null) {
terminate(threadPool);
}
}

public void testCacheSettings() throws Exception {
public void testCacheSettings() {
String cachingHashAlgo = Hasher.values()[randomIntBetween(0, Hasher.values().length - 1)].name().toLowerCase(Locale.ROOT);
int maxUsers = randomIntBetween(10, 100);
TimeValue ttl = TimeValue.timeValueMinutes(randomIntBetween(10, 20));
Expand Down Expand Up @@ -352,7 +352,7 @@ private void sleepUntil(long until) throws InterruptedException {
}
}

public void testAuthenticateContract() throws Exception {
public void testAuthenticateContract() {
Realm realm = new FailingAuthenticationRealm(globalSettings, threadPool);
PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
realm.authenticate(new UsernamePasswordToken("user", new SecureString("pass")), future);
Expand All @@ -366,7 +366,7 @@ public void testAuthenticateContract() throws Exception {
assertThat(e.getMessage(), containsString("whatever exception"));
}

public void testLookupContract() throws Exception {
public void testLookupContract() {
Realm realm = new FailingAuthenticationRealm(globalSettings, threadPool);
PlainActionFuture<User> future = new PlainActionFuture<>();
realm.lookupUser("user", future);
Expand All @@ -380,7 +380,7 @@ public void testLookupContract() throws Exception {
assertThat(e.getMessage(), containsString("lookup exception"));
}

public void testReturnDifferentObjectFromCache() throws Exception {
public void testReturnDifferentObjectFromCache() {
final AtomicReference<User> userArg = new AtomicReference<>();
final AtomicReference<AuthenticationResult> result = new AtomicReference<>();
Realm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool) {
Expand Down Expand Up @@ -473,6 +473,71 @@ protected void doLookupUser(String username, ActionListener<User> listener) {
assertEquals(1, authCounter.get());
}

public void testUnauthenticatedResultPropagatesWithSameCreds() throws Exception {
final String username = "username";
final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
final AtomicInteger authCounter = new AtomicInteger(0);
final Hasher pwdHasher = Hasher.resolve(randomFrom("pbkdf2", "pbkdf2_1000", "bcrypt", "bcrypt9"));
final String passwordHash = new String(pwdHasher.hash(password));
RealmConfig config = new RealmConfig(new RealmConfig.RealmIdentifier("caching", "test_realm"), globalSettings,
TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY));

final int numberOfProcessors = Runtime.getRuntime().availableProcessors();
final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3);
final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>(numberOfThreads);
final SecureString credsToUse = new SecureString(randomAlphaOfLength(12).toCharArray());
final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm(config, threadPool) {
@Override
protected void doAuthenticate(UsernamePasswordToken token, ActionListener<AuthenticationResult> listener) {
authCounter.incrementAndGet();
// do something slow
if (pwdHasher.verify(token.credentials(), passwordHash.toCharArray())) {
listener.onFailure(new IllegalStateException("password auth should never succeed"));
} else {
listener.onResponse(AuthenticationResult.unsuccessful("password verification failed", null));
}
}

@Override
protected void doLookupUser(String username, ActionListener<User> listener) {
listener.onFailure(new UnsupportedOperationException("this method should not be called"));
}
};
for (int i = 0; i < numberOfThreads; i++) {
threads.add(new Thread(() -> {
try {
latch.countDown();
latch.await();
final UsernamePasswordToken token = new UsernamePasswordToken(username, credsToUse);

realm.authenticate(token, ActionListener.wrap((result) -> {
if (result.isAuthenticated()) {
throw new IllegalStateException("invalid password led to an authenticated result: " + result);
}
assertThat(result.getMessage(), containsString("password verification failed"));
}, (e) -> {
logger.error("caught exception", e);
fail("unexpected exception - " + e);
}));

} catch (InterruptedException e) {
logger.error("thread was interrupted", e);
Thread.currentThread().interrupt();
}
}));
}

for (Thread thread : threads) {
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
assertEquals(1, authCounter.get());
}

public void testCacheConcurrency() throws Exception {
final String username = "username";
final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
Expand Down Expand Up @@ -704,27 +769,4 @@ protected void doLookupUser(String username, ActionListener<User> listener) {
listener.onResponse(new User(username, new String[]{"lookupRole1", "lookupRole2"}));
}
}

static class LookupNotSupportedRealm extends CachingUsernamePasswordRealm {

public final AtomicInteger authInvocationCounter = new AtomicInteger(0);
public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0);

LookupNotSupportedRealm(Settings globalSettings, ThreadPool threadPool) {
super(new RealmConfig(new RealmConfig.RealmIdentifier("caching", "lookup-notsupported-test"), globalSettings,
TestEnvironment.newEnvironment(globalSettings), threadPool.getThreadContext()), threadPool);
}

@Override
protected void doAuthenticate(UsernamePasswordToken token, ActionListener<AuthenticationResult> listener) {
authInvocationCounter.incrementAndGet();
listener.onResponse(AuthenticationResult.success(new User(token.principal(), new String[]{"testRole1", "testRole2"})));
}

@Override
protected void doLookupUser(String username, ActionListener<User> listener) {
lookupInvocationCounter.incrementAndGet();
listener.onFailure(new UnsupportedOperationException("don't call lookup if lookup isn't supported!!!"));
}
}
}

0 comments on commit 1514bbc

Please sign in to comment.