Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global checkpoint tracking on the primary #26666

Merged
merged 25 commits into from Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -93,7 +93,8 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -173,6 +174,7 @@ public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
Expand Down Expand Up @@ -315,6 +317,14 @@ public interface Primary<
*/
void updateLocalCheckpointForShard(String allocationId, long checkpoint);

/**
* Update the local knowledge of the global checkpoint for the specified allocation ID.
*
* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);

/**
* Returns the local checkpoint on the primary shard.
*
Expand Down Expand Up @@ -385,12 +395,24 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable
}

/**
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

*/
public interface ReplicaResponse {

/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
/**
* The local checkpoint for the shard. See {@link SequenceNumbersService#getLocalCheckpoint()}.
*
* @return the local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard. See {@link SequenceNumbersService#getGlobalCheckpoint()}.
*
* @return the global checkpoint
**/
long globalCheckpoint();

}

public static class RetryOnPrimaryException extends ElasticsearchException {
Expand Down
Expand Up @@ -531,7 +531,8 @@ public void onResponse(Releasable releasable) {
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
Expand Down Expand Up @@ -1006,6 +1007,11 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint)
indexShard.updateLocalCheckpointForShard(allocationId, checkpoint);
}

@Override
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
indexShard.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}

@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
Expand All @@ -1025,47 +1031,60 @@ public ReplicationGroup getReplicationGroup() {

public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private long globalCheckpoint;

ReplicaResponse() {

}

public ReplicaResponse(long localCheckpoint) {
public ReplicaResponse(long localCheckpoint, long globalCheckpoint) {
/*
* A replica should always know its own local checkpoint so this should always be a valid sequence number or the pre-6.0 local
* A replica should always know its own local checkpoints so this should always be valida sequence number or the pre-6.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valida?

* checkpoint value when simulating responses to replication actions that pre-6.0 nodes are not aware of (e.g., the global
* checkpoint background sync, and the primary/replica resync).
*/
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT;
localCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
out.writeZLong(globalCheckpoint);
}
}

@Override
public long localCheckpoint() {
return localCheckpoint;
}

@Override
public long globalCheckpoint() {
return globalCheckpoint;
}

}

/**
Expand Down
Expand Up @@ -89,7 +89,8 @@ protected void sendReplicaRequest(
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

Expand Down