Skip to content

Commit

Permalink
Wait on index when polling global checkpoints (#71977)
Browse files Browse the repository at this point in the history
Currently the fleet global checkpoints API returns immediately if
the index is not ready or shards are not ready. This commit modifies the
API to wait on the index and primary shards being active up until the
timeout period.

Related to #71449.
  • Loading branch information
Tim-Brooks committed Apr 20, 2021
1 parent 6d7360b commit e493fb8
Show file tree
Hide file tree
Showing 8 changed files with 396 additions and 79 deletions.
27 changes: 26 additions & 1 deletion docs/reference/fleet/get-global-checkpoints.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ The purpose of the get global checkpoints api is to return the current global
checkpoints for an index. This API allows users to know what sequence numbers
have been safely persisted in Elasticsearch.

[discrete]
[[polling-on-global-checkpoint]]
== Polling on global checkpoint advance

The API has an optional polling mode enabled by the `wait_for_advance` query
parameter. In polling mode, the API will only return after the global checkpoints
advance past the provided `checkpoints`. By default, `checkpoints` is an empty
Expand All @@ -20,6 +24,22 @@ boolean indicating that the request timed out.

Currently the `wait_for_advance` parameter is only supported for one shard indices.

[discrete]
[[polling-on-index]]
== Polling on index ready

By default in polling mode, an exception will be returned if the index does not
exist or not all of the primary shards are active. In polling mode, the
`wait_for_index` parameter can be used to modify this behavior. If `wait_for_index`
is set to true, the API will wait for the index to be created and all primary
shards to be active.

If a timeout occurs before these conditions are met, the relevant exception will be
returned.

Currently the `wait_for_index` parameter is only supported when `wait_for_advance`
is true.

[[get-global-checkpoints-api-request]]
==== {api-request-title}

Expand All @@ -39,7 +59,12 @@ A single index or index alias that resolves to a single index.
`wait_for_advance`::
(Optional, Boolean) A boolean value which controls whether to wait (until the
`timeout`) for the global checkpoints to advance past the provided
`checkpoints`.
`checkpoints`. Defaults to `false`.

`wait_for_index`::
(Optional, Boolean) A boolean value which controls whether to wait (until the
`timeout`) for the target index to exist and all primary shards to be active. Can
only be `true` when `wait_for_advance` is `true`. Defaults to `false`.

`checkpoints`::
(Optional, list) A comma separated list of previous global checkpoints.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
"description":"Whether to wait for the global checkpoint to advance past the specified current checkpoints",
"default":"false"
},
"wait_for_index":{
"type":"boolean",
"description":"Whether to wait for the target index to exist and all primary shards to be active",
"default":"false"
},
"checkpoints":{
"type":"list",
"description":"Comma separated list of checkpoints",
Expand Down
2 changes: 0 additions & 2 deletions x-pack/plugin/fleet/qa/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.build'
tasks.named("test").configure { enabled = false }

Expand Down
8 changes: 4 additions & 4 deletions x-pack/plugin/fleet/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import org.elasticsearch.gradle.info.BuildParams

apply plugin: 'elasticsearch.yaml-rest-test'

dependencies {
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
}

restResources {
restApi {
include '_common', 'indices', 'index', 'fleet'
}
}

dependencies {
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
}

testClusters.all {
testDistribution = 'DEFAULT'
setting 'xpack.security.enabled', 'false'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,14 @@ setup:

- match: { global_checkpoints.0: 1 }
- match: { timed_out: false }

---
"Wait for index timeout":
- do:
catch: missing
fleet.global_checkpoints:
index: "index-does-not-exist"
wait_for_advance: true
wait_for_index: true
checkpoints: []
timeout: "50ms"
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -29,6 +33,7 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ESIntegTestCase.ClusterScope(transportClientRatio = 0.0D)
public class GetGlobalCheckpointsActionIT extends ESIntegTestCase {
Expand Down Expand Up @@ -60,6 +65,7 @@ public void testGetGlobalCheckpoints() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
false,
false,
EMPTY_ARRAY,
TimeValue.parseTimeValue(randomTimeValue(), "test")
);
Expand All @@ -78,6 +84,7 @@ public void testGetGlobalCheckpoints() throws Exception {
final GetGlobalCheckpointsAction.Request request2 = new GetGlobalCheckpointsAction.Request(
indexName,
false,
false,
EMPTY_ARRAY,
TimeValue.parseTimeValue(randomTimeValue(), "test")
);
Expand Down Expand Up @@ -112,6 +119,7 @@ public void testPollGlobalCheckpointAdvancement() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
false,
false,
EMPTY_ARRAY,
TEN_SECONDS
);
Expand All @@ -128,6 +136,7 @@ public void testPollGlobalCheckpointAdvancement() throws Exception {
final GetGlobalCheckpointsAction.Request request2 = new GetGlobalCheckpointsAction.Request(
indexName,
true,
false,
new long[] { totalDocuments - 2 },
TimeValue.timeValueSeconds(30)
);
Expand Down Expand Up @@ -162,8 +171,9 @@ public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
true,
false,
new long[] { 29 },
TimeValue.timeValueMillis(between(0, 100))
TimeValue.timeValueMillis(between(1, 100))
);
long start = System.nanoTime();
GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet();
Expand All @@ -190,6 +200,7 @@ public void testMustProvideCorrectNumberOfShards() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
true,
false,
incorrectArrayLength,
TEN_SECONDS
);
Expand Down Expand Up @@ -220,6 +231,7 @@ public void testWaitForAdvanceOnlySupportsOneShard() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
true,
false,
new long[3],
TEN_SECONDS
);
Expand All @@ -231,16 +243,162 @@ public void testWaitForAdvanceOnlySupportsOneShard() throws Exception {
assertThat(exception.getMessage(), equalTo("wait_for_advance only supports indices with one shard. [shard count: 3]"));
}

