Skip to content

Commit

Permalink
Rename CCR APIs (#34027)
Browse files Browse the repository at this point in the history
* Renamed CCR APIs

Renamed:
* `/{index}/_ccr/create_and_follow` to `/{index}/_ccr/follow`
* `/{index}/_ccr/unfollow` to `/{index}/_ccr/pause_follow`
* `/{index}/_ccr/follow` to `/{index}/_ccr/resume_follow`

Relates to #33931
  • Loading branch information
martijnvg authored and atript8 committed Sep 28, 2018
1 parent 36c7adf commit 1eb1d7e
Show file tree
Hide file tree
Showing 33 changed files with 294 additions and 293 deletions.
Expand Up @@ -31,17 +31,17 @@ protected boolean preserveClusterUponCompletion() {
return true;
}

public void testFollowIndex() {
public void testResumeFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/follow");
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertNonCompliantLicense(request);
}
}

public void testCreateAndFollowIndex() {
public void testFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
final Request request = new Request("PUT", "/follower/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertNonCompliantLicense(request);
}
Expand Down
Expand Up @@ -80,11 +80,11 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
follow("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
Expand All @@ -93,9 +93,9 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

followIndex("leader_cluster:" + allowedIndex, allowedIndex);
resumeFollow("leader_cluster:" + allowedIndex, allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
Expand All @@ -105,15 +105,15 @@ public void testFollowIndex() throws Exception {
});

Exception e = expectThrows(ResponseException.class,
() -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
() -> follow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
containsString("action [indices:admin/xpack/ccr/create_and_follow_index] is unauthorized for user [test_ccr]"));
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
() -> resumeFollow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
Expand Down Expand Up @@ -157,10 +157,10 @@ public void testAutoFollowPatterns() throws Exception {
verifyAutoFollowMonitoring();
});

// Cleanup by deleting auto follow pattern and unfollowing:
// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
unfollowIndex(allowedIndex);
pauseFollow(allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
Expand Down Expand Up @@ -201,14 +201,14 @@ private static void refresh(String index) throws IOException {
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
private static void follow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
Expand Down Expand Up @@ -273,8 +273,8 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
private static void pauseFollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
Expand Down
Expand Up @@ -67,11 +67,11 @@ public void testFollowIndex() throws Exception {
} else {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
createAndFollowIndex("leader_cluster:" + leaderIndexName, followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
unfollowIndex(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
pauseFollow(followIndexName);
resumeFollow("leader_cluster:" + leaderIndexName, followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
Expand All @@ -86,11 +86,11 @@ public void testFollowIndex() throws Exception {
public void testFollowNonExistingLeaderIndex() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
ResponseException e = expectThrows(ResponseException.class,
() -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
() -> resumeFollow("leader_cluster:non-existing-index", "non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));

e = expectThrows(ResponseException.class, () -> createAndFollowIndex("leader_cluster:non-existing-index", "non-existing-index"));
e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster:non-existing-index", "non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
}
Expand Down Expand Up @@ -146,20 +146,20 @@ private static void refresh(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
private static void pauseFollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Expand Down
Expand Up @@ -16,7 +16,7 @@
- is_true: acknowledged

- do:
ccr.create_and_follow_index:
ccr.follow:
index: bar
body:
leader_index: foo
Expand All @@ -25,18 +25,18 @@
- is_true: index_following_started

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged

- do:
ccr.follow_index:
ccr.resume_follow:
index: bar
body:
leader_index: foo
- is_true: acknowledged

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged
Expand Up @@ -15,7 +15,7 @@
type: keyword

- do:
ccr.create_and_follow_index:
ccr.follow:
index: bar
body:
leader_index: foo
Expand Down Expand Up @@ -51,7 +51,7 @@
- gte: { bar.0.time_since_last_fetch_millis: -1 }

- do:
ccr.unfollow_index:
ccr.pause_follow:
index: bar
- is_true: acknowledged

Expand Up @@ -52,26 +52,26 @@
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -161,9 +161,9 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
new ActionHandler<>(AutoFollowStatsAction.INSTANCE, TransportAutoFollowStatsAction.class),
// follow actions
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, TransportCreateAndFollowIndexAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, TransportFollowIndexAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, TransportUnfollowIndexAction.class),
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
// auto-follow actions
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
Expand All @@ -183,9 +183,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestCcrStatsAction(settings, restController),
new RestAutoFollowStatsAction(settings, restController),
// follow APIs
new RestCreateAndFollowIndexAction(settings, restController),
new RestFollowIndexAction(settings, restController),
new RestUnfollowIndexAction(settings, restController),
new RestPutFollowAction(settings, restController),
new RestResumeFollowAction(settings, restController),
new RestPauseFollowAction(settings, restController),
// auto-follow APIs
new RestDeleteAutoFollowPatternAction(settings, restController),
new RestPutAutoFollowPatternAction(settings, restController),
Expand Down
Expand Up @@ -34,8 +34,8 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -184,13 +184,13 @@ void getLeaderClusterState(final Map<String, String> headers,

@Override
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
ResumeFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
followerClient.execute(
CreateAndFollowIndexAction.INSTANCE,
PutFollowAction.INSTANCE,
request,
ActionListener.wrap(r -> successHandler.run(), failureHandler)
);
Expand Down Expand Up @@ -306,7 +306,7 @@ private void followLeaderIndex(String clusterAlias,

String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request request = new FollowIndexAction.Request();
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
Expand Down Expand Up @@ -409,7 +409,7 @@ abstract void getLeaderClusterState(

abstract void createAndFollow(
Map<String, String> headers,
FollowIndexAction.Request followRequest,
ResumeFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);
Expand Down
Expand Up @@ -63,8 +63,8 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;

public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
Expand Down

0 comments on commit 1eb1d7e

Please sign in to comment.