Skip to content

Commit

Permalink
Remove Redundant Listener Interface in ILM (#68960) (#71981)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Apr 20, 2021
1 parent e493fb8 commit 8523ef5
Show file tree
Hide file tree
Showing 51 changed files with 218 additions and 991 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand All @@ -23,7 +24,7 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {

@Override
public final void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
ClusterStateObserver observer, ActionListener<Boolean> listener) {
String followerIndex = indexMetadata.getIndex().getName();
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
Expand All @@ -34,5 +35,5 @@ public final void performAction(IndexMetadata indexMetadata, ClusterState curren
innerPerformAction(followerIndex, currentClusterState, listener);
}

abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, Listener listener);
abstract void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Boolean> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.core.ilm;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -44,10 +45,10 @@ public boolean isRetryable() {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState, ClusterStateObserver observer,
Listener listener) {
stepToExecute.performAction(indexMetadata, currentClusterState, observer, new Listener() {
ActionListener<Boolean> listener) {
stepToExecute.performAction(indexMetadata, currentClusterState, observer, new ActionListener<Boolean>() {
@Override
public void onResponse(boolean complete) {
public void onResponse(Boolean complete) {
onResponseResult.set(complete);
listener.onResponse(complete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -40,13 +41,5 @@ public boolean indexSurvives() {
}

public abstract void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener);

public interface Listener {

void onResponse(boolean complete);

void onFailure(Exception e);
}

ClusterStateObserver observer, ActionListener<Boolean> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand All @@ -35,7 +36,7 @@ public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Clie

@Override
public final void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
ClusterStateObserver observer, ActionListener<Boolean> listener) {
// Wrap the original listener to handle exceptions caused by ongoing snapshots
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(indexMetadata.getIndex(), listener, observer,
currentClusterState.nodes().getLocalNode());
Expand All @@ -45,22 +46,22 @@ public final void performAction(IndexMetadata indexMetadata, ClusterState curren
/**
* Method to be performed during which no snapshots for the index are already underway.
*/
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener);
abstract void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener);

/**
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
* action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a
* ClusterStateObserver listener waiting for the next time the snapshot is not running,
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, Listener)}
* re-running the step's {@link #performAction(IndexMetadata, ClusterState, ClusterStateObserver, ActionListener)}
* method when the snapshot is no longer running.
*/
class SnapshotExceptionListener implements AsyncActionStep.Listener {
class SnapshotExceptionListener implements ActionListener<Boolean> {
private final Index index;
private final Listener originalListener;
private final ActionListener<Boolean> originalListener;
private final ClusterStateObserver observer;
private final DiscoveryNode localNode;

SnapshotExceptionListener(Index index, Listener originalListener, ClusterStateObserver observer,
SnapshotExceptionListener(Index index, ActionListener<Boolean> originalListener, ClusterStateObserver observer,
DiscoveryNode localNode) {
this.index = index;
this.originalListener = originalListener;
Expand All @@ -69,7 +70,7 @@ class SnapshotExceptionListener implements AsyncActionStep.Listener {
}

@Override
public void onResponse(boolean complete) {
public void onResponse(Boolean complete) {
originalListener.onResponse(complete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
final String shrunkenIndexSource = IndexMetadata.INDEX_RESIZE_SOURCE_NAME.get(indexMetadata.getSettings());
if (Strings.isNullOrEmpty(shrunkenIndexSource) == false) {
// the current managed index is a shrunk index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
final String indexName = indexMetadata.getIndex().getName();

LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
String followerIndex = indexMetadata.getIndex().getName();
Map<String, String> customIndexMetadata = indexMetadata.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class CloseIndexStep extends AsyncActionStep {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
ClusterStateObserver observer, ActionListener<Boolean> listener) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN) {
CloseIndexRequest request = new CloseIndexRequest(indexMetadata.getIndex().getName());
getClient().admin().indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public boolean isRetryable() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
final String indexName = indexMetadata.getIndex().getName();

final LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
}

@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, Listener listener) {
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, ActionListener<Boolean> listener) {
String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
String indexName = indexMetadata.getIndex().getName();
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public int getMaxNumSegments() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
ForceMergeRequest request = new ForceMergeRequest(indexMetadata.getIndex().getName());
request.maxNumSegments(maxNumSegments);
getClient().admin().indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public FreezeStep(StepKey key, StepKey nextStepKey, Client client) {
}

@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, Listener listener) {
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, ActionListener<Boolean> listener) {
getClient().admin().indices().execute(FreezeIndexAction.INSTANCE,
new FreezeRequest(indexMetadata.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public MountSearchableSnapshotRequest.Storage getStorage() {
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Boolean> listener) {
String indexName = indexMetadata.getIndex().getName();

LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class OpenIndexStep extends AsyncActionStep {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
ClusterStateObserver observer, ActionListener<Boolean> listener) {
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
OpenIndexRequest request = new OpenIndexRequest(indexMetadata.getIndex().getName());
getClient().admin().indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public boolean isRetryable() {
}

@Override
void innerPerformAction(String followerIndex, ClusterState currentClusterState, Listener listener) {
void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Boolean> listener) {
PersistentTasksCustomMetadata persistentTasksMetadata = currentClusterState.metadata().custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasksMetadata == null) {
listener.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public ReadOnlyStep(StepKey key, StepKey nextStepKey, Client client) {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
getClient().admin().indices().execute(AddIndexBlockAction.INSTANCE,
new AddIndexBlockRequest(WRITE, indexMetadata.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean isRetryable() {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
ClusterStateObserver observer, ActionListener<Boolean> listener) {
String indexName = indexMetadata.getIndex().getName();
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
if (indexingComplete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public boolean isRetryable() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
final String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
final String indexName = indexMetadata.getIndex().getName();
final LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public boolean isRetryable() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState clusterState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
// These allocation deciders were chosen because these are the conditions that can prevent
// allocation long-term, and that we can inspect in advance. Most other allocation deciders
// will either only delay relocation (e.g. ThrottlingAllocationDecider), or don't work very
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -31,7 +32,7 @@ public boolean isRetryable() {
}

@Override
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, Listener listener) {
public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentState, ActionListener<Boolean> listener) {
// get source index
String indexName = indexMetadata.getIndex().getName();
// get target shrink index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public ByteSizeValue getMaxPrimaryShardSize() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
if (lifecycleState.getLifecycleDate() == null) {
throw new IllegalStateException("source index [" + indexMetadata.getIndex().getName() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public String getTargetIndexPrefix() {

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState, ClusterStateObserver observer,
Listener listener) {
ActionListener<Boolean> listener) {
String originalIndex = indexMetadata.getIndex().getName();
final String targetIndexName = targetIndexPrefix + originalIndex;
IndexMetadata targetIndexMetadata = currentClusterState.metadata().index(targetIndexName);
Expand All @@ -70,7 +70,7 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
* The is_write_index will *not* be set on the target index as this operation is currently executed on read-only indices.
*/
static void deleteSourceIndexAndTransferAliases(Client client, IndexMetadata sourceIndex, TimeValue masterTimeoutValue,
String targetIndex, Listener listener) {
String targetIndex, ActionListener<Boolean> listener) {
String sourceIndexName = sourceIndex.getIndex().getName();
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
.masterNodeTimeout(masterTimeoutValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public boolean isRetryable() {
}

@Override
void innerPerformAction(String followerIndex, ClusterState currentClusterState, Listener listener) {
void innerPerformAction(String followerIndex, ClusterState currentClusterState, ActionListener<Boolean> listener) {
UnfollowAction.Request request = new UnfollowAction.Request(followerIndex);
getClient().execute(UnfollowAction.INSTANCE, request, ActionListener.wrap(
r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public String getRollupPolicy() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
final String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
final String indexName = indexMetadata.getIndex().getName();
final LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public boolean isRetryable() {
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
public void performAction(IndexMetadata indexMetadata, ClusterState currentState,
ClusterStateObserver observer, ActionListener<Boolean> listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetadata.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState))
.settings(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
}
}
};
createRandomInstance().performAction(indexMetadata, currentClusterState, null, new AsyncActionStep.Listener() {
createRandomInstance().performAction(indexMetadata, currentClusterState, null, new ActionListener<Boolean>() {
@Override
public void onResponse(boolean complete) {
public void onResponse(Boolean complete) {

}

Expand Down

0 comments on commit 8523ef5

Please sign in to comment.