public void testIndexDoesNotExist() throws Exception {
public void testIndexDoesNotExistNoWait() {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
"non-existent",
false,
false,
EMPTY_ARRAY,
TimeValue.parseTimeValue(randomTimeValue(), "test")
TEN_SECONDS
);

long start = System.nanoTime();
ElasticsearchException exception = expectThrows(
IndexNotFoundException.class,
() -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
);
long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds();
assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds()));
}

/**
 * Polls an index that does not exist with both boolean wait flags enabled and a short
 * random timeout (1-100 ms); the request is expected to fail with
 * {@link IndexNotFoundException}.
 */
public void testWaitOnIndexTimeout() {
    final TimeValue shortTimeout = TimeValue.timeValueMillis(between(1, 100));
    final GetGlobalCheckpointsAction.Request pollRequest = new GetGlobalCheckpointsAction.Request(
        "non-existent",
        true,
        true,
        EMPTY_ARRAY,
        shortTimeout
    );

    // Only the exception type is checked; the message is not inspected.
    ElasticsearchException notFound = expectThrows(
        IndexNotFoundException.class,
        () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, pollRequest).actionGet()
    );
}

/**
 * Starts a polling request (both boolean wait flags enabled) against an index that does not
 * exist yet, then creates the index and indexes one document while the request is pending.
 * The request is expected to complete within {@code TEN_SECONDS}, not time out, and report a
 * global checkpoint of 0 for the first shard.
 */
public void testWaitOnIndexCreated() throws Exception {
    final String index = "not-yet-existing";
    final Settings.Builder indexSettings = Settings.builder()
        .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0);
    final GetGlobalCheckpointsAction.Request pollRequest = new GetGlobalCheckpointsAction.Request(
        index,
        true,
        true,
        EMPTY_ARRAY,
        TEN_SECONDS
    );

    final long startNanos = System.nanoTime();
    final ActionFuture<GetGlobalCheckpointsAction.Response> pending = client().execute(
        GetGlobalCheckpointsAction.INSTANCE,
        pollRequest
    );
    // Pause briefly so the request is issued before the index is created.
    Thread.sleep(randomIntBetween(10, 100));
    client().admin().indices().prepareCreate(index).setSettings(indexSettings).get();
    client().prepareIndex(index, "_doc").setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get();

    final GetGlobalCheckpointsAction.Response result = pending.actionGet();
    final long elapsedSeconds = TimeValue.timeValueNanos(System.nanoTime() - startNanos).seconds();
    assertThat(elapsedSeconds, lessThanOrEqualTo(TEN_SECONDS.seconds()));
    assertThat(result.globalCheckpoints()[0], equalTo(0L));
    assertFalse(result.timedOut());
}

/**
 * Creates a one-shard, zero-replica index with an allocation include filter on "node" set to
 * "none" (presumably matching no node, so the primary stays inactive) and does not wait for
 * active shards on creation. A request with both boolean wait flags disabled is then expected
 * to fail with {@link UnavailableShardsException} reporting one shard and zero active.
 */
