Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fully encapsulate LocalCheckpointTracker inside of the engine #31213

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -635,11 +635,28 @@ public CommitStats commitStats() {
}

/**
* The sequence number service for this engine.
* @return the local checkpoint for this Engine
*/
public abstract long getLocalCheckpoint();

/**
* Waits for all operations up to the provided sequence number to complete.
*
* @return the sequence number service
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;

/**
* Reset the local checkpoint in the tracker to the given local checkpoint
* @param localCheckpoint the new checkpoint to be set
*/
public abstract void resetLocalCheckpoint(long localCheckpoint);

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
public abstract SeqNoStats getSeqNoStats(long globalCheckpoint);

/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -2185,10 +2186,31 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

public final LocalCheckpointTracker getLocalCheckpointTracker() {
// Used only for testing! Package private to prevent anyone else from using it
LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

@Override
public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
}

@Override
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

@Override
public void resetLocalCheckpoint(long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
}

/**
* Returns the number of times a version was looked up either from the index.
* Note this is only available if assertions are enabled
Expand Down
22 changes: 10 additions & 12 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void updateShardState(final ShardRouting newRouting,
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;

if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
}

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
Expand Down Expand Up @@ -479,8 +479,7 @@ public void updateShardState(final ShardRouting newRouting,
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
Expand All @@ -506,7 +505,7 @@ public void onFailure(Exception e) {
}
},
e -> failShard("exception during primary term transition", e));
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
primaryTerm = newPrimaryTerm;
}
}
Expand Down Expand Up @@ -873,7 +872,7 @@ public CommitStats commitStats() {
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}

public IndexingStats indexingStats(String... types) {
Expand Down Expand Up @@ -1707,7 +1706,7 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
getEngine().getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
getEngine().waitForOpsToComplete(seqNo);
}

/**
Expand Down Expand Up @@ -1740,7 +1739,7 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
* @return the local checkpoint
*/
public long getLocalCheckpoint() {
return getEngine().getLocalCheckpointTracker().getCheckpoint();
return getEngine().getLocalCheckpoint();
}

/**
Expand Down Expand Up @@ -1781,7 +1780,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
return;
}
// only sync if there are not operations in flight
final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final String allocationId = routingEntry().allocationId().getId();
Expand Down Expand Up @@ -1818,7 +1817,7 @@ public ReplicationGroup getReplicationGroup() {
*/
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
verifyReplicationTarget();
final long localCheckpoint = getEngine().getLocalCheckpointTracker().getCheckpoint();
final long localCheckpoint = getLocalCheckpoint();
if (globalCheckpoint > localCheckpoint) {
/*
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
Expand Down Expand Up @@ -1847,8 +1846,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getEngine().getLocalCheckpointTracker().getCheckpoint() ==
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}
Expand Down Expand Up @@ -2234,7 +2232,7 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().getLocalCheckpointTracker().resetCheckpoint(localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
globalCheckpointUpdated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Index getIndex() {
}

long maxSeqNo() {
return shard.getEngine().getLocalCheckpointTracker().getMaxSeqNo();
return shard.getEngine().getSeqNoStats(-1).getMaxSeqNo();
}

long maxUnsafeAutoIdTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -350,7 +351,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
}
final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(oldPrimaryShard)); // Make gap in seqno.
long moreDocs = scaledRandomIntBetween(1, 10);
for (int i = 0; i < moreDocs; i++) {
IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));
Expand Down
Loading