
IGNITE-22130 Fix retries. #3704

Open · wants to merge 7 commits into main
Changes from 6 commits
@@ -567,6 +567,30 @@ public static int extractCodeFrom(Throwable t) {
return INTERNAL_ERR;
}

/**
* Determines whether a particular error matches any of the passed error codes.
*
* @param t Unwrapped throwable.
* @param code The code.
* @param codes Other codes.
* @return {@code True} if the error code of the throwable matches any of the given codes.
*/
public static boolean matchAny(Throwable t, int code, int... codes) {
int errCode = extractCodeFrom(t);

if (code == errCode) {
return true;
}

for (int c0 : codes) {
if (c0 == errCode) {
return true;
}
}

return false;
}
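
A minimal usage sketch of the new helper, assuming it is called with an already unwrapped cause (as the @param doc states). The class and method names here are hypothetical and used only for illustration; the error-code constants are the existing ones that ReplicaService also imports below.

import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;

final class ErrorCodeChecks {
    private ErrorCodeChecks() {
    }

    /** Returns {@code true} if the (already unwrapped) cause carries any replicator-related error code. */
    static boolean isReplicatorIssue(Throwable unwrappedCause) {
        // The first code is a required argument, the remaining ones go through the varargs parameter.
        return matchAny(unwrappedCause, REPLICA_COMMON_ERR, REPLICA_MISS_ERR, REPLICA_TIMEOUT_ERR);
    }
}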

// TODO: https://issues.apache.org/jira/browse/IGNITE-19870
// This method should be removed or re-worked and usages should be changed to IgniteExceptionMapperUtil.mapToPublicException.
/**
@@ -17,17 +17,22 @@

package org.apache.ignite.internal.replicator;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -45,10 +50,14 @@
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/** The service is intended to execute requests on replicas. */
public class ReplicaService {
/** Retry timeout. */
private static final int RETRY_TIMEOUT_MILLIS = 10;

/** Message service. */
private final MessagingService messagingService;

@@ -59,6 +68,8 @@ public class ReplicaService {

private final ReplicationConfiguration replicationConfiguration;

private final ScheduledExecutorService retryExecutor;
Contributor:

Please add @Nullable here.

Contributor Author:

🆗


/** Requests to retry. */
private final Map<String, CompletableFuture<NetworkMessage>> pendingInvokes = new ConcurrentHashMap<>();

@@ -82,7 +93,8 @@ public ReplicaService(
messagingService,
clock,
ForkJoinPool.commonPool(),
replicationConfiguration
replicationConfiguration,
null
);
}

@@ -93,24 +105,27 @@ public ReplicaService(
* @param clock A hybrid logical clock.
* @param partitionOperationsExecutor Partition operation executor.
* @param replicationConfiguration Replication configuration.
* @param retryExecutor Retry executor.
*/
public ReplicaService(
MessagingService messagingService,
HybridClock clock,
Executor partitionOperationsExecutor,
ReplicationConfiguration replicationConfiguration
ReplicationConfiguration replicationConfiguration,
@Nullable ScheduledExecutorService retryExecutor
) {
this.messagingService = messagingService;
this.clock = clock;
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.replicationConfiguration = replicationConfiguration;
this.retryExecutor = retryExecutor;
}

/**
* Sends request to the replica node.
*
* @param targetNodeConsistentId A consistent id of the replica node.
* @param req Replica request.
* @param req Replica request.
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
* @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet.
@@ -210,7 +225,14 @@ private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, Re
return null;
});
} else {
res.completeExceptionally(errResp.throwable());
if (retryExecutor != null && matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) {
Contributor:

I still think that using exception classes is better than error codes, in general.
By the way, should ACQUIRE_LOCK_TIMEOUT_ERR be taken into account as well?

Contributor Author:

> I still think that using exception classes is better than error codes, in general.

I disagree. The whole exception design is based on error codes. Checking error codes is cleaner than comparing exception classes.

> By the way, should ACQUIRE_LOCK_TIMEOUT_ERR be taken into account as well?

No, it should not. This error code and the related functionality should be removed, because we now have retries on the client side. I plan to create a ticket for this.

Contributor (@sk0x50, May 11, 2024):

> I disagree. The whole exception design is based on error codes. Checking error codes is cleaner than comparing exception classes.

I disagree; using Java classes that represent an exception is a widespread practice. Error codes are a way to give the user an additional clue in critical situations, especially in the case of thin clients, which do not support exceptions.

> No, it should not. This error code and the related functionality should be removed, because we now have retries on the client side. I plan to create a ticket for this.

Ok, I got it.

Contributor Author:

> I disagree; using Java classes that represent an exception is a widespread practice. Error codes are a way to give the user an additional clue in critical situations, especially in the case of thin clients, which do not support exceptions.

Looks like we have no agreement here, because changing error-code checks to instanceof doesn't make much sense to me. However, this should not block the PR changes. Or is this a merge blocker? We can fix it later once we have strict rules for working with errors.

Contributor:

This is definitely not a blocker.
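
As an aside for readers skimming the thread, the two styles under debate look roughly like this. This is a contrast sketch only: the exception types in the second variant are placeholders declared locally so the snippet compiles, not classes claimed to exist in the code base.

import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;

final class RetryCheckStyles {
    // Placeholder types, declared only so the type-based variant compiles.
    static class HypotheticalLockConflictException extends RuntimeException {
    }

    static class HypotheticalReplicaMissException extends RuntimeException {
    }

    /** Style used in this PR: match on error codes extracted from the throwable. */
    static boolean retriableByCode(Throwable unwrappedCause) {
        return matchAny(unwrappedCause, ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR);
    }

    /** Style the reviewer argues for: match on exception types. */
    static boolean retriableByType(Throwable unwrappedCause) {
        return unwrappedCause instanceof HypotheticalLockConflictException
                || unwrappedCause instanceof HypotheticalReplicaMissException;
    }
}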

retryExecutor.schedule(
// Need to resubmit again to pool which is valid for synchronous IO execution.
() -> partitionOperationsExecutor.execute(() -> res.completeExceptionally(errResp.throwable())),
RETRY_TIMEOUT_MILLIS, MILLISECONDS);
Contributor:

If I understood it correctly, we are holding the response for several milliseconds to prevent an instantaneous retry.
What will change if we don't do that? It seems incorrect to delay a response instead of retrying, especially in cases where we do not retry again.

} else {
res.completeExceptionally(errResp.throwable());
}
}
} else {
res.complete((R) ((ReplicaResponse) response).result());
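
On the reviewer's question above about holding the response for a few milliseconds: a hypothetical, blocking caller-side loop (illustration only, not code from this PR, assuming the two-argument invoke overload shown below) makes the effect visible. Without the deferred completion, every failed attempt would surface immediately and the loop would hammer the replica while the lock is still held; with it, consecutive attempts are spaced by at least RETRY_TIMEOUT_MILLIS even though the loop itself never sleeps.

import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;

import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;

final class RetryLoopSketch {
    private RetryLoopSketch() {
    }

    /** Keeps invoking the replica until a non-transient outcome is reached. */
    static <R> R invokeWithBlockingRetry(ReplicaService service, String nodeId, ReplicaRequest request)
            throws InterruptedException {
        while (true) {
            try {
                return service.<R>invoke(nodeId, request).get();
            } catch (ExecutionException e) {
                // Retry only on the transient codes the service defers on; rethrow everything else.
                if (!matchAny(e.getCause(), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) {
                    throw new RuntimeException(e.getCause());
                }
                // No sleep needed here: the failed future completes only after
                // RETRY_TIMEOUT_MILLIS, so this loop cannot spin instantly on lock conflicts.
            }
        }
    }
}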
@@ -224,7 +246,7 @@ private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, Re
/**
* Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing.
*
* @param node Replica node.
* @param node Replica node.
* @param request Request.
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
@@ -252,8 +274,8 @@ public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaReques
/**
* Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing.
*
* @param node Replica node.
* @param request Request.
* @param node Replica node.
* @param request Request.
* @param storageId Storage id.
* @return Response future with either evaluation result or completed exceptionally.
* @see NodeStoppingException If either supplier or demander node is stopping.
@@ -391,7 +391,8 @@ private PartialNode startPartialNode(
messagingServiceReturningToStorageOperationsPool,
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
replicationConfiguration
replicationConfiguration,
threadPoolsManager.commonScheduler()
);

var lockManager = new HeapLockManager();
@@ -135,7 +135,11 @@ public void afterTest() {
private void checkResourcesAreReleased(IgniteImpl ignite) {
checkCursorsAreClosed(ignite);

assertTrue(ignite.txManager().lockManager().isEmpty());
try {
assertTrue(waitForCondition(() -> ignite.txManager().lockManager().isEmpty(), 1000));
} catch (InterruptedException e) {
fail("Unexpected interruption");
}
}

/**
@@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite {
messagingServiceReturningToStorageOperationsPool,
clock,
threadPoolsManager.partitionOperationsExecutor(),
replicationConfig
replicationConfig,
threadPoolsManager.commonScheduler()
);

LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
@@ -1098,7 +1098,8 @@ private class Node {
clusterService.messagingService(),
hybridClock,
threadPoolsManager.partitionOperationsExecutor(),
replicationConfiguration
replicationConfiguration,
threadPoolsManager.commonScheduler()
);

var resourcesRegistry = new RemotelyTriggeredResourceRegistry();