Skip to content

Commit

Permalink
Add Pause/Resume Auto Follower APIs (elastic#47510)
Browse files Browse the repository at this point in the history
This commit adds two APIs that allow to pause and resume 
CCR auto-follower patterns:

// pause auto-follower
POST /_ccr/auto_follow/my_pattern/pause

// resume auto-follower
POST /_ccr/auto_follow/my_pattern/resume

The ability to pause and resume auto-follow patterns can be 
useful in some situations, including the rolling upgrades of 
cluster using a bi-directional cross-cluster replication scheme 
(see elastic#46665).

This committ adds a new active flag to the AutoFollowPattern 
and adapts the AutoCoordinator and AutoFollower classes so 
that it stops to fetch remote's cluster state when all auto-follow 
patterns associate to the remote cluster are paused.

When an auto-follower is paused, remote indices that match the 
pattern are just ignored: they are not added to the pattern's 
followed indices uids list that is maintained in the local cluster 
state. This way, when the auto-follow pattern is resumed the 
indices created in the remote cluster in the meantime will be 
picked up again and added as new following indices. Indices 
created and then deleted in the remote cluster will be ignored 
as they won't be seen at all by the auto-follower pattern at 
resume time.
  • Loading branch information
tlrx authored and howardhuanghua committed Oct 14, 2019
1 parent c8d4f14 commit 8e0616a
Show file tree
Hide file tree
Showing 25 changed files with 1,165 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
String remoteCluster = randomAlphaOfLength(4);
List<String> leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4));
String followIndexNamePattern = randomAlphaOfLength(4);
boolean active = randomBoolean();

Integer maxOutstandingReadRequests = null;
if (randomBoolean()) {
Expand Down Expand Up @@ -91,7 +92,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
readPollTimeout = new TimeValue(randomNonNegativeLong());
}
patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters,
followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize,
maxRetryDelay, readPollTimeout));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The API returns the following result:
{
"name": "my_auto_follow_pattern",
"pattern": {
"active": true,
"remote_cluster" : "remote_cluster",
"leader_index_patterns" :
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,89 @@
catch: missing
ccr.get_auto_follow_pattern:
name: my_pattern

---
"Test pause and resume auto follow pattern":
- skip:
version: " - 7.9.99"
reason: "pause/resume auto-follow patterns is supported since 8.0"

- do:
cluster.state: {}

- set: {master_node: master}

- do:
nodes.info: {}

- set: {nodes.$master.transport_address: local_ip}

- do:
cluster.put_settings:
body:
transient:
cluster.remote.local.seeds: $local_ip
flat_settings: true

- match: {transient: {cluster.remote.local.seeds: $local_ip}}

- do:
ccr.put_auto_follow_pattern:
name: pattern_test
body:
remote_cluster: local
leader_index_patterns: ['logs-*']
max_outstanding_read_requests: 2
- is_true: acknowledged

- do:
ccr.get_auto_follow_pattern:
name: pattern_test
- match: { patterns.0.name: 'pattern_test' }
- match: { patterns.0.pattern.remote_cluster: 'local' }
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
- match: { patterns.0.pattern.active: true }

- do:
catch: missing
ccr.pause_auto_follow_pattern:
name: unknown_pattern

- do:
ccr.pause_auto_follow_pattern:
name: pattern_test
- is_true: acknowledged

- do:
ccr.get_auto_follow_pattern:
name: pattern_test
- match: { patterns.0.name: 'pattern_test' }
- match: { patterns.0.pattern.remote_cluster: 'local' }
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
- match: { patterns.0.pattern.active: false }

- do:
catch: missing
ccr.resume_auto_follow_pattern:
name: unknown_pattern

- do:
ccr.resume_auto_follow_pattern:
name: pattern_test
- is_true: acknowledged

- do:
ccr.get_auto_follow_pattern:
name: pattern_test
- match: { patterns.0.name: 'pattern_test' }
- match: { patterns.0.pattern.remote_cluster: 'local' }
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
- match: { patterns.0.pattern.active: true }

- do:
ccr.delete_auto_follow_pattern:
name: pattern_test
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
Expand All @@ -81,9 +82,11 @@
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand All @@ -97,6 +100,7 @@
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
Expand Down Expand Up @@ -236,6 +240,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class),
// forget follower action
new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class),
usageAction,
Expand Down Expand Up @@ -264,6 +269,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestDeleteAutoFollowPatternAction(restController),
new RestPutAutoFollowPatternAction(restController),
new RestGetAutoFollowPatternAction(restController),
new RestPauseAutoFollowPatternAction(restController),
new RestResumeAutoFollowPatternAction(restController),
// forget follower API
new RestForgetFollowerAction(restController));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ synchronized void updateStats(List<AutoFollowResult> results) {
}

