Skip to content

Commit

Permalink
Support transform for RCS 2.0 (#95169)
Browse files Browse the repository at this point in the history
This PR adds support for transform jobs with remote indices for RCS 2.0.
  • Loading branch information
ywangd committed Apr 13, 2023
1 parent 91a0fd1 commit d5e49f3
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,11 @@ public static void assertOK(Response response) {
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
}

public static ObjectPath assertOKAndCreateObjectPath(Response response) throws IOException {
assertOK(response);
return ObjectPath.createFromResponse(response);
}

/**
* Assert that the index in question has the given number of documents present
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.ilm.action.GetLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
import org.elasticsearch.xpack.core.ilm.action.StartILMAction;
Expand Down Expand Up @@ -156,7 +157,8 @@ public class ClusterPrivilegeResolver {

private static final Set<String> CROSS_CLUSTER_ACCESS_PATTERN = Set.of(
RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
RemoteClusterNodesAction.NAME
RemoteClusterNodesAction.NAME,
XPackInfoAction.NAME
);
private static final Set<String> MANAGE_ENRICH_AUTOMATON = Set.of("cluster:admin/xpack/enrich/*");

Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/security/qa/multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ apply plugin: 'elasticsearch.bwc-test'

dependencies {
clusterModules(project(":modules:analysis-common"))
clusterModules(project(":modules:reindex")) // need for deleting transform jobs
clusterModules(project(":x-pack:plugin:transform"))
}

tasks.named("javaRestTest") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.AfterClass;
import org.junit.BeforeClass;

Expand All @@ -35,7 +35,6 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand All @@ -45,6 +44,7 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
protected static final SecureString PASS = new SecureString("x-pack-test-password".toCharArray());
protected static final String REMOTE_SEARCH_USER = "remote_search_user";
protected static final String REMOTE_METRIC_USER = "remote_metric_user";
protected static final String REMOTE_TRANSFORM_USER = "remote_transform_user";
protected static final String REMOTE_SEARCH_ROLE = "remote_search";

protected static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("analysis-common")
Expand Down Expand Up @@ -188,13 +188,12 @@ protected void configureRemoteCluster(
assertBusy(() -> {
final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
assertOK(remoteInfoResponse);
final Map<String, Object> remoteInfoMap = responseAsMap(remoteInfoResponse);
assertThat(remoteInfoMap, hasKey(clusterAlias));
assertThat(ObjectPath.eval(clusterAlias + ".connected", remoteInfoMap), is(true));
final ObjectPath remoteInfoObjectPath = assertOKAndCreateObjectPath(remoteInfoResponse);
assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".connected"), is(true));
if (false == isProxyMode) {
assertThat(ObjectPath.eval(clusterAlias + ".num_nodes_connected", remoteInfoMap), equalTo(numberOfFcNodes));
assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".num_nodes_connected"), equalTo(numberOfFcNodes));
}
final String credentialsValue = ObjectPath.eval(clusterAlias + ".cluster_credentials", remoteInfoMap);
final String credentialsValue = remoteInfoObjectPath.evaluate(clusterAlias + ".cluster_credentials");
if (basicSecurity) {
assertThat(credentialsValue, nullValue());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.remotecluster;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class RemoteClusterSecurityTransformIT extends AbstractRemoteClusterSecurityTestCase {

private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();

static {
fulfillingCluster = ElasticsearchCluster.local()
.name("fulfilling-cluster")
.apply(commonClusterConfig)
.module("transform")
.module("reindex")
.setting("remote_cluster_server.enabled", "true")
.setting("remote_cluster.port", "0")
.setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
.setting("xpack.security.authc.token.enabled", "true")
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
.build();

queryCluster = ElasticsearchCluster.local()
.name("query-cluster")
.apply(commonClusterConfig)
.module("transform")
.module("reindex")
.setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
.setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
.setting("xpack.security.authc.token.enabled", "true")
.keystore("cluster.remote.my_remote_cluster.credentials", () -> {
API_KEY_MAP_REF.compareAndSet(null, createCrossClusterAccessApiKey("""
[
{
"names": ["shared-transform-index"],
"privileges": ["read", "read_cross_cluster", "view_index_metadata"]
}
]"""));
return (String) API_KEY_MAP_REF.get().get("encoded");
})
.rolesFile(Resource.fromClasspath("roles.yml"))
.user(REMOTE_TRANSFORM_USER, PASS.toString(), "transform_admin,transform_remote_shared_index")
.build();
}

@ClassRule
// Use a RuleChain to ensure that fulfilling cluster is started before query cluster
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);

public void testCrossClusterTransform() throws Exception {
configureRemoteClusters();

// Fulfilling cluster
{
final Request createIndexRequest1 = new Request("PUT", "shared-transform-index");
createIndexRequest1.setJsonEntity("""
{
"mappings": {
"properties": {
"user": { "type": "keyword" },
"stars": { "type": "integer" },
"coolness": { "type": "integer" }
}
}
}""");
assertOK(performRequestAgainstFulfillingCluster(createIndexRequest1));

// Index some documents, so we can attempt to transform them from the querying cluster
final Request bulkRequest1 = new Request("POST", "/_bulk?refresh=true");
bulkRequest1.setJsonEntity(Strings.format("""
{"index": {"_index": "shared-transform-index"}}
{"user": "a", "stars": 1}
{"index": {"_index": "shared-transform-index"}}
{"user": "a", "stars": 4}
{"index": {"_index": "shared-transform-index"}}
{"user": "a", "stars": 5}
{"index": {"_index": "shared-transform-index"}}
{"user": "b", "stars": 2}
{"index": {"_index": "shared-transform-index"}}
{"user": "b", "stars": 3}
{"index": {"_index": "shared-transform-index"}}
{"user": "a", "stars": 5}
{"index": {"_index": "shared-transform-index"}}
{"user": "b", "stars": 1}
{"index": {"_index": "shared-transform-index"}}
{"user": "a", "stars": 3}
{"index": {"_index": "shared-transform-index"}}
{"user": "c", "stars": 4}
"""));
assertOK(performRequestAgainstFulfillingCluster(bulkRequest1));

// Create another index that the transform user does not have access
final Request createIndexRequest2 = new Request("PUT", "private-transform-index");
createIndexRequest2.setJsonEntity("""
{
"mappings": {
"properties": {
"user": { "type": "keyword" },
"stars": { "type": "integer" },
"coolness": { "type": "integer" }
}
}
}""");
assertOK(performRequestAgainstFulfillingCluster(createIndexRequest2));
}

// Query cluster
{
// Create a transform
final var putTransformRequest = new Request("PUT", "/_transform/simple-remote-transform");
putTransformRequest.setJsonEntity("""
{
"source": { "index": "my_remote_cluster:shared-transform-index" },
"dest": { "index": "simple-remote-transform" },
"pivot": {
"group_by": { "user": {"terms": {"field": "user"}}},
"aggs": {"avg_stars": {"avg": {"field": "stars"}}}
}
}
""");
assertOK(performRequestWithRemoteTransformUser(putTransformRequest));
final ObjectPath getTransformObjPath = assertOKAndCreateObjectPath(
performRequestWithRemoteTransformUser(new Request("GET", "/_transform/simple-remote-transform"))
);
assertThat(getTransformObjPath.evaluate("count"), equalTo(1));
assertThat(getTransformObjPath.evaluate("transforms.0.id"), equalTo("simple-remote-transform"));

// Start the transform
assertOK(performRequestWithRemoteTransformUser(new Request("POST", "/_transform/simple-remote-transform/_start")));

// Get the stats
final Request transformStatsRequest = new Request("GET", "/_transform/simple-remote-transform/_stats");
final ObjectPath transformStatsObjPath = assertOKAndCreateObjectPath(
performRequestWithRemoteTransformUser(transformStatsRequest)
);
assertThat(transformStatsObjPath.evaluate("node_failures"), nullValue());
assertThat(transformStatsObjPath.evaluate("count"), equalTo(1));
assertThat(transformStatsObjPath.evaluate("transforms.0.id"), equalTo("simple-remote-transform"));

// Stop the transform and force it to complete
assertOK(
performRequestWithRemoteTransformUser(
new Request("POST", "/_transform/simple-remote-transform/_stop?wait_for_completion=true&wait_for_checkpoint=true")
)
);

// Get stats again
final ObjectPath transformStatsObjPath2 = assertOKAndCreateObjectPath(
performRequestWithRemoteTransformUser(transformStatsRequest)
);
assertThat(transformStatsObjPath2.evaluate("node_failures"), nullValue());
assertThat(transformStatsObjPath2.evaluate("count"), equalTo(1));
assertThat(transformStatsObjPath2.evaluate("transforms.0.state"), equalTo("stopped"));
assertThat(transformStatsObjPath2.evaluate("transforms.0.checkpointing.last.checkpoint"), equalTo(1));

// Ensure transformed data is available locally
final ObjectPath searchObjPath = assertOKAndCreateObjectPath(
performRequestWithRemoteTransformUser(
new Request("GET", "/simple-remote-transform/_search?sort=user&rest_total_hits_as_int=true")
)
);
assertThat(searchObjPath.evaluate("hits.total"), equalTo(3));
assertThat(searchObjPath.evaluate("hits.hits.0._index"), equalTo("simple-remote-transform"));
assertThat(searchObjPath.evaluate("hits.hits.0._source.avg_stars"), equalTo(3.6));
assertThat(searchObjPath.evaluate("hits.hits.0._source.user"), equalTo("a"));
assertThat(searchObjPath.evaluate("hits.hits.1._source.avg_stars"), equalTo(2.0));
assertThat(searchObjPath.evaluate("hits.hits.1._source.user"), equalTo("b"));
assertThat(searchObjPath.evaluate("hits.hits.2._source.avg_stars"), equalTo(4.0));
assertThat(searchObjPath.evaluate("hits.hits.2._source.user"), equalTo("c"));

// Preview
assertOK(performRequestWithRemoteTransformUser(new Request("GET", "/_transform/simple-remote-transform/_preview")));

// Delete the transform
assertOK(performRequestWithRemoteTransformUser(new Request("DELETE", "/_transform/simple-remote-transform")));

// Create a transform targeting an index without permission
final var putTransformRequest2 = new Request("PUT", "/_transform/invalid");
putTransformRequest2.setJsonEntity("""
{
"source": { "index": "my_remote_cluster:private-transform-index" },
"dest": { "index": "simple-remote-transform" },
"pivot": {
"group_by": { "user": {"terms": {"field": "user"}}},
"aggs": {"avg_stars": {"avg": {"field": "stars"}}}
}
}
""");
assertOK(performRequestWithRemoteTransformUser(putTransformRequest2));
// It errors when trying to preview it
final ResponseException e = expectThrows(
ResponseException.class,
() -> performRequestWithRemoteTransformUser(new Request("GET", "/_transform/invalid/_preview"))
);
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("Source indices have been deleted or closed"));
}
}

private Response performRequestWithRemoteTransformUser(final Request request) throws IOException {
request.setOptions(
RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(REMOTE_TRANSFORM_USER, PASS))
);
return client().performRequest(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,11 @@ read_remote_shared_metrics:
privileges: [ 'read', 'read_cross_cluster' ]
clusters: [ 'my_*' ]

transform_remote_shared_index:
indices:
- names: [ 'simple-remote-transform' ]
privileges: ['create_index', 'index', 'read']
remote_indices:
- names: [ 'shared-transform-index' ]
privileges: [ 'read', 'read_cross_cluster', 'view_index_metadata' ]
clusters: [ 'my_*' ]
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,16 @@ public void getRoleDescriptorsIntersectionForRemoteCluster(
+ subject.getUser().principal()
+ "] is an internal user and we should never try to retrieve its roles descriptors towards a remote cluster";
assert false : message;
logger.warn(message);
listener.onFailure(new IllegalArgumentException(message));
return;
}

final AuthorizationEngine authorizationEngine = getAuthorizationEngineForSubject(subject);
final AuthorizationInfo authorizationInfo = threadContext.getTransient(AUTHORIZATION_INFO_KEY);
assert authorizationInfo != null : "authorization info must be available in thread context";
// AuthZ info can be null for persistent tasks
if (threadContext.<AuthorizationInfo>getTransient(AUTHORIZATION_INFO_KEY) == null) {
logger.debug("authorization info not available in thread context, resolving it for subject [{}]", subject);
}
authorizationEngine.resolveAuthorizationInfo(
subject,
wrapPreservingContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.audit.AuditUtil;
import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService;
Expand Down Expand Up @@ -88,7 +90,10 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte
ResolveIndexAction.NAME,
FieldCapabilitiesAction.NAME,
FieldCapabilitiesAction.NAME + "[n]",
"indices:data/read/eql"
"indices:data/read/eql",
// transform
XPackInfoAction.NAME,
GetCheckpointAction.NAME
)
).collect(Collectors.toUnmodifiableSet());
}
Expand Down

0 comments on commit d5e49f3

Please sign in to comment.