diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/spi/ProxyManager.java b/hazelcast-client/src/main/java/com/hazelcast/client/spi/ProxyManager.java index 22e83f654d12..8059f1084f23 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/spi/ProxyManager.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/spi/ProxyManager.java @@ -24,7 +24,6 @@ import com.hazelcast.client.cache.impl.ClientCacheProxyFactory; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.client.config.ProxyFactoryConfig; -import com.hazelcast.client.connection.ClientConnectionManager; import com.hazelcast.client.impl.HazelcastClientInstanceImpl; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.ClientAddDistributedObjectListenerCodec; @@ -78,7 +77,6 @@ import com.hazelcast.multimap.impl.MultiMapService; import com.hazelcast.nio.Address; import com.hazelcast.nio.ClassLoaderUtil; -import com.hazelcast.nio.Connection; import com.hazelcast.replicatedmap.impl.ReplicatedMapService; import com.hazelcast.ringbuffer.impl.RingbufferService; import com.hazelcast.scheduledexecutor.impl.DistributedScheduledExecutorService; @@ -343,37 +341,17 @@ private void sleepForProxyInitRetry() { } private void initialize(ClientProxy clientProxy) throws Exception { - final Address initializationTarget = findNextAddressToSendCreateRequest(); - final Connection connection = getTargetOrOwnerConnection(initializationTarget); - final ClientMessage clientMessage = ClientCreateProxyCodec.encodeRequest(clientProxy.getDistributedObjectName(), + Address initializationTarget = findNextAddressToSendCreateRequest(); + if (initializationTarget == null) { + throw new IOException("Not able to find a member to create proxy on!"); + } + ClientMessage clientMessage = ClientCreateProxyCodec.encodeRequest(clientProxy.getDistributedObjectName(), clientProxy.getServiceName(), initializationTarget); - new ClientInvocation(client, clientMessage, connection).invoke().get(); + new ClientInvocation(client, clientMessage, initializationTarget).invoke().get(); clientProxy.setContext(context); clientProxy.onInitialize(); } - private Connection getTargetOrOwnerConnection(final Address target) throws IOException { - if (target == null) { - throw new IOException("Not able to setup owner connection!"); - } - - final ClientConnectionManager connectionManager = client.getConnectionManager(); - Connection connection = connectionManager.getConnection(target); - if (connection == null) { - final Address ownerConnectionAddress = client.getClientClusterService().getOwnerConnectionAddress(); - if (ownerConnectionAddress == null) { - throw new IOException("Not able to setup owner connection!"); - } - - connection = connectionManager.getConnection(ownerConnectionAddress); - if (connection == null) { - throw new IOException("Client is not connected to member " + target); - } - } - - return connection; - } - public Address findNextAddressToSendCreateRequest() { int clusterSize = client.getClientClusterService().getSize(); Member liteMember = null; diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java index 554af95026fb..90d8e888c27c 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java @@ -20,16 +20,19 @@ import com.hazelcast.client.connection.nio.ClientConnection; import com.hazelcast.client.impl.HazelcastClientInstanceImpl; import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.spi.ClientClusterService; import com.hazelcast.client.spi.ClientExecutionService; import com.hazelcast.client.spi.EventHandler; import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.HazelcastOverloadException; import com.hazelcast.core.LifecycleService; +import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.logging.ILogger; import com.hazelcast.nio.Address; import com.hazelcast.nio.Connection; import com.hazelcast.spi.exception.RetryableException; import com.hazelcast.spi.exception.TargetDisconnectedException; +import com.hazelcast.spi.exception.TargetNotMemberException; import java.io.IOException; import java.util.concurrent.Executor; @@ -52,6 +55,7 @@ public class ClientInvocation implements Runnable { private final ClientInvocationFuture clientInvocationFuture; private final ILogger logger; private final LifecycleService lifecycleService; + private final ClientClusterService clientClusterService; private final ClientInvocationServiceSupport invocationService; private final ClientExecutionService executionService; private final ClientMessage clientMessage; @@ -61,7 +65,7 @@ public class ClientInvocation implements Runnable { private final Connection connection; private volatile ClientConnection sendConnection; private boolean bypassHeartbeatCheck; - private long retryTimeoutPointInMillis; + private long retryExpirationMillis; private EventHandler handler; protected ClientInvocation(HazelcastClientInstanceImpl client, @@ -69,6 +73,7 @@ protected ClientInvocation(HazelcastClientInstanceImpl client, int partitionId, Address address, Connection connection) { + this.clientClusterService = client.getClientClusterService(); this.lifecycleService = client.getLifecycleService(); this.invocationService = (ClientInvocationServiceSupport) client.getInvocationService(); this.executionService = client.getClientExecutionService(); @@ -76,7 +81,7 @@ protected ClientInvocation(HazelcastClientInstanceImpl client, this.partitionId = partitionId; this.address = address; this.connection = connection; - this.retryTimeoutPointInMillis = System.currentTimeMillis() + invocationService.getInvocationTimeoutMillis(); + this.retryExpirationMillis = System.currentTimeMillis() + invocationService.getInvocationTimeoutMillis(); this.logger = invocationService.invocationLogger; this.callIdSequence = client.getCallIdSequence(); this.clientInvocationFuture = new ClientInvocationFuture(this, executionService, @@ -176,28 +181,52 @@ public void notifyException(Throwable exception) { return; } - if ((isBindToSingleConnection() && exception instanceof IOException) - || System.currentTimeMillis() > retryTimeoutPointInMillis) { + if (isNotAllowedToRetryOnSelection(exception)) { clientInvocationFuture.complete(exception); return; } - if (isRetrySafeException(exception) + boolean retry = isRetrySafeException(exception) || invocationService.isRedoOperation() - || (exception instanceof TargetDisconnectedException && clientMessage.isRetryable())) { - try { - ClientExecutionServiceImpl executionServiceImpl = (ClientExecutionServiceImpl) this.executionService; - executionServiceImpl.schedule(this, RETRY_WAIT_TIME_IN_SECONDS, TimeUnit.SECONDS); - } catch (RejectedExecutionException e) { - if (logger.isFinestEnabled()) { - logger.finest("Retry could not be scheduled ", e); - } - clientInvocationFuture.complete(exception); + || (exception instanceof TargetDisconnectedException && clientMessage.isRetryable()); + if (!retry) { + clientInvocationFuture.complete(exception); + return; + } + + long remainingMillis = retryExpirationMillis - System.currentTimeMillis(); + if (remainingMillis < 0) { + if (logger.isFinestEnabled()) { + logger.finest("Exception will not be retried because invocation timed out", exception); } + clientInvocationFuture.complete(new OperationTimeoutException(this + " timed out by " + + Math.abs(remainingMillis) + " ms")); return; } - clientInvocationFuture.complete(exception); + try { + executionService.schedule(this, RETRY_WAIT_TIME_IN_SECONDS, TimeUnit.SECONDS); + } catch (RejectedExecutionException e) { + clientInvocationFuture.complete(exception); + } + + } + + private boolean isNotAllowedToRetryOnSelection(Throwable exception) { + if (isBindToSingleConnection() && exception instanceof IOException) { + return true; + } + + if (address != null + && exception instanceof TargetNotMemberException + && clientClusterService.getMember(address) == null) { + //when invocation send over address + //if exception is target not member and + //address is not available in member list , don't retry + clientInvocationFuture.complete(exception); + return true; + } + return false; } private boolean isBindToSingleConnection() { @@ -244,4 +273,22 @@ public static boolean isRetrySafeException(Throwable t) { public Executor getUserExecutor() { return executionService.getUserExecutor(); } + + @Override + public String toString() { + String target; + if (isBindToSingleConnection()) { + target = "connection " + connection; + } else if (partitionId != -1) { + target = "partition " + partitionId; + } else if (address != null) { + target = "address " + address; + } else { + target = "random"; + } + return "ClientInvocation{" + + "clientMessageType=" + clientMessage.getMessageType() + + ", target=" + target + + ", sendConnection=" + sendConnection + '}'; + } } diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/ClientTimeoutTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/ClientTimeoutTest.java index be821083975b..abba5b0c8115 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/ClientTimeoutTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/ClientTimeoutTest.java @@ -17,9 +17,13 @@ package com.hazelcast.client; import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.client.spi.properties.ClientProperty; import com.hazelcast.client.test.TestHazelcastFactory; import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; +import com.hazelcast.core.OperationTimeoutException; +import com.hazelcast.spi.exception.RetryableHazelcastException; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; @@ -28,6 +32,11 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + @RunWith(HazelcastParallelClassRunner.class) @Category({QuickTest.class, ParallelTest.class}) public class ClientTimeoutTest { @@ -61,9 +70,32 @@ public void testConnectionTimeout_withZeroValue() { public void testConnectionTimeout(int timeoutInMillis) { //Should work without throwing exception. - final ClientConfig clientConfig = new ClientConfig(); + ClientConfig clientConfig = new ClientConfig(); clientConfig.getNetworkConfig().setConnectionTimeout(timeoutInMillis); hazelcastFactory.newHazelcastInstance(); hazelcastFactory.newHazelcastClient(clientConfig); } + + @Test(expected = OperationTimeoutException.class) + public void testInvocationTimeOut() throws Throwable { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), "0"); + hazelcastFactory.newHazelcastInstance(); + HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig); + IExecutorService executorService = client.getExecutorService("test"); + Future future = executorService.submit(new RetryableExceptionThrowingCallable()); + try { + future.get(); + } catch (InterruptedException e) { + //ignored + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + public static class RetryableExceptionThrowingCallable implements Callable, Serializable { + public Object call() throws Exception { + throw new RetryableHazelcastException(); + } + } } diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientClusterStateTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientClusterStateTest.java index 898179fe5e7a..00f66fb1bcc3 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientClusterStateTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientClusterStateTest.java @@ -23,6 +23,7 @@ import com.hazelcast.config.Config; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; +import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; import com.hazelcast.spi.exception.TargetDisconnectedException; @@ -106,7 +107,7 @@ public void testClient_canConnect_whenClusterState_passive() { factory.newHazelcastClient(); } - @Test(expected = IllegalStateException.class) + @Test(expected = OperationTimeoutException.class) public void testClient_canNotExecuteWriteOperations_whenClusterState_passive() { warmUpPartitions(instances); diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientNodeExtensionTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientNodeExtensionTest.java index c6a186aabb16..36828b703796 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientNodeExtensionTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/cluster/ClientNodeExtensionTest.java @@ -22,6 +22,7 @@ import com.hazelcast.config.Config; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; +import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.instance.DefaultNodeExtension; import com.hazelcast.instance.HazelcastInstanceFactory; import com.hazelcast.instance.Node; @@ -98,7 +99,7 @@ public void test_canNotConnect_whenNodeExtensionIsNotComplete() factory.newHazelcastClient(clientConfig); } - @Test(expected = IllegalStateException.class) + @Test(expected = OperationTimeoutException.class) public void test_canGetFromMap_whenNodeExtensionIsNotComplete() { IMap map = null; ManagedExtensionNodeContext nodeContext = null;