-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-22130 Fix retries. #3704
Open
ascherbakoff
wants to merge
7
commits into
apache:main
Choose a base branch
from
gridgain:IGNITE-22130
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
f4c36dc
IGNITE-22130 Fix retries.
ascherbakoff 00c429b
IGNITE-22130 Fix tests.
ascherbakoff d0d0bb9
IGNITE-22130 Remove wrapReplicationException.
ascherbakoff 6175e95
IGNITE-22130 Remove wrapReplicationException.
ascherbakoff e00fb94
IGNITE-22130 Remove wrapReplicationException.
ascherbakoff 8e1147b
IGNITE-22130 Fix style.
ascherbakoff b9754dc
IGNITE-22130 Add nullable.
ascherbakoff File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,17 +17,22 @@ | |
|
||
package org.apache.ignite.internal.replicator; | ||
|
||
import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; | ||
import static org.apache.ignite.internal.util.ExceptionUtils.matchAny; | ||
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; | ||
import static org.apache.ignite.internal.util.ExceptionUtils.withCause; | ||
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; | ||
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; | ||
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; | ||
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ForkJoinPool; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeoutException; | ||
import org.apache.ignite.internal.hlc.HybridClock; | ||
import org.apache.ignite.internal.lang.NodeStoppingException; | ||
|
@@ -45,10 +50,14 @@ | |
import org.apache.ignite.internal.replicator.message.ReplicaResponse; | ||
import org.apache.ignite.internal.replicator.message.TimestampAware; | ||
import org.apache.ignite.network.ClusterNode; | ||
import org.jetbrains.annotations.Nullable; | ||
import org.jetbrains.annotations.TestOnly; | ||
|
||
/** The service is intended to execute requests on replicas. */ | ||
public class ReplicaService { | ||
/** Retry timeout. */ | ||
private static final int RETRY_TIMEOUT_MILLIS = 10; | ||
|
||
/** Message service. */ | ||
private final MessagingService messagingService; | ||
|
||
|
@@ -59,6 +68,8 @@ public class ReplicaService { | |
|
||
private final ReplicationConfiguration replicationConfiguration; | ||
|
||
private @Nullable final ScheduledExecutorService retryExecutor; | ||
|
||
/** Requests to retry. */ | ||
private final Map<String, CompletableFuture<NetworkMessage>> pendingInvokes = new ConcurrentHashMap<>(); | ||
|
||
|
@@ -82,7 +93,8 @@ public ReplicaService( | |
messagingService, | ||
clock, | ||
ForkJoinPool.commonPool(), | ||
replicationConfiguration | ||
replicationConfiguration, | ||
null | ||
); | ||
} | ||
|
||
|
@@ -93,24 +105,27 @@ public ReplicaService( | |
* @param clock A hybrid logical clock. | ||
* @param partitionOperationsExecutor Partition operation executor. | ||
* @param replicationConfiguration Replication configuration. | ||
* @param retryExecutor Retry executor. | ||
*/ | ||
public ReplicaService( | ||
MessagingService messagingService, | ||
HybridClock clock, | ||
Executor partitionOperationsExecutor, | ||
ReplicationConfiguration replicationConfiguration | ||
ReplicationConfiguration replicationConfiguration, | ||
@Nullable ScheduledExecutorService retryExecutor | ||
) { | ||
this.messagingService = messagingService; | ||
this.clock = clock; | ||
this.partitionOperationsExecutor = partitionOperationsExecutor; | ||
this.replicationConfiguration = replicationConfiguration; | ||
this.retryExecutor = retryExecutor; | ||
} | ||
|
||
/** | ||
* Sends request to the replica node. | ||
* | ||
* @param targetNodeConsistentId A consistent id of the replica node.. | ||
* @param req Replica request. | ||
* @param req Replica request. | ||
* @return Response future with either evaluation result or completed exceptionally. | ||
* @see NodeStoppingException If either supplier or demander node is stopping. | ||
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet. | ||
|
@@ -210,7 +225,14 @@ private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, Re | |
return null; | ||
}); | ||
} else { | ||
res.completeExceptionally(errResp.throwable()); | ||
if (retryExecutor != null && matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) { | ||
retryExecutor.schedule( | ||
// Need to resubmit again to pool which is valid for synchronous IO execution. | ||
() -> partitionOperationsExecutor.execute(() -> res.completeExceptionally(errResp.throwable())), | ||
RETRY_TIMEOUT_MILLIS, MILLISECONDS); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understood it correctly,, we are holding response for several milliseconds to prevent instantaneous retry. |
||
} else { | ||
res.completeExceptionally(errResp.throwable()); | ||
} | ||
} | ||
} else { | ||
res.complete((R) ((ReplicaResponse) response).result()); | ||
|
@@ -224,7 +246,7 @@ private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, Re | |
/** | ||
* Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. | ||
* | ||
* @param node Replica node. | ||
* @param node Replica node. | ||
* @param request Request. | ||
* @return Response future with either evaluation result or completed exceptionally. | ||
* @see NodeStoppingException If either supplier or demander node is stopping. | ||
|
@@ -252,8 +274,8 @@ public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaReques | |
/** | ||
* Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. | ||
* | ||
* @param node Replica node. | ||
* @param request Request. | ||
* @param node Replica node. | ||
* @param request Request. | ||
* @param storageId Storage id. | ||
* @return Response future with either evaluation result or completed exceptionally. | ||
* @see NodeStoppingException If either supplier or demander node is stopping. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think that using exception classes is better than error codes, in general.
By the way, should
ACQUIRE_LOCK_TIMEOUT_ERR
be taken into account as well?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree. The whole exception design is based on error codes. Checking error codes is more clean then comparing exception classes.
No it should not. This error code and related functionality should be removed, because we got retries from client side. I plan to create a ticket for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree, using Java classes that represent an exception is a widespread practice. Error codes are a way to provide the user with an additional clue on critical situations, especially in the case of thin clients that are not supported exceptions.
Ok, I got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we have no agreement here, because changing error code checking to
instanceof
doesn't make much sense to me. This however should not block PR changes. Or is this a merge blocker ? We can fix it later then we have strict rules on working with errors.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is definitely not a blocker.