Skip to content

Commit

Permalink
Tests for RCS with multile fulfilling clusters (#94904)
Browse files Browse the repository at this point in the history
This PR adds a new abstract test suite to test common CCS scenarios
under the RCS with two fulfilling clusters. The PR adds two concrete
instantiations: mixed model (one cluster uses configurable security,
other basic) and same model (both clusters use configurable security).
  • Loading branch information
n1v0lg committed Mar 31, 2023
1 parent 67d155b commit 4ae4e3a
Show file tree
Hide file tree
Showing 4 changed files with 526 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCase {

Expand Down Expand Up @@ -77,9 +78,13 @@ public static void initFulfillingClusterClient() {
if (fulfillingClusterClient != null) {
return;
}
assert fulfillingCluster != null;
final int numberOfFcNodes = fulfillingCluster.getHttpAddresses().split(",").length;
final String url = fulfillingCluster.getHttpAddress(randomIntBetween(0, numberOfFcNodes - 1));
fulfillingClusterClient = buildRestClient(fulfillingCluster);
}

static RestClient buildRestClient(ElasticsearchCluster targetCluster) {
assert targetCluster != null;
final int numberOfFcNodes = targetCluster.getHttpAddresses().split(",").length;
final String url = targetCluster.getHttpAddress(randomIntBetween(0, numberOfFcNodes - 1));

final int portSeparator = url.lastIndexOf(':');
final var httpHost = new HttpHost(url.substring(0, portSeparator), Integer.parseInt(url.substring(portSeparator + 1)), "http");
Expand All @@ -90,7 +95,7 @@ public static void initFulfillingClusterClient() {
throw new UncheckedIOException(e);
}
builder.setStrictDeprecationMode(true);
fulfillingClusterClient = builder.build();
return builder.build();
}

@AfterClass
Expand All @@ -111,6 +116,10 @@ protected Settings restClientSettings() {

protected static Map<String, Object> createCrossClusterAccessApiKey(String indicesPrivilegesJson) {
initFulfillingClusterClient();
return createCrossClusterAccessApiKey(fulfillingClusterClient, indicesPrivilegesJson);
}

static Map<String, Object> createCrossClusterAccessApiKey(RestClient targetClusterClient, String indicesPrivilegesJson) {
// Create API key on FC
final var createApiKeyRequest = new Request("POST", "/_security/api_key");
createApiKeyRequest.setJsonEntity(Strings.format("""
Expand All @@ -124,7 +133,7 @@ protected static Map<String, Object> createCrossClusterAccessApiKey(String indic
}
}""", indicesPrivilegesJson));
try {
final Response createApiKeyResponse = performRequestAgainstFulfillingCluster(createApiKeyRequest);
final Response createApiKeyResponse = performRequestWithAdminUser(targetClusterClient, createApiKeyRequest);
assertOK(createApiKeyResponse);
return responseAsMap(createApiKeyResponse);
} catch (IOException e) {
Expand All @@ -133,44 +142,67 @@ protected static Map<String, Object> createCrossClusterAccessApiKey(String indic
}

protected void configureRemoteClusters() throws Exception {
// This method assume the cross cluster access API key is already configured in keystore
configureRemoteClusters(randomBoolean());
configureRemoteCluster(fulfillingCluster, randomBoolean());
}

/**
* Returns API key ID of cross cluster access API key.
*/
protected void configureRemoteClusters(boolean isProxyMode) throws Exception {
// This method assume the cross cluster access API key is already configured in keystore
configureRemoteCluster(fulfillingCluster, isProxyMode);
}

protected void configureRemoteCluster(ElasticsearchCluster targetFulfillingCluster, boolean isProxyMode) throws Exception {
configureRemoteCluster("my_remote_cluster", targetFulfillingCluster, false, isProxyMode, false);
}

protected void configureRemoteCluster(
String clusterAlias,
ElasticsearchCluster targetFulfillingCluster,
boolean basicSecurity,
boolean isProxyMode,
boolean skipUnavailable
) throws Exception {
// For configurable remote cluster security, this method assumes the cross cluster access API key is already configured in keystore
final Settings.Builder builder = Settings.builder();
final String remoteClusterEndpoint = basicSecurity
? targetFulfillingCluster.getTransportEndpoint(0)
: targetFulfillingCluster.getRemoteClusterServerEndpoint(0);
if (isProxyMode) {
builder.put("cluster.remote.my_remote_cluster.mode", "proxy")
.put("cluster.remote.my_remote_cluster.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0));
builder.put("cluster.remote." + clusterAlias + ".mode", "proxy")
.put("cluster.remote." + clusterAlias + ".proxy_address", remoteClusterEndpoint);
} else {
builder.put("cluster.remote.my_remote_cluster.mode", "sniff")
.putList("cluster.remote.my_remote_cluster.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0));
builder.put("cluster.remote." + clusterAlias + ".mode", "sniff")
.putList("cluster.remote." + clusterAlias + ".seeds", remoteClusterEndpoint);
}
builder.put("cluster.remote." + clusterAlias + ".skip_unavailable", skipUnavailable);
updateClusterSettings(builder.build());

// Ensure remote cluster is connected
final int numberOfFcNodes = fulfillingCluster.getHttpAddresses().split(",").length;
final int numberOfFcNodes = targetFulfillingCluster.getHttpAddresses().split(",").length;
final Request remoteInfoRequest = new Request("GET", "/_remote/info");
assertBusy(() -> {
final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
assertOK(remoteInfoResponse);
final Map<String, Object> remoteInfoMap = responseAsMap(remoteInfoResponse);
assertThat(remoteInfoMap, hasKey("my_remote_cluster"));
assertThat(ObjectPath.eval("my_remote_cluster.connected", remoteInfoMap), is(true));
assertThat(remoteInfoMap, hasKey(clusterAlias));
assertThat(ObjectPath.eval(clusterAlias + ".connected", remoteInfoMap), is(true));
if (false == isProxyMode) {
assertThat(ObjectPath.eval("my_remote_cluster.num_nodes_connected", remoteInfoMap), equalTo(numberOfFcNodes));
assertThat(ObjectPath.eval(clusterAlias + ".num_nodes_connected", remoteInfoMap), equalTo(numberOfFcNodes));
}
final String credentialsValue = ObjectPath.eval(clusterAlias + ".cluster_credentials", remoteInfoMap);
if (basicSecurity) {
assertThat(credentialsValue, nullValue());
} else {
assertThat(credentialsValue, equalTo("::es_redacted::"));
}
assertThat(ObjectPath.eval("my_remote_cluster.cluster_credentials", remoteInfoMap), equalTo("::es_redacted::"));
});
}

protected static Response performRequestAgainstFulfillingCluster(Request request) throws IOException {
return performRequestWithAdminUser(fulfillingClusterClient, request);
}

protected static Response performRequestWithAdminUser(RestClient targetFulfillingClusterClient, Request request) throws IOException {
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(USER, PASS)));
return fulfillingClusterClient.performRequest(request);
return targetFulfillingClusterClient.performRequest(request);
}

// TODO centralize common usage of this across all tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.remotecluster;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Strings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xcontent.ObjectPath;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;

public abstract class AbstractRemoteClusterSecurityWithMultipleRemotesRestIT extends AbstractRemoteClusterSecurityTestCase {

protected static ElasticsearchCluster otherFulfillingCluster;
protected static RestClient otherFulfillingClusterClient;

@BeforeClass
public static void initOtherFulfillingClusterClient() {
if (otherFulfillingClusterClient != null) {
return;
}
otherFulfillingClusterClient = buildRestClient(otherFulfillingCluster);
}

@AfterClass
public static void closeOtherFulfillingClusterClient() throws IOException {
IOUtils.close(otherFulfillingClusterClient);
}

public void testCrossClusterSearch() throws Exception {
configureRemoteClusters();
configureRolesOnClusters();

// Fulfilling cluster
{
// Index some documents, so we can attempt to search them from the querying cluster
final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
bulkRequest.setJsonEntity(Strings.format("""
{ "index": { "_index": "cluster1_index1" } }
{ "name": "doc1" }
{ "index": { "_index": "cluster1_index2" } }
{ "name": "doc2" }\n"""));
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
}

// Other fulfilling cluster
{
// Index some documents, so we can attempt to search them from the querying cluster
final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
bulkRequest.setJsonEntity(Strings.format("""
{ "index": { "_index": "cluster2_index1" } }
{ "name": "doc1" }
{ "index": { "_index": "cluster2_index2" } }
{ "name": "doc2" }\n"""));
assertOK(performRequestAgainstOtherFulfillingCluster(bulkRequest));
}

// Query cluster
{
// Index some documents, to use them in a multi-cluster search
final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
indexDocRequest.setJsonEntity("{\"name\": \"doc1\"}");
assertOK(client().performRequest(indexDocRequest));

// Search across local cluster and both remotes
searchAndAssertIndicesFound(
String.format(
Locale.ROOT,
"/local_index,%s:%s/_search?ccs_minimize_roundtrips=%s",
randomFrom("my_remote_*", "*"),
randomFrom("*_index1", "*"),
randomBoolean()
),
"cluster1_index1",
"cluster2_index1",
"local_index"
);

// Search across both remotes using cluster alias wildcard
searchAndAssertIndicesFound(
String.format(
Locale.ROOT,
"/%s:%s/_search?ccs_minimize_roundtrips=%s",
randomFrom("my_remote_*", "*"),
randomFrom("*_index1", "*"),
randomBoolean()
),
"cluster1_index1",
"cluster2_index1"
);

// Search across both remotes using explicit cluster aliases
searchAndAssertIndicesFound(
String.format(
Locale.ROOT,
"/my_remote_cluster:%s,my_remote_cluster_2:%s/_search?ccs_minimize_roundtrips=%s",
randomFrom("cluster1_index1", "*_index1", "*"),
randomFrom("cluster2_index1", "*_index1", "*"),
randomBoolean()
),
"cluster1_index1",
"cluster2_index1"
);

// Search single remote
final boolean searchFirstCluster = randomBoolean();
final String index1 = searchFirstCluster ? "cluster1_index1" : "cluster2_index1";
searchAndAssertIndicesFound(
String.format(
Locale.ROOT,
"/%s:%s/_search?ccs_minimize_roundtrips=%s",
searchFirstCluster ? "my_remote_cluster" : "my_remote_cluster_2",
randomFrom(index1, "*_index1", "*"),
randomBoolean()
),
index1
);

// To simplify the test setup, we only ever (randomly) set skip_unavailable on the other remote, not on both,
// i.e. the first remote cluster always has skip_unavailable = false.
// This impacts below failure scenarios; in some cases, skipping the other remote results in overall request success
final boolean skipUnavailableOnOtherCluster = isSkipUnavailable("my_remote_cluster_2");

// Search when one cluster throws 403
// No permissions for this index name, so searching for it on either remote will result in 403
final String missingIndex = "missingIndex";
final boolean missingIndexOnFirstCluster = randomBoolean();
// Make sure we search for missing index on at least one remote, possibly both
final boolean missingIndexOnSecondCluster = false == missingIndexOnFirstCluster || randomBoolean();
final String searchPath1 = String.format(
Locale.ROOT,
"/my_remote_cluster:%s,my_remote_cluster_2:%s/_search?ccs_minimize_roundtrips=%s",
missingIndexOnFirstCluster ? missingIndex : randomFrom("cluster1_index1", "*_index1", "*"),
missingIndexOnSecondCluster ? missingIndex : randomFrom("cluster2_index1", "*_index1", "*"),
randomBoolean()
);
if (skipUnavailableOnOtherCluster && false == missingIndexOnFirstCluster) {
// 403 from other cluster is skipped, so we get a result
searchAndAssertIndicesFound(searchPath1, "cluster1_index1");
} else {
searchAndExpect403(searchPath1);
}

// Search with cluster alias wildcard matching both remotes, where index is authorized on one but not the other
final String index2 = randomFrom("cluster1_index1", "cluster2_index1");
final String searchPath2 = String.format(
Locale.ROOT,
"/my_remote_cluster*:%s/_search?ccs_minimize_roundtrips=%s",
index2,
randomBoolean()
);
if (skipUnavailableOnOtherCluster && index2.equals("cluster1_index1")) {
// 403 from other cluster is skipped, so we get a result
searchAndAssertIndicesFound(searchPath2, index2);
} else {
searchAndExpect403(searchPath2);
}

// Search when both clusters throw 403; in this case we always fail because first cluster is not skipped
searchAndExpect403(String.format(Locale.ROOT, "/*:%s/_search?ccs_minimize_roundtrips=%s", "missingIndex", randomBoolean()));
}
}

private static boolean isSkipUnavailable(String clusterAlias) throws IOException {
final Request remoteInfoRequest = new Request("GET", "/_remote/info");
final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
assertOK(remoteInfoResponse);
final Map<String, Object> remoteInfoMap = responseAsMap(remoteInfoResponse);
assertThat(remoteInfoMap, hasKey(clusterAlias));
assertThat(ObjectPath.eval(clusterAlias + ".connected", remoteInfoMap), is(true));
return ObjectPath.eval(clusterAlias + ".skip_unavailable", remoteInfoMap);
}

private static void searchAndExpect403(String searchPath) {
final ResponseException exception = expectThrows(
ResponseException.class,
() -> performRequestWithRemoteSearchUser(new Request("GET", searchPath))
);
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
}

protected abstract void configureRolesOnClusters() throws IOException;

static void searchAndAssertIndicesFound(String searchPath, String... expectedIndices) throws IOException {
final Response response = performRequestWithRemoteSearchUser(new Request("GET", searchPath));
assertOK(response);
final SearchResponse searchResponse = SearchResponse.fromXContent(responseAsParser(response));
final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
.map(SearchHit::getIndex)
.collect(Collectors.toList());
assertThat(actualIndices, containsInAnyOrder(expectedIndices));
}

static Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(REMOTE_SEARCH_USER, PASS)));
return client().performRequest(request);
}

static Response performRequestAgainstOtherFulfillingCluster(Request putRoleRequest) throws IOException {
return performRequestWithAdminUser(otherFulfillingClusterClient, putRoleRequest);
}
}

0 comments on commit 4ae4e3a

Please sign in to comment.