Skip to content

Commit

Permalink
Remove SnapshotClient from HLRC (#85845)
Browse files Browse the repository at this point in the history
This removes the `SnapshotClient` from the high level rest client,
rewriting the tests to use the low-level client instead.

Relates to #83423
  • Loading branch information
dakrone committed Apr 14, 2022
1 parent 0b999c3 commit 6d6c7d5
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 749 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ public class RestHighLevelClient implements Closeable {
/** Do not access directly but through getVersionValidationFuture() */
private volatile ListenableFuture<Optional<String>> versionValidationFuture;

private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final SecurityClient securityClient = new SecurityClient(this);
private final EqlClient eqlClient = new EqlClient(this);

Expand Down Expand Up @@ -345,15 +344,6 @@ public final void close() throws IOException {
doClose.accept(client);
}

/**
* Provides a {@link SnapshotClient} which can be used to access the Snapshot API.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html">Snapshot API on elastic.co</a>
*/
public final SnapshotClient snapshot() {
return snapshotClient;
}

/**
* Provides methods for accessing the Elastic Licensed Security APIs that
* are shipped with the Elastic Stack distribution of Elasticsearch. All of
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,19 @@ public static void assertOK(Response response) {
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
}

/**
* Assert that the index in question has the given number of documents present
*/
public static void assertDocCount(RestClient client, String indexName, long docCount) throws IOException {
Request countReq = new Request("GET", "/" + indexName + "/_count");
ObjectPath resp = ObjectPath.createFromResponse(client.performRequest(countReq));
assertEquals(
"expected " + docCount + " documents but it was a different number",
docCount,
Long.parseLong(resp.evaluate("count").toString())
);
}

public static void assertAcknowledged(Response response) throws IOException {
assertOK(response);
String jsonBody = EntityUtils.toString(response.getEntity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

import static org.elasticsearch.test.ESTestCase.assertEquals;
Expand Down Expand Up @@ -60,8 +57,7 @@ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpCli
.build()
) {
Properties configuration = loadConfiguration();
restoreSnapshot(new RestHighLevelClient(client, ignore -> {}, List.of()) {
}, configuration);
restoreSnapshot(client, configuration);
}
}

Expand All @@ -73,44 +69,40 @@ static Properties loadConfiguration() throws IOException {
}
}

static void restoreSnapshot(RestHighLevelClient restHighLevelClient, Properties cfg) throws IOException {
if (restHighLevelClient.getLowLevelClient()
.performRequest(new Request("HEAD", "/" + cfg.getProperty("index_name")))
.getStatusLine()
.getStatusCode() == 404) {
restHighLevelClient.snapshot()
.createRepository(
new PutRepositoryRequest(cfg.getProperty("gcs_repo_name")).type("gcs")
static void restoreSnapshot(RestClient client, Properties cfg) throws IOException {
int status = client.performRequest(new Request("HEAD", "/" + cfg.getProperty("index_name"))).getStatusLine().getStatusCode();
if (status == 404) {
Request createRepo = new Request("PUT", "/_snapshot/" + cfg.getProperty("gcs_repo_name"));
createRepo.setJsonEntity(
Strings.toString(
new PutRepositoryRequest().type("gcs")
.settings(
Settings.builder()
.put("bucket", cfg.getProperty("gcs_bucket_name"))
.put("base_path", cfg.getProperty("gcs_base_path"))
.put("client", cfg.getProperty("gcs_client_name"))
.build()
),
RequestOptions.DEFAULT
);
RestoreSnapshotResponse resp = restHighLevelClient.snapshot()
.restore(
new RestoreSnapshotRequest(cfg.getProperty("gcs_repo_name"), cfg.getProperty("gcs_snapshot_name")).waitForCompletion(
true
),
RequestOptions.DEFAULT
);
)
)
);
client.performRequest(createRepo);

Request restoreRequest = new Request(
"POST",
"/_snapshot/" + cfg.getProperty("gcs_repo_name") + "/" + cfg.getProperty("gcs_snapshot_name") + "/_restore"
);
restoreRequest.addParameter("wait_for_completion", "true");
ObjectPath restore = ObjectPath.createFromResponse(client.performRequest(restoreRequest));
assertEquals(
"Unable to restore snapshot: "
+ resp.getRestoreInfo().toString()
+ restore
+ System.lineSeparator()
+ "Please check server logs to find the underlying issue.",
1,
resp.getRestoreInfo().successfulShards()
(int) restore.evaluate("snapshot.shards.successful")
);

assertEquals(
Long.parseLong(cfg.getProperty("index_doc_count")),
restHighLevelClient.count(new CountRequest(cfg.getProperty("index_name")), RequestOptions.DEFAULT).getCount()
);
ESRestTestCase.assertDocCount(client, cfg.getProperty("index_name"), Long.parseLong(cfg.getProperty("index_doc_count")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static void init() throws IOException {

@Before
public void restoreDataFromGcsRepo() throws Exception {
EqlDataLoader.restoreSnapshot(highLevelClient(), CFG);
EqlDataLoader.restoreSnapshot(client(), CFG);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
import org.apache.http.entity.StringEntity;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -27,24 +22,20 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.ShardsAcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -61,10 +52,11 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -204,58 +196,54 @@ private void beforeRestart(
if (sourceOnlyRepository) {
repoSettingsBuilder.put("delegate_type", "fs");
}
ElasticsearchAssertions.assertAcked(
client.snapshot()
.createRepository(
new PutRepositoryRequest(repoName).type(sourceOnlyRepository ? "source" : "fs").settings(repoSettingsBuilder),
RequestOptions.DEFAULT
)
Request createRepo = new Request("PUT", "/_snapshot/" + repoName);
createRepo.setJsonEntity(
Strings.toString(new PutRepositoryRequest().type(sourceOnlyRepository ? "source" : "fs").settings(repoSettingsBuilder.build()))
);
assertAcknowledged(client().performRequest(createRepo));

// list snapshots on new ES
List<SnapshotInfo> snapshotInfos = client.snapshot()
.get(new GetSnapshotsRequest(repoName).snapshots(new String[] { "_all" }), RequestOptions.DEFAULT)
.getSnapshots();
assertThat(snapshotInfos, hasSize(1));
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
assertEquals(snapshotName, snapshotInfo.snapshotId().getName());
assertEquals(repoName, snapshotInfo.repository());
assertEquals(Arrays.asList(indexName), snapshotInfo.indices());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(numberOfShards, snapshotInfo.successfulShards());
assertEquals(numberOfShards, snapshotInfo.totalShards());
assertEquals(0, snapshotInfo.failedShards());
assertEquals(oldVersion, snapshotInfo.version());
Request getSnaps = new Request("GET", "/_snapshot/" + repoName + "/_all");
Response getResponse = client().performRequest(getSnaps);
ObjectPath getResp = ObjectPath.createFromResponse(getResponse);
assertThat(getResp.evaluate("total"), equalTo(1));
assertThat(getResp.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
assertThat(getResp.evaluate("snapshots.0.repository"), equalTo(repoName));
assertThat(getResp.evaluate("snapshots.0.indices"), contains(indexName));
assertThat(getResp.evaluate("snapshots.0.state"), equalTo(SnapshotState.SUCCESS.toString()));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards.successful"));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards.total"));
assertEquals(0, (int) getResp.evaluate("snapshots.0.shards.failed"));
assertEquals(oldVersion.toString(), getResp.evaluate("snapshots.0.version"));

// list specific snapshot on new ES
snapshotInfos = client.snapshot()
.get(new GetSnapshotsRequest(repoName).snapshots(new String[] { snapshotName }), RequestOptions.DEFAULT)
.getSnapshots();
assertThat(snapshotInfos, hasSize(1));
snapshotInfo = snapshotInfos.get(0);
assertEquals(snapshotName, snapshotInfo.snapshotId().getName());
assertEquals(repoName, snapshotInfo.repository());
assertEquals(Arrays.asList(indexName), snapshotInfo.indices());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(numberOfShards, snapshotInfo.successfulShards());
assertEquals(numberOfShards, snapshotInfo.totalShards());
assertEquals(0, snapshotInfo.failedShards());
assertEquals(oldVersion, snapshotInfo.version());
getSnaps = new Request("GET", "/_snapshot/" + repoName + "/" + snapshotName);
getResponse = client().performRequest(getSnaps);
getResp = ObjectPath.createFromResponse(getResponse);
assertThat(getResp.evaluate("total"), equalTo(1));
assertThat(getResp.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
assertThat(getResp.evaluate("snapshots.0.repository"), equalTo(repoName));
assertThat(getResp.evaluate("snapshots.0.indices"), contains(indexName));
assertThat(getResp.evaluate("snapshots.0.state"), equalTo(SnapshotState.SUCCESS.toString()));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards.successful"));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards.total"));
assertEquals(0, (int) getResp.evaluate("snapshots.0.shards.failed"));
assertEquals(oldVersion.toString(), getResp.evaluate("snapshots.0.version"));

// list advanced snapshot info on new ES
SnapshotsStatusResponse snapshotsStatusResponse = client.snapshot()
.status(new SnapshotsStatusRequest(repoName).snapshots(new String[] { snapshotName }), RequestOptions.DEFAULT);
assertThat(snapshotsStatusResponse.getSnapshots(), hasSize(1));
SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
assertEquals(snapshotName, snapshotStatus.getSnapshot().getSnapshotId().getName());
assertEquals(repoName, snapshotStatus.getSnapshot().getRepository());
assertEquals(Sets.newHashSet(indexName), snapshotStatus.getIndices().keySet());
assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.getState());
assertEquals(numberOfShards, snapshotStatus.getShardsStats().getDoneShards());
assertEquals(numberOfShards, snapshotStatus.getShardsStats().getTotalShards());
assertEquals(0, snapshotStatus.getShardsStats().getFailedShards());
assertThat(snapshotStatus.getStats().getTotalSize(), greaterThan(0L));
assertThat(snapshotStatus.getStats().getTotalFileCount(), greaterThan(0));
getSnaps = new Request("GET", "/_snapshot/" + repoName + "/" + snapshotName + "/_status");
getResponse = client().performRequest(getSnaps);
getResp = ObjectPath.createFromResponse(getResponse);
assertThat(((List<?>) getResp.evaluate("snapshots")).size(), equalTo(1));
assertThat(getResp.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
assertThat(getResp.evaluate("snapshots.0.repository"), equalTo(repoName));
assertThat(((Map<?, ?>) getResp.evaluate("snapshots.0.indices")).keySet(), contains(indexName));
assertThat(getResp.evaluate("snapshots.0.state"), equalTo(SnapshotState.SUCCESS.toString()));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards_stats.done"));
assertEquals(numberOfShards, (int) getResp.evaluate("snapshots.0.shards_stats.total"));
assertEquals(0, (int) getResp.evaluate("snapshots.0.shards_stats.failed"));
assertThat(getResp.evaluate("snapshots.0.stats.total.size_in_bytes"), greaterThan(0));
assertThat(getResp.evaluate("snapshots.0.stats.total.file_count"), greaterThan(0));

// restore / mount and check whether searches work
restoreMountAndVerify(
Expand All @@ -271,10 +259,9 @@ private void beforeRestart(
);

// close indices
RestClient llClient = client.getLowLevelClient();
assertTrue(closeIndex(llClient, "restored_" + indexName).isShardsAcknowledged());
assertTrue(closeIndex(llClient, "mounted_full_copy_" + indexName).isShardsAcknowledged());
assertTrue(closeIndex(llClient, "mounted_shared_cache_" + indexName).isShardsAcknowledged());
assertTrue(closeIndex(client(), "restored_" + indexName).isShardsAcknowledged());
assertTrue(closeIndex(client(), "mounted_full_copy_" + indexName).isShardsAcknowledged());
assertTrue(closeIndex(client(), "mounted_shared_cache_" + indexName).isShardsAcknowledged());

// restore / mount again
restoreMountAndVerify(
Expand Down Expand Up @@ -311,23 +298,20 @@ private void restoreMountAndVerify(
String snapshotName
) throws IOException {
// restore index
RestoreSnapshotResponse restoreSnapshotResponse = client.snapshot()
.restore(
new RestoreSnapshotRequest(repoName, snapshotName).indices(indexName)
.renamePattern("(.+)")
.renameReplacement("restored_$1")
.waitForCompletion(true),
RequestOptions.DEFAULT
);
assertNotNull(restoreSnapshotResponse.getRestoreInfo());
assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().totalShards());
assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().successfulShards());
Request restoreRequest = new Request("POST", "/_snapshot/" + repoName + "/" + snapshotName + "/_restore");
restoreRequest.setJsonEntity(
Strings.toString(new RestoreSnapshotRequest().indices(indexName).renamePattern("(.+)").renameReplacement("restored_$1"))
);
restoreRequest.addParameter("wait_for_completion", "true");
Response restoreResponse = client().performRequest(restoreRequest);
ObjectPath restore = ObjectPath.createFromResponse(restoreResponse);
assertEquals(numberOfShards, (int) restore.evaluate("snapshot.shards.total"));
assertEquals(numberOfShards, (int) restore.evaluate("snapshot.shards.successful"));

ensureGreen("restored_" + indexName);

String restoredIndex = "restored_" + indexName;
RestClient llClient = client.getLowLevelClient();
var response = responseAsMap(llClient.performRequest(new Request("GET", "/" + restoredIndex + "/_mapping")));
var response = responseAsMap(client().performRequest(new Request("GET", "/" + restoredIndex + "/_mapping")));
Map<?, ?> mapping = ObjectPath.evaluate(response, restoredIndex + ".mappings");
logger.info("mapping for {}: {}", restoredIndex, mapping);
assertThat(mapping, hasKey("_meta"));
Expand Down

0 comments on commit 6d6c7d5

Please sign in to comment.