void updateAutoFollowers(ClusterState followerClusterState) {
AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
return;
}
Expand All @@ -206,8 +206,9 @@ void updateAutoFollowers(ClusterState followerClusterState) {
}

final CopyOnWriteHashMap<String, AutoFollower> autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers);
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream()
.map(entry -> entry.getValue().getRemoteCluster())
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().values().stream()
.filter(AutoFollowPattern::isActive)
.map(AutoFollowPattern::getRemoteCluster)
.filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false)
.collect(Collectors.toSet());

Expand Down Expand Up @@ -283,6 +284,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
String remoteCluster = entry.getKey();
AutoFollower autoFollower = entry.getValue();
boolean exist = autoFollowMetadata.getPatterns().values().stream()
.filter(AutoFollowPattern::isActive)
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
if (exist == false) {
LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster);
Expand Down Expand Up @@ -345,6 +347,7 @@ abstract static class AutoFollower {
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
private volatile boolean stop;
private volatile List<String> lastActivePatterns = List.of();

AutoFollower(final String remoteCluster,
final Consumer<List<AutoFollowResult>> statsUpdater,
Expand Down Expand Up @@ -384,7 +387,9 @@ void start() {

final List<String> patterns = autoFollowMetadata.getPatterns().entrySet().stream()
.filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster))
.filter(entry -> entry.getValue().isActive())
.map(Map.Entry::getKey)
.sorted()
.collect(Collectors.toList());
if (patterns.isEmpty()) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster);
Expand All @@ -394,8 +399,15 @@ void start() {
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size());

// keep the list of the last known active patterns for this auto-follower
// if the list changed, we explicitly retrieve the last cluster state in
// order to avoid timeouts when waiting for the next remote cluster state
// version that might never arrive
final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion;
this.lastActivePatterns = List.copyOf(patterns);

final Thread thread = Thread.currentThread();
getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> {
// Also check removed flag here, as it may take a while for this remote cluster state api call to return:
if (removed) {
LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster);
Expand Down Expand Up @@ -445,8 +457,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);

final List<Index> leaderIndicesToFollow =
getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName), thread);
} else {
Expand Down Expand Up @@ -599,7 +610,7 @@ static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) {
continue;
}
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
if (indexRoutingTable != null &&
// Leader indices can be in the cluster state, but not all primary shards may be ready yet.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

@Inject
public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver resolver) {
super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse read(final StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

@Override
protected ClusterBlockException checkBlock(final Request request, final ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(final Task task, final Request request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(),
new AckedClusterStateUpdateTask<>(request, listener) {

@Override
protected AcknowledgedResponse newResponse(final boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return innerActivate(request, currentState);
}
});
}

static ClusterState innerActivate(final Request request, ClusterState currentState) {
final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
}

final Map<String, AutoFollowMetadata.AutoFollowPattern> patterns = autoFollowMetadata.getPatterns();
final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName());
if (previousAutoFollowPattern == null) {
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
}

if (previousAutoFollowPattern.isActive() == request.isActive()) {
return currentState;
}

final Map<String, AutoFollowMetadata.AutoFollowPattern> newPatterns = new HashMap<>(patterns);
newPatterns.put(request.getName(),
new AutoFollowMetadata.AutoFollowPattern(
previousAutoFollowPattern.getRemoteCluster(),
previousAutoFollowPattern.getLeaderIndexPatterns(),
previousAutoFollowPattern.getFollowIndexPattern(),
request.isActive(),
previousAutoFollowPattern.getMaxReadRequestOperationCount(),
previousAutoFollowPattern.getMaxWriteRequestOperationCount(),
previousAutoFollowPattern.getMaxOutstandingReadRequests(),
previousAutoFollowPattern.getMaxOutstandingWriteRequests(),
previousAutoFollowPattern.getMaxReadRequestSize(),
previousAutoFollowPattern.getMaxWriteRequestSize(),
previousAutoFollowPattern.getMaxWriteBufferCount(),
previousAutoFollowPattern.getMaxWriteBufferSize(),
previousAutoFollowPattern.getMaxRetryDelay(),
previousAutoFollowPattern.getReadPollTimeout()));

return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders()))
.build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
true,
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxOutstandingReadRequests(),
Expand Down
Loading

0 comments on commit 8e0616a

Please sign in to comment.