
Refactor TransportShardReplicationOperationAction #10749

Closed
wants to merge 12 commits into from

5 participants
@bleskes
Member

commented Apr 23, 2015

Refactors the state management of TransportShardReplicationOperationAction into two clearly separated phases: a primary phase and a replication phase. The primary phase is responsible for routing the request to the node holding the primary, validating it, and performing the operation on the primary. The replication phase is responsible for sending the request to the replicas and managing their responses.
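
As a rough illustration of the split described above, here is a minimal, synchronous sketch. It is not the code in this PR: the class names `PrimaryPhase` and `ReplicationPhase` and the methods `start()` and `onReplicaSuccess()` echo names that appear in the diff, but the fields, the string-based request and response types, and the `log` list are assumptions made only for the example.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the two phases; not the Elasticsearch classes. */
final class TwoPhaseSketch {

    /** Primary phase: validate the request, run it on the primary, then hand over. */
    static final class PrimaryPhase {
        private final String request;
        private final List<String> replicas;
        private final Consumer<String> listener;
        private final List<String> log;

        PrimaryPhase(String request, List<String> replicas, Consumer<String> listener, List<String> log) {
            this.request = request;
            this.replicas = replicas;
            this.listener = listener;
            this.log = log;
        }

        void start() {
            if (request.isEmpty()) {
                // validation failed: answer the caller without touching any shard
                listener.accept("failed: invalid request");
                return;
            }
            log.add("primary:" + request);
            new ReplicationPhase(request, replicas, listener, log).start();
        }
    }

    /** Replication phase: the constructor only computes state; start() sends the requests. */
    static final class ReplicationPhase {
        private final String request;
        private final List<String> replicas;
        private final Consumer<String> listener;
        private final List<String> log;
        private int pending;

        ReplicationPhase(String request, List<String> replicas, Consumer<String> listener, List<String> log) {
            this.request = request;
            this.replicas = replicas;
            this.listener = listener;
            this.log = log;
            this.pending = replicas.size();
        }

        void start() {
            if (pending == 0) {
                listener.accept("done: " + request);
                return;
            }
            for (String replica : replicas) {
                log.add("replica[" + replica + "]:" + request);
                onReplicaSuccess(); // in this sketch every replica answers immediately
            }
        }

        private void onReplicaSuccess() {
            if (--pending == 0) {
                listener.accept("done: " + request);
            }
        }
    }
}
```

The point of the shape is that the caller's listener is answered from exactly one place per phase, which is what makes start and end hooks easier to attach.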

This refactoring aims to simplify adding the operation start and end hooks needed for the counter mentioned in #10032.

This also adds unit test infrastructure for this class, and some basic tests. We can extend later as we continue developing.

For now, this is planned to go to 2.0.0, but we may backport it to 1.6.0, depending on how work goes with #10032.

bleskes added some commits Apr 21, 2015

@bleskes

Member Author

commented Apr 23, 2015

@martijnvg @brwe can you take a look?

// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(exp);

@brwe

brwe Apr 23, 2015

Contributor

what if retry() throws an exception?
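
The thread does not record how this question was resolved. One possible way to guard the call, sketched under assumptions: `retry` and `finishAsFailed` are modelled here as plain callbacks (hypothetical stand-ins, not the methods of the real class), and a failure while scheduling the retry is reported to the caller instead of being lost.

```java
import java.util.function.Consumer;

/** Hypothetical sketch; the names only mirror the snippet under review. */
final class GuardedRetrySketch {
    private final Runnable retry;                    // stand-in for retry(exp)
    private final Consumer<Throwable> finishAsFailed; // stand-in for finishAsFailed(failure)

    GuardedRetrySketch(Runnable retry, Consumer<Throwable> finishAsFailed) {
        this.retry = retry;
        this.finishAsFailed = finishAsFailed;
    }

    void handleException(Throwable exp) {
        try {
            retry.run();
        } catch (RuntimeException retryFailure) {
            // keep the original transport error visible on the reported failure
            if (retryFailure != exp) {
                retryFailure.addSuppressed(exp);
            }
            finishAsFailed.accept(retryFailure);
        }
    }
}
```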

} else {
if (internalRequest.request().operationThreaded()) {

@brwe

brwe Apr 23, 2015

Contributor

Can we remove this whole block and, while we are at it, add a check beforehand that the node id is not the local one? Or should that go in a different PR?

@bleskes

bleskes Apr 23, 2015

Author Member

I'd rather do it in another change.

@@ -519,101 +661,119 @@ void performReplicas(PrimaryOperationRequest por, Tuple<Response, ReplicaRequest
ClusterState newState = clusterService.state();
ShardRouting newPrimaryShard = null;

@brwe

brwe Apr 23, 2015

Contributor

not used?

@@ -461,7 +470,7 @@ void retry(Throwable failure) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
-doStart();
+start();
}

@Override

@brwe

brwe Apr 23, 2015

Contributor

I think we should also call finishAsFailed() in onClusterServiceClose()?
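
To make the suggestion concrete, here is a small sketch in which every terminal callback of the retry listener ends the operation. The `StateListener` interface is a hypothetical, simplified stand-in for the three callbacks of a cluster state listener (new state, service close, timeout). The `events` list and the string-typed state are assumptions for the example only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: fail the operation on every path that cannot retry. */
final class RetryListenerSketch {

    /** Simplified stand-in for a cluster state listener. */
    interface StateListener {
        void onNewClusterState(String state);
        void onClusterServiceClose();
        void onTimeout(long timeoutMillis);
    }

    final List<String> events = new ArrayList<>();

    StateListener retryListener(Throwable originalFailure) {
        return new StateListener() {
            @Override
            public void onNewClusterState(String state) {
                events.add("retry on " + state); // the only path that tries again
            }

            @Override
            public void onClusterServiceClose() {
                finishAsFailed(new IllegalStateException("node is closing", originalFailure));
            }

            @Override
            public void onTimeout(long timeoutMillis) {
                finishAsFailed(originalFailure);
            }
        };
    }

    void finishAsFailed(Throwable failure) {
        events.add("failed: " + failure.getMessage());
    }
}
```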

assertListenerThrows("primary phase should fail operation when moving from a retryable block a non-retryable one", listener, ClusterBlockException.class);
}

ClusterState stateWithUnassingedShards(String index, int numberOfReplicas) {

@brwe

brwe Apr 23, 2015

Contributor

not used?



ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) {
ShardRoutingState primaryState;

@brwe

brwe Apr 23, 2015

Contributor

not used?

}


void finishAsFailed(Throwable failure) {

@brwe

brwe Apr 23, 2015

Contributor

Is this supposed to be the hook for the counter decrement? It seems to be called after remote execution failed (https://github.com/elastic/elasticsearch/pull/10749/files#diff-a1076062ccd18e89873d62fe1d78e1e1R453), sometimes when the operation could not be executed at all due to blocks (https://github.com/elastic/elasticsearch/pull/10749/files#diff-a1076062ccd18e89873d62fe1d78e1e1R363), and also if the local write on the primary failed (https://github.com/elastic/elasticsearch/pull/10749/files#diff-a1076062ccd18e89873d62fe1d78e1e1R408). I would still have to be as careful about where to increment and decrement the counter as before.

I was hoping that this refactoring would separate remote and local execution of the primary operation more strictly, because I think that is where the major difficulty in reasoning about what the code does stems from. I might not fully understand, though...

@bleskes

bleskes Apr 23, 2015

Author Member

My main concern was the lack of separation between the part that sends the remote replication requests and the rest of the code. That is what I separated here. In the future we will remove all local execution and use the shortcut in the transport service.
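
The concern in this thread is that several exit paths reach the same end hook, so a counter must be released exactly once however the operation finishes. A minimal sketch of that idea follows. `TrackedOperation`, `finishOnSuccess()` and the `inFlight` counter are hypothetical names for illustration (only `finishAsFailed` and an `AtomicBoolean finished` field appear in the diff), and this is not the counter design from #10032.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of start and end hooks around a shared in-flight counter. */
final class TrackedOperation {
    private final AtomicInteger inFlight;
    private final AtomicBoolean finished = new AtomicBoolean(false);

    TrackedOperation(AtomicInteger inFlight) {
        this.inFlight = inFlight; // the constructor takes no action, it only stores state
    }

    /** Start hook: count the operation as in flight. */
    void start() {
        inFlight.incrementAndGet();
    }

    /** End hook for failures (blocks, remote errors, local write errors). */
    boolean finishAsFailed(Throwable failure) {
        return finish();
    }

    /** End hook for the success path. */
    boolean finishOnSuccess() {
        return finish();
    }

    /** Releases the counter once; later calls are no-ops and return false. */
    private boolean finish() {
        if (finished.compareAndSet(false, true)) {
            inFlight.decrementAndGet();
            return true;
        }
        return false;
    }
}
```

With the guard in `finish()`, a late failure callback arriving after a success (or the reverse) cannot decrement the counter a second time.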

* the constructor doesn't take any action, just calculates state. Call {@link #start()} to start
* replicating.
*/
public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse,

@martijnvg

martijnvg Apr 23, 2015

Member

maybe make this package protected?

@bleskes

bleskes Apr 23, 2015

Author Member

++

// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
if (shard.primary()) {
if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {

@martijnvg

martijnvg Apr 23, 2015

Member

If this returns true, that would be odd? Maybe we should log this and add an assert?

@bleskes

bleskes Apr 23, 2015

Author Member

It means the shard relocated beneath our feet. It's very rare, and I don't think we do the right thing here per se, but that is what we were doing and I'd rather not change it for now...

private final Response finalResponse;
private final ShardIterator shardIt;
private final ActionListener<Response> listener;
private final AtomicBoolean finished = new AtomicBoolean(false);

@martijnvg

martijnvg Apr 23, 2015

Member

I find it confusing that we have the same field names for this in both ReplicationPhase and PrimaryPhase.

@martijnvg

martijnvg Apr 23, 2015

Member

actually I just got a bit confused because both classes are in the same file...

@@ -623,7 +783,7 @@ void performOnReplica(final ReplicationState state, final ShardRouting shard, fi
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
-state.onReplicaSuccess();
+onReplicaSuccess();

@martijnvg

martijnvg Apr 23, 2015

Member

just an idea: we could skip the creation of the ReplicationPhase entirely if shadow replicas are being used.

@bleskes

bleskes Apr 23, 2015

Author Member

++. It should be ignored in the counting phase.

@bleskes

bleskes Apr 23, 2015

Author Member

I changed the shadow replica handling. I think it's easier to still keep them here, for the case of a relocating primary, for example.

import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*;

public class ShardReplicationOperationTests extends ElasticsearchTestCase {

@martijnvg

martijnvg Apr 23, 2015

Member

big +1 for this unit test!

@s1monw

s1monw Apr 23, 2015

Contributor

yeah it's really good!


private static ThreadPool threadPool;

private TransportService transportService;

@martijnvg

martijnvg Apr 23, 2015

Member

this can become a local variable in setUp()

@bleskes

bleskes Apr 23, 2015

Author Member

changed

assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
}

@martijnvg

martijnvg Apr 23, 2015

Member

maybe we add a test that tests what happens if not enough shard copies are available (write consistency)?

@bleskes

bleskes Apr 23, 2015

Author Member

added
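
For readers unfamiliar with the write-consistency check this test covers, the rule can be sketched roughly as below. The assumption here is the rule used by Elasticsearch 1.x as I understand it: `one` needs a single active copy, `all` needs every copy, and `quorum` needs a majority but only applies when there are more than two copies. The class and method names are hypothetical and are not taken from the PR.

```java
/** Hypothetical sketch of a write-consistency check; not the code under review. */
final class WriteConsistencySketch {

    enum Level { ONE, QUORUM, ALL }

    /** Number of active shard copies required before the write may proceed. */
    static int requiredCopies(Level level, int totalCopies) {
        if (level == Level.ALL) {
            return totalCopies;
        }
        if (level == Level.QUORUM && totalCopies > 2) {
            return totalCopies / 2 + 1; // majority of primary plus replicas
        }
        return 1; // ONE, or QUORUM with too few copies for a majority to matter
    }

    static boolean canProceed(Level level, int totalCopies, int activeCopies) {
        return activeCopies >= requiredCopies(level, totalCopies);
    }
}
```

A test for the "not enough copies" case would then set up, for example, one primary with two unassigned replicas and expect the operation to wait or fail under `QUORUM`.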

}

public void start() {
this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
doStart();
try {

@martijnvg

martijnvg Apr 23, 2015

Member

make start() package protected?

@bleskes

bleskes Apr 23, 2015

Author Member

I changed it to be an AbstractRunnable for other reasons. The class is package private now.

}

/** inner class is responsible for send the requests to all replica shards and manage the responses */
public final class ReplicationPhase {

@martijnvg

martijnvg Apr 23, 2015

Member

package protected?

@bleskes

bleskes Apr 23, 2015

Author Member

done

bleskes added some commits Apr 23, 2015

@bleskes

Member Author

commented Apr 23, 2015

@martijnvg @brwe pushed an update with all the feedback + extra testing.

public void handleException(TransportException exp) {
onReplicaFailure(nodeId, exp);
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
if (!ignoreReplicaException(exp)) {

@s1monw

s1monw Apr 23, 2015

Contributor

Can we use == false?

@s1monw

Contributor

commented Apr 23, 2015

I have to review the logic again, but the test alone makes me +1 this.

@martijnvg

Member

commented Apr 24, 2015

LGTM

@bleskes bleskes closed this in 5bdfdc4 Apr 24, 2015

@kevinkluge kevinkluge removed the review label Apr 24, 2015

bleskes added a commit that referenced this pull request Apr 24, 2015

Refactor TransportShardReplicationOperationAction
Refactor TransportShardReplicationOperationAction state management into clear separate Primary phase and Replication phase. The primary phase is responsible for routing the request to the node holding the primary, validating it and performing the operation on the primary. The Replication phase is responsible for sending the request to the replicas and managing their responses.

This also adds unit test infrastructure for this class, and some basic tests. We can extend later as we continue developing.

Closes #10749

@bleskes bleskes added the v1.6.0 label Apr 24, 2015

@bleskes

Member Author

commented Apr 24, 2015

pushed to 1.x as well.

@bleskes bleskes deleted the bleskes:shard_replication_start_end branch Apr 24, 2015
