Skip to content

Commit

Permalink
Cleanup small spots in CCR (#91267)
Browse files Browse the repository at this point in the history
Removing some dead code and other obvious small things in CCR.
Also, cleaning up cluster state request instances with needlessly large
scope to make the code more readable.
  • Loading branch information
original-brownbear committed Nov 7, 2022
1 parent 2c1f9ab commit d03664f
Show file tree
Hide file tree
Showing 22 changed files with 45 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ public void testFollowStandardIndexCanNotOverrideMode() throws Exception {
public void testSyntheticSource() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "synthetic_leader";
long basetime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-04-28T18:35:24.467Z");
if ("leader".equals(targetCluster)) {
logger.info("Running against leader cluster");
createIndex(adminClient(), leaderIndexName, Settings.EMPTY, """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import org.junit.After;
import org.junit.Before;

import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;

public class CcrRestIT extends ESClientYamlSuiteTestCase {

public CcrRestIT(final ClientYamlTestCandidate testCandidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,6 @@ protected String backingIndexName(String dataStreamName, int generation) {
return DataStream.getDefaultBackingIndexName(dataStreamName, generation, time.getOrCompute());
}

protected String backingIndexName(String dataStreamName, int generation, long epochMillis) {
return DataStream.getDefaultBackingIndexName(dataStreamName, generation, epochMillis);
}

protected RestClient buildLeaderClient() throws IOException {
assert "leader".equals(targetCluster) == false;
return buildClient(System.getProperty("tests.leader_host"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -97,7 +96,6 @@
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
Expand Down Expand Up @@ -408,10 +406,6 @@ public void onIndexModule(IndexModule indexModule) {
}
}

protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}

@Override
public Collection<RequestValidators.RequestValidator<PutMappingRequest>> mappingRequestValidators() {
return Collections.singletonList(CcrRequests.CCR_PUT_MAPPING_REQUEST_VALIDATOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand Down Expand Up @@ -118,16 +119,11 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final Consumer<Exception> onFailure,
final BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer
) {

final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metadata(true);
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
client.getRemoteClusterClient(clusterAlias),
request,
CcrRequests.metadataRequest(leaderIndex),
onFailure,
remoteClusterStateResponse -> {
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;

import java.io.IOException;
import java.util.Set;

class CcrRepositoryManager extends AbstractLifecycleComponent {
Expand All @@ -45,7 +44,7 @@ protected void doStart() {
protected void doStop() {}

@Override
protected void doClose() throws IOException {}
protected void doClose() {}

private void putRepository(String repositoryName) {
ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,15 @@ void getRemoteClusterState(
final long metadataVersion,
final BiConsumer<ClusterStateResponse, Exception> handler
) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metadata(true);
request.routingTable(true);
request.waitForMetadataVersion(metadataVersion);
request.waitForTimeout(waitForMetadataTimeOut);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
CcrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
remoteCluster,
request,
new ClusterStateRequest().clear()
.metadata(true)
.routingTable(true)
.waitForMetadataVersion(metadataVersion)
.waitForTimeout(waitForMetadataTimeOut),
e -> handler.accept(null, e),
remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null)
);
Expand Down Expand Up @@ -613,7 +611,7 @@ private void checkAutoFollowPattern(
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexAbstraction))
.map(Tuple::v1)
.collect(Collectors.toList());
.toList();
if (otherMatchingPatterns.size() != 0) {
groupedListener.onResponse(
new Tuple<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static void getIndexMetadata(
final List<Index> followingIndices = Arrays.stream(indices).filter(index -> {
final IndexMetadata indexMetadata = state.metadata().index(index);
return indexMetadata != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetadata.getSettings());
}).collect(Collectors.toList());
}).toList();
if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) {
final String errorMessage = "can't put mapping to the following indices "
+ "["
Expand All @@ -132,7 +132,7 @@ public static void getIndexMetadata(
final List<Index> followingIndices = Arrays.stream(indices).filter(index -> {
final IndexMetadata indexMetadata = state.metadata().index(index);
return indexMetadata != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetadata.getSettings());
}).collect(Collectors.toList());
}).toList();
if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) {
final String errorMessage = "can't modify aliases on indices "
+ "["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,56 +214,54 @@ public String toString() {

public static final class Response extends ActionResponse {

private long mappingVersion;
private final long mappingVersion;

public long getMappingVersion() {
return mappingVersion;
}

private long settingsVersion;
private final long settingsVersion;

public long getSettingsVersion() {
return settingsVersion;
}

private long aliasesVersion;
private final long aliasesVersion;

public long getAliasesVersion() {
return aliasesVersion;
}

private long globalCheckpoint;
private final long globalCheckpoint;

public long getGlobalCheckpoint() {
return globalCheckpoint;
}

private long maxSeqNo;
private final long maxSeqNo;

public long getMaxSeqNo() {
return maxSeqNo;
}

private long maxSeqNoOfUpdatesOrDeletes;
private final long maxSeqNoOfUpdatesOrDeletes;

public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes;
}

private Translog.Operation[] operations;
private final Translog.Operation[] operations;

public Translog.Operation[] getOperations() {
return operations;
}

private long tookInMillis;
private final long tookInMillis;

public long getTookInMillis() {
return tookInMillis;
}

Response() {}

Response(StreamInput in) throws IOException {
super(in);
mappingVersion = in.readVLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -201,8 +200,6 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metadataRequest(leaderIndex.getName());

CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final IndexMetadata leaderIMD = clusterStateResponse.getState().metadata().getIndexSafe(leaderIndex);
final IndexMetadata followerIMD = clusterService.state().metadata().getIndexSafe(followIndex);
Expand Down Expand Up @@ -247,7 +244,9 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
}
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down Expand Up @@ -281,8 +280,6 @@ protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exc
final var leaderIndex = params.getLeaderShardId().getIndex();
final var followerIndex = params.getFollowShardId().getIndex();

final var clusterStateRequest = CcrRequests.metadataRequest(leaderIndex.getName());

final CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final var leaderIndexMetadata = clusterStateResponse.getState().metadata().getIndexSafe(leaderIndex);
final var followerIndexMetadata = clusterService.state().metadata().getIndexSafe(followerIndex);
Expand Down Expand Up @@ -374,7 +371,9 @@ protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exc
};

try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
} catch (final NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;

import java.util.List;
import java.util.stream.Collectors;

public class TransportPauseFollowAction extends AcknowledgedTransportMasterNodeAction<PauseFollowAction.Request> {

Expand Down Expand Up @@ -87,7 +86,7 @@ protected void masterOperation(
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetadata.PersistentTask::getId)
.collect(Collectors.toList());
.toList();

if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,10 @@ public ClusterState execute(ClusterState currentState) {
});
};

final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metadata(true);

CcrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
request.getRemoteCluster(),
clusterStateRequest,
new ClusterStateRequest().clear().metadata(true),
listener::onFailure,
consumer
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,11 @@ public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperations
appliedOperations.add(targetOp);
location = locationToSync(location, result.getTranslogLocation());
} else {
if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) {
if (result.getFailure()instanceof final AlreadyProcessedFollowingEngineException failure) {
// The existing operations below the global checkpoint won't be replicated as they were processed
// in every replicas already. However, the existing operations above the global checkpoint will be
// replicated to replicas but with the existing primary term (not the current primary term) in order
// to guarantee the consistency between the primary and replicas, and between translog and Lucene index.
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
if (logger.isTraceEnabled()) {
logger.trace(
"operation [{}] was processed before on following primary shard {} with existing term {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest {

private DiscoveryNode node;
private String sessionUUID;
private final String sessionUUID;

ClearCcrRestoreSessionRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) {

public static class PutCcrRestoreSessionResponse extends ActionResponse {

private DiscoveryNode node;
private Store.MetadataSnapshot storeFileMetadata;
private long mappingVersion;
private final DiscoveryNode node;
private final Store.MetadataSnapshot storeFileMetadata;
private final long mappingVersion;

PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetadata, long mappingVersion) {
this.node = node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) {
}

@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
public int fillSeqNoGaps(long primaryTerm) {
// a noop implementation, because follow shard does not own the history but the leader shard does.
return 0;
}

@Override
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
assert FollowingEngineAssertions.assertPrimaryIncomingSequenceNumber(origin, seqNo);
assert FollowingEngineAssertions.assertPrimaryIncomingSequenceNumber(seqNo);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static boolean preFlight(final Engine.Operation operation) {
return true;
}

static boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
static boolean assertPrimaryIncomingSequenceNumber(final long seqNo) {
// sequence number should be set when operation origin is primary
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "primary operations on a following index must have an assigned sequence number";
return true;
Expand Down

0 comments on commit d03664f

Please sign in to comment.