Skip to content

Commit

Permalink
IGNITE-9053 testReentrantLockConstantTopologyChangeNonFailoverSafe ca…
Browse files Browse the repository at this point in the history
…n hang in case of broken tx

Signed-off-by: Anton Vinogradov <av@apache.org>
  • Loading branch information
anton-vinogradov committed Aug 17, 2018
1 parent f8e29b7 commit 94ba157
Showing 1 changed file with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -47,6 +48,7 @@
import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
Expand Down Expand Up @@ -74,6 +76,7 @@
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
Expand Down Expand Up @@ -1165,7 +1168,35 @@ public void addNotification(UUID nodeId,
throw e;
}

fut.get();
while (true) {
try {
fut.get(100, TimeUnit.MILLISECONDS);

break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
// Additional failover to break waiting on node left/fail
// in case left/fail event processing failed, hanged or delayed.
if (!ctx.discovery().alive(nodeId)) {
SyncMessageAckFuture fut0 = syncMsgFuts.remove(futId);

if (fut0 != null) {
ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
"Node left grid after receiving, but before processing the message [node=" +
nodeId + "]");

fut0.onDone(err);
}

break;
}

LT.warn(log, "Failed to wait for ack message. [node=" + nodeId +
", routine=" + routineId + "]");
}
}

assert fut.isDone() : "Future in not finished [fut= " + fut + "]";
}
else {
final GridContinuousBatch batch = info.add(obj);
Expand Down

0 comments on commit 94ba157

Please sign in to comment.