public void testPrimaryShardsNotReadyNoWait() throws Exception {
    final String index = "not-assigned";
    final GetGlobalCheckpointsAction.Request noWaitRequest = new GetGlobalCheckpointsAction.Request(
        index,
        false,
        false,
        EMPTY_ARRAY,
        TEN_SECONDS
    );
    final Settings.Builder unassignableSettings = Settings.builder()
        .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "none");

    client().admin()
        .indices()
        .prepareCreate(index)
        .setWaitForActiveShards(ActiveShardCount.NONE)
        .setSettings(unassignableSettings)
        .get();

    final UnavailableShardsException unavailable = expectThrows(
        UnavailableShardsException.class,
        () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, noWaitRequest).actionGet()
    );
    assertEquals("Primary shards were not active [shards=1, active=0]", unavailable.getMessage());
}

/**
 * Creates a one-shard, zero-replica index with an allocation include filter on "node" set to
 * "none" (presumably matching no node, so the primary stays inactive), then polls it with both
 * boolean wait flags enabled and a short random timeout (1-100 ms). The request is expected to
 * fail with {@link UnavailableShardsException} whose message includes that timeout.
 */
public void testWaitOnPrimaryShardsReadyTimeout() throws Exception {
    // Drawn first so the random sequence matches the order the request is built in.
    final TimeValue shortTimeout = TimeValue.timeValueMillis(between(1, 100));
    final String index = "not-assigned";
    final GetGlobalCheckpointsAction.Request pollRequest = new GetGlobalCheckpointsAction.Request(
        index,
        true,
        true,
        EMPTY_ARRAY,
        shortTimeout
    );
    final Settings.Builder unassignableSettings = Settings.builder()
        .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "none");

    client().admin()
        .indices()
        .prepareCreate(index)
        .setWaitForActiveShards(ActiveShardCount.NONE)
        .setSettings(unassignableSettings)
        .get();

    final UnavailableShardsException unavailable = expectThrows(
        UnavailableShardsException.class,
        () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, pollRequest).actionGet()
    );
    final String expectedMessage = "Primary shards were not active within timeout [timeout=" + shortTimeout + ", shards=1, active=0]";
    assertEquals(expectedMessage, unavailable.getMessage());
}

/**
 * Creates a one-shard, zero-replica index with an allocation include filter on "node" set to
 * "none" (presumably keeping the primary inactive), starts a polling request with both boolean
 * wait flags enabled, and then resets the filter to the empty string and indexes one document.
 * The pending request is expected to complete within {@code TEN_SECONDS}, not time out, and
 * report a global checkpoint of 0 for the first shard.
 */
public void testWaitOnPrimaryShardsReady() throws Exception {
    final String index = "not-assigned";
    final String includeNodeKey = IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node";
    final GetGlobalCheckpointsAction.Request pollRequest = new GetGlobalCheckpointsAction.Request(
        index,
        true,
        true,
        EMPTY_ARRAY,
        TEN_SECONDS
    );
    final Settings.Builder unassignableSettings = Settings.builder()
        .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        .put(includeNodeKey, "none");

    client().admin()
        .indices()
        .prepareCreate(index)
        .setWaitForActiveShards(ActiveShardCount.NONE)
        .setSettings(unassignableSettings)
        .get();

    final long startNanos = System.nanoTime();
    final ActionFuture<GetGlobalCheckpointsAction.Response> pending = client().execute(
        GetGlobalCheckpointsAction.INSTANCE,
        pollRequest
    );
    // Pause briefly so the request is issued before the allocation filter is changed.
    Thread.sleep(randomIntBetween(10, 100));
    client().admin()
        .indices()
        .prepareUpdateSettings(index)
        .setSettings(Settings.builder().put(includeNodeKey, ""))
        .get();
    client().prepareIndex(index, "_doc").setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get();

    final GetGlobalCheckpointsAction.Response result = pending.actionGet();
    final long elapsedSeconds = TimeValue.timeValueNanos(System.nanoTime() - startNanos).seconds();
    assertThat(elapsedSeconds, lessThanOrEqualTo(TEN_SECONDS.seconds()));
    assertThat(result.globalCheckpoints()[0], equalTo(0L));
    assertFalse(result.timedOut());
}
}

0 comments on commit e493fb8

Please sign in to comment.