Skip to content

Commit

Permalink
User Profile - Use security origin for BWC cases (#86345)
Browse files Browse the repository at this point in the history
The security profile action origin and internal user is not available
before version 8.3.0. This PR makes all profile actions to use the
existing security origin if the cluster has any node that is older than
8.3.0.

The change makes it possible to use User Profile features in a mixed
cluster as long as the request always hits a newer (newer than 8.3) node
first.
  • Loading branch information
ywangd committed May 3, 2022
1 parent bbd5c84 commit ba91f26
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ Collection<Object> createComponents(
getClock(),
client,
systemIndices.getProfileIndexManager(),
clusterService,
threadPool
);
components.add(profileService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -75,10 +76,12 @@
import java.util.stream.Collectors;

import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_PROFILE_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.security.authc.Authentication.isFileOrNativeRealm;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_PROFILE_ALIAS;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.VERSION_SECURITY_PROFILE_ORIGIN;

public class ProfileService {
private static final Logger logger = LogManager.getLogger(ProfileService.class);
Expand All @@ -90,13 +93,22 @@ public class ProfileService {
private final Clock clock;
private final Client client;
private final SecurityIndexManager profileIndex;
private final ClusterService clusterService;
private final ThreadPool threadPool;

public ProfileService(Settings settings, Clock clock, Client client, SecurityIndexManager profileIndex, ThreadPool threadPool) {
public ProfileService(
Settings settings,
Clock clock,
Client client,
SecurityIndexManager profileIndex,
ClusterService clusterService,
ThreadPool threadPool
) {
this.settings = settings;
this.clock = clock;
this.client = client;
this.profileIndex = profileIndex;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

Expand Down Expand Up @@ -191,7 +203,7 @@ public void suggestProfile(SuggestProfilesRequest request, ActionListener<Sugges
listener::onFailure,
() -> executeAsyncWithOrigin(
client,
SECURITY_PROFILE_ORIGIN,
getActionOrigin(),
SearchAction.INSTANCE,
searchRequest,
ActionListener.wrap(searchResponse -> {
Expand Down Expand Up @@ -281,26 +293,20 @@ private void getVersionedDocument(String uid, ActionListener<VersionedDocument>
final GetRequest getRequest = new GetRequest(SECURITY_PROFILE_ALIAS, uidToDocId(uid));
frozenProfileIndex.checkIndexVersionThenExecute(
listener::onFailure,
() -> executeAsyncWithOrigin(
client,
SECURITY_PROFILE_ORIGIN,
GetAction.INSTANCE,
getRequest,
ActionListener.wrap(response -> {
if (false == response.isExists()) {
logger.debug("profile with uid [{}] does not exist", uid);
listener.onResponse(null);
return;
}
listener.onResponse(
new VersionedDocument(
buildProfileDocument(response.getSourceAsBytesRef()),
response.getPrimaryTerm(),
response.getSeqNo()
)
);
}, listener::onFailure)
)
() -> executeAsyncWithOrigin(client, getActionOrigin(), GetAction.INSTANCE, getRequest, ActionListener.wrap(response -> {
if (false == response.isExists()) {
logger.debug("profile with uid [{}] does not exist", uid);
listener.onResponse(null);
return;
}
listener.onResponse(
new VersionedDocument(
buildProfileDocument(response.getSourceAsBytesRef()),
response.getPrimaryTerm(),
response.getSeqNo()
)
);
}, listener::onFailure))
);
});
}
Expand Down Expand Up @@ -340,7 +346,7 @@ void searchVersionedDocumentForSubject(Subject subject, ActionListener<Versioned
listener::onFailure,
() -> executeAsyncWithOrigin(
client,
SECURITY_PROFILE_ORIGIN,
getActionOrigin(),
SearchAction.INSTANCE,
searchRequest,
ActionListener.wrap(searchResponse -> {
Expand Down Expand Up @@ -412,7 +418,7 @@ private void createNewProfile(Subject subject, String uid, ActionListener<Profil
listener::onFailure,
() -> executeAsyncWithOrigin(
client,
SECURITY_PROFILE_ORIGIN,
getActionOrigin(),
BulkAction.INSTANCE,
bulkRequest,
TransportSingleItemBulkWriteAction.<IndexResponse>wrapBulkResponse(ActionListener.wrap(indexResponse -> {
Expand Down Expand Up @@ -564,7 +570,7 @@ void doUpdate(UpdateRequest updateRequest, ActionListener<UpdateResponse> listen
listener::onFailure,
() -> executeAsyncWithOrigin(
client,
SECURITY_PROFILE_ORIGIN,
getActionOrigin(),
UpdateAction.INSTANCE,
updateRequest,
ActionListener.wrap(updateResponse -> {
Expand All @@ -576,6 +582,15 @@ void doUpdate(UpdateRequest updateRequest, ActionListener<UpdateResponse> listen
);
}

private String getActionOrigin() {
// profile origin and user is not available before v8.3.0
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(VERSION_SECURITY_PROFILE_ORIGIN)) {
return SECURITY_PROFILE_ORIGIN;
} else {
return SECURITY_ORIGIN;
}
}

private static String uidToDocId(String uid) {
return DOC_ID_PREFIX + uid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class SecuritySystemIndices {

public static final String INTERNAL_SECURITY_PROFILE_INDEX_8 = ".security-profile-8";
public static final String SECURITY_PROFILE_ALIAS = ".security-profile";
public static final Version VERSION_SECURITY_PROFILE_ORIGIN = Version.V_8_3_0;

private final Logger logger = LogManager.getLogger(SecuritySystemIndices.class);

Expand Down Expand Up @@ -737,8 +738,25 @@ private SystemIndexDescriptor getSecurityProfileIndexDescriptor() {
.setAliasName(SECURITY_PROFILE_ALIAS)
.setIndexFormat(INTERNAL_PROFILE_INDEX_FORMAT)
.setVersionMetaKey(SECURITY_VERSION_STRING)
.setOrigin(SECURITY_PROFILE_ORIGIN)
.setOrigin(SECURITY_PROFILE_ORIGIN) // new origin since 8.3
.setThreadPools(ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS)
.setMinimumNodeVersion(VERSION_SECURITY_PROFILE_ORIGIN)
.setPriorSystemIndexDescriptors(
List.of(
SystemIndexDescriptor.builder()
.setIndexPattern(".security-profile-[0-9]+*")
.setPrimaryIndex(INTERNAL_SECURITY_PROFILE_INDEX_8)
.setDescription("Contains user profile documents")
.setMappings(getProfileIndexMappings())
.setSettings(getProfileIndexSettings())
.setAliasName(SECURITY_PROFILE_ALIAS)
.setIndexFormat(INTERNAL_PROFILE_INDEX_FORMAT)
.setVersionMetaKey(SECURITY_VERSION_STRING)
.setOrigin(SECURITY_ORIGIN)
.setThreadPools(ExecutorNames.CRITICAL_SYSTEM_INDEX_THREAD_POOLS)
.build()
)
)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.security.profile;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -25,6 +26,9 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -39,6 +43,7 @@
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -68,6 +73,7 @@

import static org.elasticsearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;
import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_PROFILE_ORIGIN;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_PROFILE_ALIAS;
Expand Down Expand Up @@ -124,6 +130,7 @@ public class ProfileServiceTests extends ESTestCase {
private Client client;
private SecurityIndexManager profileIndex;
private ProfileService profileService;
private Version minNodeVersion;

@Before
public void prepare() {
Expand All @@ -146,7 +153,14 @@ public void prepare() {
new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(SECURITY_PROFILE_ALIAS)
);
this.profileIndex = SecurityMocks.mockSecurityIndexManager(SECURITY_PROFILE_ALIAS);
this.profileService = new ProfileService(Settings.EMPTY, Clock.systemUTC(), client, profileIndex, threadPool);
final ClusterService clusterService = mock(ClusterService.class);
final ClusterState clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(clusterState.nodes()).thenReturn(discoveryNodes);
minNodeVersion = VersionUtils.randomVersionBetween(random(), Version.V_7_17_0, Version.CURRENT);
when(discoveryNodes.getMinNodeVersion()).thenReturn(minNodeVersion);
this.profileService = new ProfileService(Settings.EMPTY, Clock.systemUTC(), client, profileIndex, clusterService, threadPool);
}

@After
Expand All @@ -157,7 +171,10 @@ public void stopThreadPool() {
public void testGetProfileByUid() {
final String uid = randomAlphaOfLength(20);
doAnswer(invocation -> {
assertThat(threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME), equalTo(SECURITY_PROFILE_ORIGIN));
assertThat(
threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
equalTo(minNodeVersion.onOrAfter(Version.V_8_3_0) ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
);
final GetRequest getRequest = (GetRequest) invocation.getArguments()[1];
@SuppressWarnings("unchecked")
final ActionListener<GetResponse> listener = (ActionListener<GetResponse>) invocation.getArguments()[2];
Expand Down Expand Up @@ -316,7 +333,10 @@ public void testBuildSearchRequest() {
public void testSecurityProfileOrigin() {
// Activate profile
doAnswer(invocation -> {
assertThat(threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME), equalTo(SECURITY_PROFILE_ORIGIN));
assertThat(
threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
equalTo(minNodeVersion.onOrAfter(Version.V_8_3_0) ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
);
@SuppressWarnings("unchecked")
final ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY));
Expand All @@ -329,7 +349,10 @@ public void testSecurityProfileOrigin() {

final RuntimeException expectedException = new RuntimeException("expected");
doAnswer(invocation -> {
assertThat(threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME), equalTo(SECURITY_PROFILE_ORIGIN));
assertThat(
threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
equalTo(minNodeVersion.onOrAfter(Version.V_8_3_0) ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
);
final ActionListener<?> listener = (ActionListener<?>) invocation.getArguments()[2];
listener.onFailure(expectedException);
return null;
Expand All @@ -342,7 +365,10 @@ public void testSecurityProfileOrigin() {

// Update
doAnswer(invocation -> {
assertThat(threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME), equalTo(SECURITY_PROFILE_ORIGIN));
assertThat(
threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
equalTo(minNodeVersion.onOrAfter(Version.V_8_3_0) ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
);
final ActionListener<?> listener = (ActionListener<?>) invocation.getArguments()[2];
listener.onFailure(expectedException);
return null;
Expand All @@ -354,7 +380,10 @@ public void testSecurityProfileOrigin() {

// Suggest
doAnswer(invocation -> {
assertThat(threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME), equalTo(SECURITY_PROFILE_ORIGIN));
assertThat(
threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
equalTo(minNodeVersion.onOrAfter(Version.V_8_3_0) ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
);
final ActionListener<?> listener = (ActionListener<?>) invocation.getArguments()[2];
listener.onFailure(expectedException);
return null;
Expand Down
3 changes: 3 additions & 0 deletions x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.VersionProperties
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -87,6 +88,8 @@ BuildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->
keystore 'xpack.watcher.encryption_key', file("${project.projectDir}/src/test/resources/system_key")
setting 'xpack.watcher.encrypt_sensitive_data', 'true'

requiresFeature 'es.user_profile_feature_flag_enabled', Version.fromString("8.1.0")

// Old versions of the code contain an invalid assertion that trips
// during tests. Versions 5.6.9 and 6.2.4 have been fixed by removing
// the assertion, but this is impossible for released versions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
"Test User Profile feature will work in a mixed cluster":

- skip:
features: node_selector
version: " - 7.99.99"
reason: "https://github.com/elastic/elasticsearch/issues/86373"

- do:
node_selector:
version: " 8.3.0 - "
security.activate_user_profile:
body: >
{
"grant_type": "password",
"username": "test_user",
"password" : "x-pack-test-password"
}
- is_true: uid
- match: { "user.username" : "test_user" }
- set: { uid: profile_uid }

- do:
node_selector:
version: " 8.3.0 - "
security.get_user_profile:
uid: "$profile_uid"

- length: { $body: 1 }
- is_true: "$profile_uid"
- set: { $profile_uid: profile }
- match: { $profile.uid : "$profile_uid" }
- match: { $profile.user.username : "test_user" }

0 comments on commit ba91f26

Please sign in to comment.