Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not retry TargetNotMemberException when invocation made on target #10404

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.hazelcast.client.cache.impl.ClientCacheProxyFactory;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ProxyFactoryConfig;
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientAddDistributedObjectListenerCodec;
Expand Down Expand Up @@ -78,7 +77,6 @@
import com.hazelcast.multimap.impl.MultiMapService;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ClassLoaderUtil;
import com.hazelcast.nio.Connection;
import com.hazelcast.replicatedmap.impl.ReplicatedMapService;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.scheduledexecutor.impl.DistributedScheduledExecutorService;
Expand Down Expand Up @@ -343,37 +341,17 @@ private void sleepForProxyInitRetry() {
}

private void initialize(ClientProxy clientProxy) throws Exception {
final Address initializationTarget = findNextAddressToSendCreateRequest();
final Connection connection = getTargetOrOwnerConnection(initializationTarget);
final ClientMessage clientMessage = ClientCreateProxyCodec.encodeRequest(clientProxy.getDistributedObjectName(),
Address initializationTarget = findNextAddressToSendCreateRequest();
if (initializationTarget == null) {
throw new IOException("Not able to find a member to create proxy on!");
}
ClientMessage clientMessage = ClientCreateProxyCodec.encodeRequest(clientProxy.getDistributedObjectName(),
clientProxy.getServiceName(), initializationTarget);
new ClientInvocation(client, clientMessage, connection).invoke().get();
new ClientInvocation(client, clientMessage, initializationTarget).invoke().get();
clientProxy.setContext(context);
clientProxy.onInitialize();
}

private Connection getTargetOrOwnerConnection(final Address target) throws IOException {
if (target == null) {
throw new IOException("Not able to setup owner connection!");
}

final ClientConnectionManager connectionManager = client.getConnectionManager();
Connection connection = connectionManager.getConnection(target);
if (connection == null) {
final Address ownerConnectionAddress = client.getClientClusterService().getOwnerConnectionAddress();
if (ownerConnectionAddress == null) {
throw new IOException("Not able to setup owner connection!");
}

connection = connectionManager.getConnection(ownerConnectionAddress);
if (connection == null) {
throw new IOException("Client is not connected to member " + target);
}
}

return connection;
}

public Address findNextAddressToSendCreateRequest() {
int clusterSize = client.getClientClusterService().getSize();
Member liteMember = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.ClientClusterService;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.HazelcastOverloadException;
import com.hazelcast.core.LifecycleService;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.spi.exception.RetryableException;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.exception.TargetNotMemberException;

import java.io.IOException;
import java.util.concurrent.Executor;
Expand All @@ -52,6 +55,7 @@ public class ClientInvocation implements Runnable {
private final ClientInvocationFuture clientInvocationFuture;
private final ILogger logger;
private final LifecycleService lifecycleService;
private final ClientClusterService clientClusterService;
private final ClientInvocationServiceSupport invocationService;
private final ClientExecutionService executionService;
private final ClientMessage clientMessage;
Expand All @@ -61,22 +65,23 @@ public class ClientInvocation implements Runnable {
private final Connection connection;
private volatile ClientConnection sendConnection;
private boolean bypassHeartbeatCheck;
private long retryTimeoutPointInMillis;
private long retryExpirationMillis;
private EventHandler handler;

protected ClientInvocation(HazelcastClientInstanceImpl client,
ClientMessage clientMessage,
int partitionId,
Address address,
Connection connection) {
this.clientClusterService = client.getClientClusterService();
this.lifecycleService = client.getLifecycleService();
this.invocationService = (ClientInvocationServiceSupport) client.getInvocationService();
this.executionService = client.getClientExecutionService();
this.clientMessage = clientMessage;
this.partitionId = partitionId;
this.address = address;
this.connection = connection;
this.retryTimeoutPointInMillis = System.currentTimeMillis() + invocationService.getInvocationTimeoutMillis();
this.retryExpirationMillis = System.currentTimeMillis() + invocationService.getInvocationTimeoutMillis();
this.logger = invocationService.invocationLogger;
this.callIdSequence = client.getCallIdSequence();
this.clientInvocationFuture = new ClientInvocationFuture(this, executionService,
Expand Down Expand Up @@ -176,28 +181,52 @@ public void notifyException(Throwable exception) {
return;
}

if ((isBindToSingleConnection() && exception instanceof IOException)
|| System.currentTimeMillis() > retryTimeoutPointInMillis) {
if (isNotAllowedToRetryOnSelection(exception)) {
clientInvocationFuture.complete(exception);
return;
}

if (isRetrySafeException(exception)
boolean retry = isRetrySafeException(exception)
|| invocationService.isRedoOperation()
|| (exception instanceof TargetDisconnectedException && clientMessage.isRetryable())) {
try {
ClientExecutionServiceImpl executionServiceImpl = (ClientExecutionServiceImpl) this.executionService;
executionServiceImpl.schedule(this, RETRY_WAIT_TIME_IN_SECONDS, TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
if (logger.isFinestEnabled()) {
logger.finest("Retry could not be scheduled ", e);
}
clientInvocationFuture.complete(exception);
|| (exception instanceof TargetDisconnectedException && clientMessage.isRetryable());
if (!retry) {
clientInvocationFuture.complete(exception);
return;
}

long remainingMillis = retryExpirationMillis - System.currentTimeMillis();
if (remainingMillis < 0) {
if (logger.isFinestEnabled()) {
logger.finest("Exception will not be retried because invocation timed out", exception);
}
clientInvocationFuture.complete(new OperationTimeoutException(this + " timed out by "
+ Math.abs(remainingMillis) + " ms"));
return;
}

clientInvocationFuture.complete(exception);
try {
executionService.schedule(this, RETRY_WAIT_TIME_IN_SECONDS, TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
clientInvocationFuture.complete(exception);
}

}

private boolean isNotAllowedToRetryOnSelection(Throwable exception) {
if (isBindToSingleConnection() && exception instanceof IOException) {
return true;
}

if (address != null
&& exception instanceof TargetNotMemberException
&& clientClusterService.getMember(address) == null) {
//when invocation send over address
//if exception is target not member and
//address is not available in member list , don't retry
clientInvocationFuture.complete(exception);
return true;
}
return false;
}

private boolean isBindToSingleConnection() {
Expand Down Expand Up @@ -244,4 +273,22 @@ public static boolean isRetrySafeException(Throwable t) {
public Executor getUserExecutor() {
return executionService.getUserExecutor();
}

@Override
public String toString() {
String target;
if (isBindToSingleConnection()) {
target = "connection " + connection;
} else if (partitionId != -1) {
target = "partition " + partitionId;
} else if (address != null) {
target = "address " + address;
} else {
target = "random";
}
return "ClientInvocation{"
+ "clientMessageType=" + clientMessage.getMessageType()
+ ", target=" + target
+ ", sendConnection=" + sendConnection + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package com.hazelcast.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
Expand All @@ -28,6 +32,11 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class ClientTimeoutTest {
Expand Down Expand Up @@ -61,9 +70,32 @@ public void testConnectionTimeout_withZeroValue() {

public void testConnectionTimeout(int timeoutInMillis) {
//Should work without throwing exception.
final ClientConfig clientConfig = new ClientConfig();
ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().setConnectionTimeout(timeoutInMillis);
hazelcastFactory.newHazelcastInstance();
hazelcastFactory.newHazelcastClient(clientConfig);
}

@Test(expected = OperationTimeoutException.class)
public void testInvocationTimeOut() throws Throwable {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), "0");
hazelcastFactory.newHazelcastInstance();
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);
IExecutorService executorService = client.getExecutorService("test");
Future<Boolean> future = executorService.submit(new RetryableExceptionThrowingCallable());
try {
future.get();
} catch (InterruptedException e) {
//ignored
} catch (ExecutionException e) {
throw e.getCause();
}
}

public static class RetryableExceptionThrowingCallable implements Callable, Serializable {
public Object call() throws Exception {
throw new RetryableHazelcastException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.exception.TargetDisconnectedException;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void testClient_canConnect_whenClusterState_passive() {
factory.newHazelcastClient();
}

@Test(expected = IllegalStateException.class)
@Test(expected = OperationTimeoutException.class)
public void testClient_canNotExecuteWriteOperations_whenClusterState_passive() {
warmUpPartitions(instances);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.DefaultNodeExtension;
import com.hazelcast.instance.HazelcastInstanceFactory;
import com.hazelcast.instance.Node;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void test_canNotConnect_whenNodeExtensionIsNotComplete()
factory.newHazelcastClient(clientConfig);
}

@Test(expected = IllegalStateException.class)
@Test(expected = OperationTimeoutException.class)
public void test_canGetFromMap_whenNodeExtensionIsNotComplete() {
IMap<Object, Object> map = null;
ManagedExtensionNodeContext nodeContext = null;
Expand Down