Skip to content

Commit

Permalink
Grace period for user profile activation (#89566)
Browse files Browse the repository at this point in the history
The user profile document is updated on each activate call even when
there is no actual content change because it always updates the
last_synchronized timestamp. This behaviour is intentional to track the
user's last login time (since Kibana calls to the activate API on user
login). Client must explicitly handle retries for version conflicts.
This is generally desirable. However, on each login there are often
multiple web components trying to call this API concurrently. This
results into more frequent version conflicts. Since these updates occur
in a short period of time, updating last_synchronized for each of them
does not really contribute a lot for tracking user login.

This PR introduces a grace period for the update behaviour (30 seconds
non-configurable) so that the update (on activate) is only performed
when either of the following is true:

* There are actual content changes
* Or it has been more than 30 seconds since last update
  • Loading branch information
ywangd committed Sep 1, 2022
1 parent c2b34a9 commit 69a31da
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89566.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89566
summary: Grace period for user profile activation
area: Security
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

package org.elasticsearch.xpack.security.profile;

import org.elasticsearch.client.Node;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -20,6 +22,7 @@

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -30,6 +33,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -352,6 +356,56 @@ public void testXpackUsageOutput() throws IOException {
assertThat(otherDomainRealms, containsInAnyOrder("saml1", "ad1"));
}

public void testActivateGracePeriodIsPerNode() throws IOException {
final Request activateProfileRequest = new Request("POST", "_security/profile/_activate");
activateProfileRequest.setJsonEntity("""
{
"grant_type": "password",
"username": "rac-user",
"password": "x-pack-test-password"
}""");

final RestClient client = adminClient();
final List<Node> originalNodes = client.getNodes();
assertThat(originalNodes.size(), greaterThan(1));
final Node node0 = originalNodes.get(0);
// Find a different node other than node0.
// Because all nodes of a testcluster runs on the same physical host, the different node
// should have the same hostname but listens on a different port.
// A single node can have both ipv4 and ipv6 addresses. If we do not filter for the
// same hostname, we might find the same node again (e.g. node0 but has an ipv6 address).
final Node node1 = originalNodes.subList(1, originalNodes.size() - 1)
.stream()
.filter(node -> node.getHost().getHostName().equals(node0.getHost().getHostName()))
.findFirst()
.orElseThrow();

try {
// Initial activate with node0
client.setNodes(List.of(node0));
final Map<String, Object> responseMap0 = responseAsMap(client.performRequest(activateProfileRequest));

final Instant start = Instant.now();
// Activate again with the same host (node0) should fall within the grace period and skip actual update
final Map<String, Object> responseMap1 = responseAsMap(client.performRequest(activateProfileRequest));
assumeTrue("Test is running too slow", start.plus(30, ChronoUnit.SECONDS).isAfter(Instant.now()));
assertThat(responseMap1.get("_doc"), equalTo(responseMap0.get("_doc")));

// Activate with different host (node1) should actually update since node name changes in RealmRef
client.setNodes(List.of(node1));
final Map<String, Object> responseMap2 = responseAsMap(client.performRequest(activateProfileRequest));
assumeTrue("Test is running too slow", start.plus(30, ChronoUnit.SECONDS).isAfter(Instant.now()));
assertThat(responseMap2.get("_doc"), not(equalTo(responseMap0.get("_doc"))));

// Activate again with node1 should see no update
final Map<String, Object> responseMap3 = responseAsMap(client.performRequest(activateProfileRequest));
assertTrue("Test is running too slow", Instant.now().toEpochMilli() - (long) responseMap2.get("last_synchronized") < 30_000L);
assertThat(responseMap3.get("_doc"), equalTo(responseMap2.get("_doc")));
} finally {
client.setNodes(originalNodes);
}
}

public void testGetUsersWithProfileUid() throws IOException {
final String username = randomAlphaOfLengthBetween(3, 8);
final Request putUserRequest = new Request("PUT", "_security/user/" + username);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.security.user.User;

import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -42,12 +43,15 @@
import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.INTERNAL_SECURITY_PROFILE_INDEX_8;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_PROFILE_ALIAS;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -341,6 +345,64 @@ public void testConcurrentCreationOfNewProfiles() throws InterruptedException {
}
}

