Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public static Object rebalanceTopic(int idx) {
stopErr = cctx.kernalContext().clientDisconnected() ?
new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
"Client node disconnected: " + cctx.igniteInstanceName()) :
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.igniteInstanceName());
new IgniteCheckedException("Node is stopping: " + cctx.igniteInstanceName());

// Stop exchange worker
U.cancel(exchWorker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,9 @@ else if (msg instanceof WalStateAbstractMessage)
exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
}
catch (IgniteInterruptedCheckedException e) {
onDone(e);
assert cctx.kernalContext().isStopping();

onDone(new IgniteCheckedException("Node stopped"));

throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
Expand Down Expand Up @@ -144,8 +145,6 @@ public void testLockRelease() throws Exception {
*/
@Test
public void testLockTopologyChange() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-9213");

final int nodeCnt = 5;
int threadCnt = 8;
final int keys = 100;
Expand All @@ -167,9 +166,87 @@ public void testLockTopologyChange() throws Exception {
Lock lock = cache.lock(i);
lock.lock();

cache.put(i, i);
try {
cache.put(i, i);
}
finally {
lock.unlock();
}
}
}
}
}, threadCnt, "test-lock-thread");

q.add(f);

U.sleep(1_000);
}

stop.set(true);

IgniteInternalFuture<Long> f;

Exception err = null;

while ((f = q.poll()) != null) {
try {
f.get(60_000);
}
catch (Exception e) {
error("Test operation failed: " + e, e);

if (err == null)
err = e;
}
}

if (err != null)
fail("Test operation failed, see log for details");
}
finally {
stopAllGrids();
}
}

/**
* @throws Exception If failed.
*/
@Test
public void testLockNodeStop() throws Exception {
final int nodeCnt = 3;
int threadCnt = 2;
final int keys = 100;

try {
final AtomicBoolean stop = new AtomicBoolean(false);

Queue<IgniteInternalFuture<Long>> q = new ArrayDeque<>(nodeCnt);

lock.unlock();
for (int i = 0; i < nodeCnt; i++) {
final Ignite ignite = startGrid(i);

IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
while (!Thread.currentThread().isInterrupted() && !stop.get()) {
try {
IgniteCache<Integer, Integer> cache = ignite.cache(REPLICATED_TEST_CACHE);

for (int i = 0; i < keys; i++) {
Lock lock = cache.lock(i);
lock.lock();

try {
cache.put(i, i);
}
finally {
lock.unlock();
}
}
}
catch (Exception e) {
log.info("Ignore error: " + e);

break;
}
}
}
Expand All @@ -180,12 +257,31 @@ public void testLockTopologyChange() throws Exception {
U.sleep(1_000);
}

U.sleep(ThreadLocalRandom.current().nextLong(500) + 500);

// Stop all nodes, check that threads executing cache operations do not hang.
stopAllGrids();

stop.set(true);

IgniteInternalFuture<Long> f;

while ((f = q.poll()) != null)
f.get(2_000);
Exception err = null;

while ((f = q.poll()) != null) {
try {
f.get(60_000);
}
catch (Exception e) {
error("Test operation failed: " + e, e);

if (err == null)
err = e;
}
}

if (err != null)
fail("Test operation failed, see log for details");
}
finally {
stopAllGrids();
Expand Down