Skip to content

Commit

Permalink
Add global checkpoint tracking on the primary
Browse files Browse the repository at this point in the history
This commit adds local tracking of the global checkpoints on all shard
copies when a global checkpoint tracker is operating in primary
mode. With this, we relay the global checkpoint on a shard copy back to
the primary shard during replication operations. This serves as another
step towards adding a background sync of the global checkpoint to the
shard copies.

Relates #26666
  • Loading branch information
jasontedor committed Sep 18, 2017
1 parent 973e756 commit d060ac0
Show file tree
Hide file tree
Showing 17 changed files with 544 additions and 210 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -175,7 +175,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
Expand Down
Expand Up @@ -93,7 +93,8 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -173,6 +174,7 @@ public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
Expand Down Expand Up @@ -315,6 +317,14 @@ public interface Primary<
*/
void updateLocalCheckpointForShard(String allocationId, long checkpoint);

/**
* Update the local knowledge of the global checkpoint for the specified allocation ID.
*
* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);

/**
* Returns the local checkpoint on the primary shard.
*
Expand Down Expand Up @@ -385,12 +395,24 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable
}

/**
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them.
*/
public interface ReplicaResponse {

/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
/**
* The local checkpoint for the shard. See {@link SequenceNumbersService#getLocalCheckpoint()}.
*
* @return the local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard. See {@link SequenceNumbersService#getGlobalCheckpoint()}.
*
* @return the global checkpoint
**/
long globalCheckpoint();

}

public static class RetryOnPrimaryException extends ElasticsearchException {
Expand Down
Expand Up @@ -531,7 +531,8 @@ public void onResponse(Releasable releasable) {
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
Expand Down Expand Up @@ -1006,6 +1007,11 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint)
indexShard.updateLocalCheckpointForShard(allocationId, checkpoint);
}

@Override
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
indexShard.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}

@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
Expand All @@ -1025,47 +1031,60 @@ public ReplicationGroup getReplicationGroup() {

public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private long globalCheckpoint;

ReplicaResponse() {

}

public ReplicaResponse(long localCheckpoint) {
public ReplicaResponse(long localCheckpoint, long globalCheckpoint) {
/*
* A replica should always know its own local checkpoint so this should always be a valid sequence number or the pre-6.0 local
* A replica should always know its own local checkpoints so this should always be a valid sequence number or the pre-6.0
* checkpoint value when simulating responses to replication actions that pre-6.0 nodes are not aware of (e.g., the global
* checkpoint background sync, and the primary/replica resync).
*/
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT;
localCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
out.writeZLong(globalCheckpoint);
}
}

@Override
public long localCheckpoint() {
return localCheckpoint;
}

@Override
public long globalCheckpoint() {
return globalCheckpoint;
}

}

/**
Expand Down
Expand Up @@ -89,7 +89,8 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

Expand Down

0 comments on commit d060ac0

Please sign in to comment.