public void testConcurrentActivateUpdates() throws InterruptedException {
final Authentication.RealmRef realmRef = randomRealmRef();
final User originalUser = new User(randomAlphaOfLengthBetween(5, 12));
final Authentication originalAuthentication = Authentication.newRealmAuthentication(originalUser, realmRef);

final ProfileService profileService = getInstanceFromRandomNode(ProfileService.class);
final PlainActionFuture<Profile> originalFuture = new PlainActionFuture<>();
profileService.activateProfile(originalAuthentication, originalFuture);
final Profile originalProfile = originalFuture.actionGet();

final Thread[] threads = new Thread[randomIntBetween(5, 10)];
final CountDownLatch readyLatch = new CountDownLatch(threads.length);
final CountDownLatch startLatch = new CountDownLatch(1);
final Set<Profile> updatedProfiles = ConcurrentHashMap.newKeySet();
final Authentication updatedAuthentication = Authentication.newRealmAuthentication(
new User(originalUser.principal(), "foo"),
realmRef
);
// All concurrent activations should succeed because we handle version conflict error and check whether update
// can be skipped. In this case, they can be skipped because all updates are for the same content.
// Due to the concurrency nature, there is no guarantee whether an update can succeed or succeed with error handling.
// We can only be sure that at least one of them will succeed.
// Other updates may succeed or they may succeed with the error handling. So we cannot assert that the document
// is only updated once. What we can assert is that they will all be successful (one way or another).
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
final PlainActionFuture<Profile> future = new PlainActionFuture<>();
profileService.activateProfile(updatedAuthentication, future);
readyLatch.countDown();
startLatch.await();
final Profile updatedProfile = future.actionGet();
assertThat(updatedProfile.uid(), equalTo(originalProfile.uid()));
assertThat(updatedProfile.user().roles(), contains("foo"));
updatedProfiles.add(updatedProfile);
} catch (Exception e) {
logger.error(e);
fail("caught error when activating existing profile: " + e);
}
});
threads[i].start();
}

if (readyLatch.await(20, TimeUnit.SECONDS)) {
startLatch.countDown();
for (Thread thread : threads) {
thread.join();
}
}

assertThat(updatedProfiles, not(emptyIterable()));
final Profile updatedProfile = updatedProfiles.stream().max(Comparator.comparingLong(Profile::lastSynchronized)).orElseThrow();
// Update again, this time it should simply skip due to grace period of 30 seconds
final PlainActionFuture<Profile> future = new PlainActionFuture<>();
profileService.activateProfile(updatedAuthentication, future);
assertThat(future.actionGet(), equalTo(updatedProfile));
}

public void testDifferentiator() {
String lastUid = null;
final int differentiatorLimit = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class ProfileService {
private static final String DOC_ID_PREFIX = "profile_";
private static final BackoffPolicy DEFAULT_BACKOFF = BackoffPolicy.exponentialBackoff();
private static final int DIFFERENTIATOR_UPPER_LIMIT = 9;
private static final long ACTIVATE_INTERVAL_IN_MS = TimeValue.timeValueSeconds(30).millis();

private final Settings settings;
private final Clock clock;
Expand Down Expand Up @@ -306,7 +308,7 @@ public void setEnabled(String uid, boolean enabled, RefreshPolicy refreshPolicy,
listener.onFailure(e);
return;
}
doUpdate(buildUpdateRequest(uid, builder, refreshPolicy, -1, -1), listener.map(updateResponse -> AcknowledgedResponse.TRUE));
doUpdate(buildUpdateRequest(uid, builder, refreshPolicy), listener.map(updateResponse -> AcknowledgedResponse.TRUE));
}

public void searchProfilesForSubjects(List<Subject> subjects, ActionListener<SubjectSearchResultsAndErrors<Profile>> listener) {
Expand Down Expand Up @@ -584,7 +586,7 @@ private SearchRequest buildSearchRequestForSubject(Subject subject) {
});
boolQuery.minimumShouldMatch(1);
}
return client.prepareSearch(SECURITY_PROFILE_ALIAS).setQuery(boolQuery).request();
return client.prepareSearch(SECURITY_PROFILE_ALIAS).setQuery(boolQuery).seqNoAndPrimaryTerm(true).request();
}

