From 3e4b22caa0f35f85c55b411f8b81f4e7ccab4140 Mon Sep 17 00:00:00 2001 From: ihsan demir Date: Mon, 19 Jun 2017 16:48:49 +0300 Subject: [PATCH 1/3] Prevent ConnectionManager::shutdown to throw any exception. Reverted the order of InvocationService shutdown and ConnectionManager shutdown in order to eliminate any unnecessary retries happening during client shutdown, and prevent these retries causing any exceptions. Note that InvocationService::shutdown is only setting a boolean flag. At Java client, there is a CleanResourcesTask which periodically performs the cleanup of invocations. We really need such a task implemented in the future. --- .../include/hazelcast/client/connection/ConnectionManager.h | 2 +- .../src/hazelcast/client/connection/ConnectionManager.cpp | 3 ++- hazelcast/src/hazelcast/client/spi/LifecycleService.cpp | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h index 66583dcdd6..07443a6eac 100644 --- a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h +++ b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h @@ -131,7 +131,7 @@ namespace hazelcast { void onConnectionClose(const Address &address, int socketId); /** - * Shutdown clientConnectionManager + * Shutdown clientConnectionManager. It does not throw any excpetion. */ void shutdown(); diff --git a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp index 82c5dddad3..e8f4110be6 100644 --- a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp +++ b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp @@ -72,7 +72,8 @@ namespace hazelcast { live = false; // close connections BOOST_FOREACH(boost::shared_ptr connection , connections.values()) { - connection->close("Hazelcast client is shutting down"); + // prevent any exceptions + util::IOUtil::closeResource(connection.get(), "Hazelcast client is shutting down"); } heartBeater.shutdown(); if (heartBeatThread.get() != NULL) { diff --git a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp index d86ea798a2..40c87dc809 100644 --- a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp +++ b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp @@ -70,10 +70,10 @@ namespace hazelcast { return; } fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN); + clientContext.getInvocationService().shutdown(); clientContext.getConnectionManager().shutdown(); clientContext.getClusterService().shutdown(); clientContext.getPartitionService().shutdown(); - clientContext.getInvocationService().shutdown(); clientContext.getNearCacheManager().destroyAllNearCaches(); fireLifecycleEvent(LifecycleEvent::SHUTDOWN); } From f1fd2158a2ce8fc13da32982b7000268ccd0efd2 Mon Sep 17 00:00:00 2001 From: ihsan demir Date: Tue, 20 Jun 2017 12:27:16 +0300 Subject: [PATCH 2/3] Corrected the return value of InvocationService::removeEventHandler method. Fix: Set the handler registration id correctly, previously we left it as unset (empty) which indirectly caused the removals of the registration to fail. --- .../hazelcast/client/connection/Connection.h | 2 ++ .../hazelcast/client/spi/InvocationService.h | 2 +- .../client/connection/Connection.cpp | 15 ++++++++++ .../client/spi/InvocationService.cpp | 30 +++++++++++-------- .../client/spi/ServerListenerService.cpp | 28 ++++++++++++----- 5 files changed, 57 insertions(+), 20 deletions(-) diff --git a/hazelcast/include/hazelcast/client/connection/Connection.h b/hazelcast/include/hazelcast/client/connection/Connection.h index d990c32ff5..47a60fbd07 100644 --- a/hazelcast/include/hazelcast/client/connection/Connection.h +++ b/hazelcast/include/hazelcast/client/connection/Connection.h @@ -123,6 +123,8 @@ namespace hazelcast { int connectionId; }; + + std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection); } } } diff --git a/hazelcast/include/hazelcast/client/spi/InvocationService.h b/hazelcast/include/hazelcast/client/spi/InvocationService.h index bed8601329..f42b3bb0d3 100644 --- a/hazelcast/include/hazelcast/client/spi/InvocationService.h +++ b/hazelcast/include/hazelcast/client/spi/InvocationService.h @@ -110,7 +110,7 @@ namespace hazelcast { * @param callId of event handler registration request * @return true if found and removed, false otherwise */ - void removeEventHandler(int64_t callId); + bool removeEventHandler(int64_t callId); /** * Clean all promises (both request and event handlers). Retries requests on available connections if applicable. diff --git a/hazelcast/src/hazelcast/client/connection/Connection.cpp b/hazelcast/src/hazelcast/client/connection/Connection.cpp index 250066b5b0..32e64feecd 100644 --- a/hazelcast/src/hazelcast/client/connection/Connection.cpp +++ b/hazelcast/src/hazelcast/client/connection/Connection.cpp @@ -223,6 +223,21 @@ namespace hazelcast { bool Connection::isOwnerConnection() const { return _isOwnerConnection; } + + std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection) { + Connection &conn = const_cast(connection); + time_t lastRead = conn.lastRead; + bool live = conn.live; + out << "ClientConnection{" + << "alive=" << live + << ", connectionId=" << connection.getConnectionId() + << ", remoteEndpoint=" << connection.getRemoteEndpoint() + << ", lastReadTime=" << lastRead + << ", isHeartbeating=" << conn.isHeartBeating() + << '}'; + + return out; + } } } } diff --git a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp index d1dd24b562..24deca57fd 100644 --- a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp +++ b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp @@ -136,15 +136,18 @@ namespace hazelcast { return retryCount; } - void InvocationService::removeEventHandler(int64_t callId) { + bool InvocationService::removeEventHandler(int64_t callId) { std::vector > connections = clientContext.getConnectionManager().getConnections(); std::vector >::iterator it; for (it = connections.begin(); it != connections.end(); ++it) { boost::shared_ptr &connectionPtr = *it; - if (deRegisterEventHandler(*connectionPtr, callId) != NULL) { - return; + boost::shared_ptr eventPromise = deRegisterEventHandler(*connectionPtr, + callId); + if (eventPromise.get() != (connection::CallPromise *) NULL) { + return true; } } + return false; } @@ -221,12 +224,11 @@ namespace hazelcast { boost::shared_ptr actualConn = registerAndEnqueue(connection, promise); if (NULL != actualConn.get()) { - char msg[300]; - const Address &serverAddr = connection->getRemoteEndpoint(); - hazelcast::util::snprintf(msg, 300, "[InvocationService::resend] Re-sending the request with id %lld " - "originally destined for %s to server [%s:%d] using the new correlation id %lld", correlationId, - lastTriedAddress.c_str(), serverAddr.getHost().c_str(), serverAddr.getPort(), promise->getRequest()->getCorrelationId()); - util::ILogger::getLogger().info(msg); + std::ostringstream out; + out << "[InvocationService::resend] Re-sending the request with id " << correlationId << + " originally destined for " << lastTriedAddress << " on connection " << *actualConn << + " using the new correlation id " << promise->getRequest()->getCorrelationId(); + util::ILogger::getLogger().info(out.str()); } return actualConn; @@ -343,9 +345,13 @@ namespace hazelcast { boost::shared_ptr promise) { client::impl::BaseEventHandler *eventHandler = promise->getEventHandler(); if (eventHandler != NULL) { - if (eventHandler->registrationId.size() == - 0) //if uuid is not set, it means it is first time that we are getting uuid. - return true; // then no need to handle it, just set as normal response + /** + * if uuid is not set, it means it is first time that we are getting uuid. + * then no need to handle it, just treat non-event response. + */ + if (eventHandler->registrationId.empty()) { + return true; + } // result->registrationId is the alias for the original registration clientContext.getServerListenerService().reRegisterListener(eventHandler->registrationId, response); diff --git a/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp b/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp index 0bff66deba..6b31c09a20 100644 --- a/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp +++ b/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp @@ -54,10 +54,9 @@ namespace hazelcast { handler->beforeListenerRegister(); connection::CallFuture future = clientContext.getInvocationService().invokeOnRandomTarget( addListenerCodec->encodeRequest(), handler); - - std::string registrationId = registerInternal(addListenerCodec, future); + handler->registrationId = registerInternal(addListenerCodec, future); handler->onListenerRegister(); - return registrationId; + return handler->registrationId; } void ServerListenerService::reRegisterListener(std::string registrationId, @@ -84,7 +83,9 @@ namespace hazelcast { boost::shared_ptr registration = registrationIdMap.remove(*uuid); if ((impl::listener::EventRegistration *)NULL != registration.get()) { - clientContext.getInvocationService().removeEventHandler(registration->getCorrelationId()); + if (!clientContext.getInvocationService().removeEventHandler(registration->getCorrelationId())) { + return false; + } // send a remove listener request removeListenerCodec.setRegistrationId(*uuid); @@ -93,10 +94,23 @@ namespace hazelcast { connection::CallFuture future = clientContext.getInvocationService().invokeOnTarget( request, registration->getMemberAddress()); - std::auto_ptr response = future.get(); + future.get(); + } catch (exception::IOException &e) { + //if invocation cannot be done that means connection is broken, the server is shutdown + util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + std::ostringstream out; + const Address &memberAddress = registration->getMemberAddress(); + out << "[ServerListenerService::deRegisterListener] Removal of listener with id " << + uuid << " from server " << memberAddress << " failed. " << e.what(); + logger.finest(out.str()); + } } catch (exception::IException &e) { - //if invocation cannot be done that means connection is broken and listener is already removed - (void)e; // suppress the unused variable warning + std::ostringstream out; + const Address &memberAddress = registration->getMemberAddress(); + out << "[ServerListenerService::deRegisterListener] Removal of listener with id " << + uuid << " from server " << memberAddress << " failed. " << e.what(); + util::ILogger::getLogger().warning(out.str()); } return true; From a24d2fd53dcb1669cdb0f5743af5f7d65dd7cf1a Mon Sep 17 00:00:00 2001 From: ihsan demir Date: Tue, 20 Jun 2017 14:45:06 +0300 Subject: [PATCH 3/3] Removed the unneded isOpen flag from InvocationService, it uses LifecycleService::isOpen instead. Also, suppressed some logs printed during shutdown. Updated the start sequence to start the InvocationService start before ConnectionManager start (currently InvocationService::start is no-op). Updated shutdown sequence to shutdown PartitionService before ConnectionManager. --- .../hazelcast/client/spi/InvocationService.h | 3 +- .../connection/ClusterListenerThread.cpp | 6 ++- .../client/spi/InvocationService.cpp | 50 ++++++++++--------- .../hazelcast/client/spi/LifecycleService.cpp | 9 ++-- .../hazelcast/client/spi/PartitionService.cpp | 30 +++++++---- 5 files changed, 56 insertions(+), 42 deletions(-) diff --git a/hazelcast/include/hazelcast/client/spi/InvocationService.h b/hazelcast/include/hazelcast/client/spi/InvocationService.h index f42b3bb0d3..ec86eb993a 100644 --- a/hazelcast/include/hazelcast/client/spi/InvocationService.h +++ b/hazelcast/include/hazelcast/client/spi/InvocationService.h @@ -22,7 +22,6 @@ #include "hazelcast/util/AtomicInt.h" #include "hazelcast/util/SynchronizedMap.h" #include "hazelcast/client/protocol/IMessageHandler.h" -#include "hazelcast/util/AtomicBoolean.h" #include "hazelcast/client/protocol/ClientExceptionFactory.h" #include @@ -65,6 +64,7 @@ namespace hazelcast { namespace spi { class ClientContext; + class LifecycleService; class HAZELCAST_API InvocationService : public protocol::IMessageHandler { public: @@ -142,7 +142,6 @@ namespace hazelcast { // Is not using the Connection* for the key due to a possible ABA problem. util::SynchronizedMap > callPromises; util::SynchronizedMap > eventHandlerPromises; - util::AtomicBoolean isOpen; protocol::ClientExceptionFactory exceptionFactory; bool isAllowedToSentRequest(connection::Connection& connection, protocol::ClientMessage const&); diff --git a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp index 1c031dedd7..0bb8b7d05a 100644 --- a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp +++ b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp @@ -81,8 +81,10 @@ namespace hazelcast { previousConnectionAddr = conn->getRemoteEndpoint(); previousConnectionAddrPtr = &previousConnectionAddr; } catch (std::exception &e) { - util::ILogger::getLogger().severe( - std::string("Error while connecting to cluster! =>") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe( + std::string("Error while connecting to cluster! =>") + e.what()); + } isStartedSuccessfully = false; clientContext.getLifecycleService().shutdown(); startLatch.countDown(); diff --git a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp index 24deca57fd..0327c3473a 100644 --- a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp +++ b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp @@ -31,6 +31,7 @@ #include "hazelcast/client/ClientProperties.h" #include "hazelcast/client/connection/Connection.h" #include "hazelcast/client/spi/ServerListenerService.h" +#include "hazelcast/client/spi/LifecycleService.h" #include "hazelcast/client/serialization/pimpl/SerializationService.h" #include "hazelcast/client/exception/IllegalStateException.h" #include "hazelcast/client/exception/InstanceNotActiveException.h" @@ -42,7 +43,7 @@ namespace hazelcast { namespace client { namespace spi { InvocationService::InvocationService(spi::ClientContext &clientContext) - : clientContext(clientContext), isOpen(false) { + : clientContext(clientContext) { redoOperation = clientContext.getClientConfig().isRedoOperation(); ClientProperties &properties = clientContext.getClientProperties(); retryWaitTime = properties.getRetryWaitTime().getInteger(); @@ -65,11 +66,10 @@ namespace hazelcast { } bool InvocationService::start() { - return isOpen.compareAndSet(false, true); + return true; } void InvocationService::shutdown() { - isOpen.compareAndSet(true, false); } connection::CallFuture InvocationService::invokeOnRandomTarget( @@ -237,15 +237,13 @@ namespace hazelcast { boost::shared_ptr InvocationService::registerAndEnqueue( boost::shared_ptr &connection, boost::shared_ptr promise) { - if (!isOpen) { + if (!clientContext.getLifecycleService().isRunning()) { char msg[200]; - util::snprintf(msg, 200, "[InvocationService::registerAndEnqueue] InvocationService is shutdown. " - "Did not register the promise for message correlation id:%lld", - promise->getRequest()->getCorrelationId()); - hazelcast::util::ILogger::getLogger().info(msg); + util::snprintf(msg, 200, "Client is not running. Did not register the promise for message " + "correlation id:%lld", promise->getRequest()->getCorrelationId()); std::auto_ptr exception(new exception::IllegalStateException( - "InvocationService::registerAndEnqueue", "Invocation service is not open. Can not process the request.")); + "InvocationService::registerAndEnqueue", msg)); promise->setException(exception); return boost::shared_ptr(); @@ -365,8 +363,8 @@ namespace hazelcast { void InvocationService::tryResend(std::auto_ptr exception, boost::shared_ptr promise, const std::string &lastTriedAddress) { - bool serviceOpen = isOpen; - if (serviceOpen && (promise->getRequest()->isRetryable() || isRedoOperation())) { + if (clientContext.getLifecycleService().isRunning() && + (promise->getRequest()->isRetryable() || isRedoOperation())) { resend(promise, lastTriedAddress); return; } @@ -391,16 +389,19 @@ namespace hazelcast { std::string address = util::IOUtil::to_string(connection.getRemoteEndpoint()); - char msg[200]; - util::snprintf(msg, 200, "[cleanResources] There are %u waiting promises on connection with id:%d (%s) ", - promises.size(), connection.getConnectionId(), address.c_str()); - util::ILogger::getLogger().info(msg); + util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + std::ostringstream out; + out << "[InvocationService::cleanResources] There are " << promises.size() << " waiting promises on " + "connection " << connection; + logger.finest(out.str()); + } for (std::vector > >::iterator it = promises.begin(); it != promises.end(); ++it) { - if (!isOpen) { + if (!clientContext.getLifecycleService().isRunning()) { std::auto_ptr exception(new exception::IllegalStateException( - "InvocationService::cleanResources", "Invocation service is not open.")); + "InvocationService::cleanResources", "Client is not running.")); it->second->setException(exception); } else { std::auto_ptr exception(new exception::IOException( @@ -417,19 +418,20 @@ namespace hazelcast { connection)->clear(); util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + char msg[200]; + util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises " + "on connection with id:%d to be retried", promises.size(), connection.getConnectionId()); + logger.finest(msg); + } - char msg[200]; - util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises on connection with id:%d to be retried", - promises.size(), connection.getConnectionId()); - logger.info(msg); - - if (isOpen) { + if (clientContext.getLifecycleService().isRunning()) { for (std::vector > >::const_iterator it = promises.begin(); it != promises.end(); ++it) { clientContext.getServerListenerService().retryFailedListener(it->second); } } else { - logger.info("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry " + logger.finest("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry " "registering any event handler if exists."); } } diff --git a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp index 40c87dc809..93fe1d26c4 100644 --- a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp +++ b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp @@ -43,15 +43,16 @@ namespace hazelcast { fireLifecycleEvent(LifecycleEvent::STARTING); active = true; - if (!clientContext.getConnectionManager().start()) { + + if (!clientContext.getInvocationService().start()) { return false; } - if (!clientContext.getClusterService().start()) { + if (!clientContext.getConnectionManager().start()) { return false; } - if (!clientContext.getInvocationService().start()) { + if (!clientContext.getClusterService().start()) { return false; } @@ -71,9 +72,9 @@ namespace hazelcast { } fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN); clientContext.getInvocationService().shutdown(); + clientContext.getPartitionService().shutdown(); clientContext.getConnectionManager().shutdown(); clientContext.getClusterService().shutdown(); - clientContext.getPartitionService().shutdown(); clientContext.getNearCacheManager().destroyAllNearCaches(); fireLifecycleEvent(LifecycleEvent::SHUTDOWN); } diff --git a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp index 9877a35efb..a0244401df 100644 --- a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp +++ b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp @@ -107,7 +107,9 @@ namespace hazelcast { responseMessage = future.get(); } catch (exception::IOException& e) { - util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + } } return responseMessage; } @@ -123,7 +125,9 @@ namespace hazelcast { responseMessage = future.get(); } catch (exception::IOException& e) { - util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + } } return responseMessage; } @@ -170,12 +174,14 @@ namespace hazelcast { } } - if (!result) { - util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!"); - } else { - util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " + - util::IOUtil::to_string(partitionCount) + - " initial partitions successfully."); + if (clientContext.getLifecycleService().isRunning()) { + if (!result) { + util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!"); + } else { + util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " + + util::IOUtil::to_string(partitionCount) + + " initial partitions successfully."); + } } return result; } @@ -195,9 +201,13 @@ namespace hazelcast { processPartitionResponse(*partitionResponse); } } catch (hazelcast::client::exception::IException& e) { - util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what()); + } } catch (...) { - util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions ")); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions ")); + } throw; } updating = false;