Skip to content

Commit

Permalink
Merge pull request #15574 from sancar/fix/callbackRejectionException/…
Browse files Browse the repository at this point in the history
…master

Instance/ClientNotActiveEx for rejected callbacks
  • Loading branch information
mdogan committed Sep 20, 2019
2 parents 36550fd + fdbc997 commit 60591eb
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 11 deletions.
Expand Up @@ -58,9 +58,9 @@ public class ClientInvocation implements Runnable {
private static final AtomicLongFieldUpdater<ClientInvocation> INVOKE_COUNT
= AtomicLongFieldUpdater.newUpdater(ClientInvocation.class, "invokeCount");

final LifecycleService lifecycleService;
private final ClientInvocationFuture clientInvocationFuture;
private final ILogger logger;
private final LifecycleService lifecycleService;
private final ClientClusterService clientClusterService;
private final AbstractClientInvocationService invocationService;
private final ClientExecutionService executionService;
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.client.impl.spi.impl;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.logging.ILogger;
Expand All @@ -25,6 +26,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -85,6 +87,14 @@ public void andThen(ExecutionCallback<ClientMessage> callback, Executor executor
super.andThen(new InternalDelegatingExecutionCallback(callback), executor);
}

@Override
protected Exception wrapToInstanceNotActiveException(RejectedExecutionException e) {
if (!invocation.lifecycleService.isRunning()) {
return new HazelcastClientNotActiveException("Client is shut down", e);
}
return e;
}

@Override
protected void onComplete() {
callIdSequence.complete();
Expand Down
Expand Up @@ -17,8 +17,8 @@
package com.hazelcast.spi.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.logging.ILogger;
import com.hazelcast.internal.util.executor.UnblockableThread;
import com.hazelcast.logging.ILogger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -262,10 +262,12 @@ public void run() {

});
} catch (RejectedExecutionException e) {
callback.onFailure(e);
callback.onFailure(wrapToInstanceNotActiveException(e));
}
}

protected abstract Exception wrapToInstanceNotActiveException(RejectedExecutionException e);

// this method should not be needed; but there is a difference between client and server how it handles async throwables
protected Throwable unwrap(Throwable throwable) {
if (throwable instanceof ExecutionException && throwable.getCause() != null) {
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.spi.impl.operationservice.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IndeterminateOperationState;
import com.hazelcast.core.IndeterminateOperationStateException;
import com.hazelcast.core.OperationTimeoutException;
Expand All @@ -27,6 +28,7 @@

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -75,6 +77,14 @@ protected void onInterruptDetected() {
interrupted = true;
}

@Override
protected Exception wrapToInstanceNotActiveException(RejectedExecutionException e) {
if (!invocation.context.nodeEngine.isRunning()) {
return new HazelcastInstanceNotActiveException(e.getMessage());
}
return e;
}

@Override
protected E resolveAndThrowIfException(Object unresolved) throws ExecutionException, InterruptedException {
Object value = resolve(unresolved);
Expand Down
Expand Up @@ -18,13 +18,15 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipAdapter;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.Address;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
Expand All @@ -38,6 +40,7 @@

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -140,6 +143,34 @@ public void testRequestShouldFailOnShutdown() {
test.get("key");
}

@Test
public void testCallbackAfterClientShutdown() {
final HazelcastInstance server = hazelcastFactory.newHazelcastInstance();
ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().setConnectionAttemptLimit(1);
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);

IMap<Object, Object> test = client.getMap("test");
server.shutdown();
ICompletableFuture<Object> future = test.putAsync("key", "value");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> reference = new AtomicReference<>();
future.andThen(new ExecutionCallback<Object>() {
@Override
public void onResponse(Object response) {

}

@Override
public void onFailure(Throwable t) {
reference.set(t);
latch.countDown();
}
});
assertOpenEventually(latch);
assertInstanceOf(HazelcastClientNotActiveException.class, reference.get());
}

@Test(expected = HazelcastClientNotActiveException.class)
public void testExceptionAfterClientShutdown() {
hazelcastFactory.newHazelcastInstance();
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.spi.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.HazelcastTestSupport;
Expand All @@ -24,6 +25,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -59,6 +61,11 @@ protected void onInterruptDetected() {
complete(new InterruptedException());
}

@Override
protected IllegalStateException wrapToInstanceNotActiveException(RejectedExecutionException e) {
return new HazelcastInstanceNotActiveException(e.getMessage());
}

@Override
protected String invocationToString() {
return "someinvocation";
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.spi.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
Expand All @@ -26,7 +27,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;
Expand All @@ -35,7 +35,6 @@
@Category({QuickTest.class, ParallelJVMTest.class})
public class AbstractInvocationFuture_ClosedExecutorTest extends AbstractInvocationFuture_AbstractTest {


@Test
public void whenCompleteBeforeShutdown_thenCallback() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -51,7 +50,7 @@ public void onResponse(Object response) {

@Override
public void onFailure(Throwable t) {
if (t instanceof RejectedExecutionException) {
if (t instanceof HazelcastInstanceNotActiveException) {
onFailure.set(true);
}
}
Expand All @@ -74,7 +73,7 @@ public void onResponse(Object response) {

@Override
public void onFailure(Throwable t) {
if (t instanceof RejectedExecutionException) {
if (t instanceof HazelcastInstanceNotActiveException) {
onFailure.set(true);
}
}
Expand Down
Expand Up @@ -18,6 +18,8 @@

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
Expand All @@ -29,8 +31,10 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -59,6 +63,31 @@ public void whenNullCallback() {
future.andThen(null);
}

@Test
public void whenNodeIsShutdown() throws Throwable {
Address address = getAddress(local);
local.shutdown();

DummyOperation op = new DummyOperation(null);
InternalCompletableFuture<Object> future = operationService.invokeOnTarget(null, op, address);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> reference = new AtomicReference<>();
future.andThen(new ExecutionCallback<Object>() {
@Override
public void onResponse(Object response) {

}

@Override
public void onFailure(Throwable t) {
reference.set(t);
latch.countDown();
}
});
assertOpenEventually(latch);
assertInstanceOf(HazelcastInstanceNotActiveException.class, reference.get());
}

@Test(expected = IllegalArgumentException.class)
public void whenNullCallback2() {
DummyOperation op = new DummyOperation(null);
Expand Down

0 comments on commit 60591eb

Please sign in to comment.