private static final Pattern VALID_LITERAL_USERNAME = Pattern.compile("^[a-zA-Z0-9][a-zA-Z0-9-]{0,255}$");
Expand Down Expand Up @@ -639,7 +641,7 @@ void createNewProfile(Subject subject, String uid, ActionListener<Profile> liste
);
listener.onResponse(versionedDocument.toProfile(Set.of()));
}, e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// Document already exists with the specified ID, get the document with the ID
// and check whether it is the right profile for the subject
getOrCreateProfileWithBackoff(subject, profileDocument, DEFAULT_BACKOFF.iterator(), listener);
Expand Down Expand Up @@ -770,25 +772,73 @@ private DomainConfig getDomainConfigForSubject(Subject subject) {
}
}

private void updateProfileForActivate(Subject subject, VersionedDocument versionedDocument, ActionListener<Profile> listener)
// package private for testing
void updateProfileForActivate(Subject subject, VersionedDocument currentVersionedDocumentBySearch, ActionListener<Profile> listener)
throws IOException {
final ProfileDocument profileDocument = updateWithSubject(versionedDocument.doc, subject);
final ProfileDocument newProfileDocument = updateWithSubject(currentVersionedDocumentBySearch.doc, subject);

if (shouldSkipUpdateForActivate(currentVersionedDocumentBySearch.doc, newProfileDocument)) {
logger.debug(
"skip user profile activate update because last_synchronized [{}] is within grace period",
currentVersionedDocumentBySearch.doc.lastSynchronized()
);
listener.onResponse(currentVersionedDocumentBySearch.toProfile(Set.of()));
return;
}

doUpdate(
buildUpdateRequest(
profileDocument.uid(),
wrapProfileDocumentWithoutApplicationData(profileDocument),
RefreshPolicy.WAIT_UNTIL,
versionedDocument.primaryTerm,
versionedDocument.seqNo
newProfileDocument.uid(),
wrapProfileDocumentWithoutApplicationData(newProfileDocument),
RefreshPolicy.WAIT_UNTIL
),
listener.map(
updateResponse -> new VersionedDocument(profileDocument, updateResponse.getPrimaryTerm(), updateResponse.getSeqNo())
.toProfile(Set.of())
)
ActionListener.wrap(updateResponse -> {
listener.onResponse(
new VersionedDocument(newProfileDocument, updateResponse.getPrimaryTerm(), updateResponse.getSeqNo()).toProfile(
Set.of()
)
);
}, updateException -> {
// The document may have been updated concurrently by another thread. Get it and check whether the updated content
// already has what is required by this thread. If so, simply return the updated profile.
if (ExceptionsHelper.unwrapCause(updateException) instanceof VersionConflictEngineException) {
getVersionedDocument(currentVersionedDocumentBySearch.doc.uid(), ActionListener.wrap(versionedDocumentByGet -> {
if (shouldSkipUpdateForActivate(versionedDocumentByGet.doc, newProfileDocument)) {
logger.debug(
"suppress version conflict for activate update because last_synchronized [{}] is within grace period",
versionedDocumentByGet.doc.lastSynchronized()
);
listener.onResponse(versionedDocumentByGet.toProfile(Set.of()));
} else {
listener.onFailure(updateException);
}
}, getException -> {
getException.addSuppressed(updateException);
listener.onFailure(getException);
}));
} else {
listener.onFailure(updateException);
}
})
);
}

// If the profile content does not change and it is recently updated within last 30 seconds, do not update it again
// to avoid potential excessive version conflicts
boolean shouldSkipUpdateForActivate(ProfileDocument currentProfileDocument, ProfileDocument newProfileDocument) {
assert newProfileDocument.enabled() : "new profile document must be enabled";
if (newProfileDocument.user().equals(currentProfileDocument.user())
&& newProfileDocument.enabled() == currentProfileDocument.enabled()
&& newProfileDocument.lastSynchronized() - currentProfileDocument.lastSynchronized() < ACTIVATE_INTERVAL_IN_MS) {
return true;
}
return false;
}

private UpdateRequest buildUpdateRequest(String uid, XContentBuilder builder, RefreshPolicy refreshPolicy) {
return buildUpdateRequest(uid, builder, refreshPolicy, -1, -1);
}

private UpdateRequest buildUpdateRequest(
String uid,
XContentBuilder builder,
Expand Down

0 comments on commit 69a31da

Please sign in to comment.