diff --git a/hazelcast/include/hazelcast/client/connection/Connection.h b/hazelcast/include/hazelcast/client/connection/Connection.h index d990c32ff5..47a60fbd07 100644 --- a/hazelcast/include/hazelcast/client/connection/Connection.h +++ b/hazelcast/include/hazelcast/client/connection/Connection.h @@ -123,6 +123,8 @@ namespace hazelcast { int connectionId; }; + + std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection); } } } diff --git a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h index 66583dcdd6..07443a6eac 100644 --- a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h +++ b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h @@ -131,7 +131,7 @@ namespace hazelcast { void onConnectionClose(const Address &address, int socketId); /** - * Shutdown clientConnectionManager + * Shutdown clientConnectionManager. It does not throw any excpetion. */ void shutdown(); diff --git a/hazelcast/include/hazelcast/client/spi/InvocationService.h b/hazelcast/include/hazelcast/client/spi/InvocationService.h index bed8601329..ec86eb993a 100644 --- a/hazelcast/include/hazelcast/client/spi/InvocationService.h +++ b/hazelcast/include/hazelcast/client/spi/InvocationService.h @@ -22,7 +22,6 @@ #include "hazelcast/util/AtomicInt.h" #include "hazelcast/util/SynchronizedMap.h" #include "hazelcast/client/protocol/IMessageHandler.h" -#include "hazelcast/util/AtomicBoolean.h" #include "hazelcast/client/protocol/ClientExceptionFactory.h" #include @@ -65,6 +64,7 @@ namespace hazelcast { namespace spi { class ClientContext; + class LifecycleService; class HAZELCAST_API InvocationService : public protocol::IMessageHandler { public: @@ -110,7 +110,7 @@ namespace hazelcast { * @param callId of event handler registration request * @return true if found and removed, false otherwise */ - void removeEventHandler(int64_t callId); + bool removeEventHandler(int64_t callId); /** * Clean all promises (both request and event handlers). Retries requests on available connections if applicable. @@ -142,7 +142,6 @@ namespace hazelcast { // Is not using the Connection* for the key due to a possible ABA problem. util::SynchronizedMap > callPromises; util::SynchronizedMap > eventHandlerPromises; - util::AtomicBoolean isOpen; protocol::ClientExceptionFactory exceptionFactory; bool isAllowedToSentRequest(connection::Connection& connection, protocol::ClientMessage const&); diff --git a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp index 1c031dedd7..0bb8b7d05a 100644 --- a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp +++ b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp @@ -81,8 +81,10 @@ namespace hazelcast { previousConnectionAddr = conn->getRemoteEndpoint(); previousConnectionAddrPtr = &previousConnectionAddr; } catch (std::exception &e) { - util::ILogger::getLogger().severe( - std::string("Error while connecting to cluster! =>") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe( + std::string("Error while connecting to cluster! =>") + e.what()); + } isStartedSuccessfully = false; clientContext.getLifecycleService().shutdown(); startLatch.countDown(); diff --git a/hazelcast/src/hazelcast/client/connection/Connection.cpp b/hazelcast/src/hazelcast/client/connection/Connection.cpp index 250066b5b0..32e64feecd 100644 --- a/hazelcast/src/hazelcast/client/connection/Connection.cpp +++ b/hazelcast/src/hazelcast/client/connection/Connection.cpp @@ -223,6 +223,21 @@ namespace hazelcast { bool Connection::isOwnerConnection() const { return _isOwnerConnection; } + + std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection) { + Connection &conn = const_cast(connection); + time_t lastRead = conn.lastRead; + bool live = conn.live; + out << "ClientConnection{" + << "alive=" << live + << ", connectionId=" << connection.getConnectionId() + << ", remoteEndpoint=" << connection.getRemoteEndpoint() + << ", lastReadTime=" << lastRead + << ", isHeartbeating=" << conn.isHeartBeating() + << '}'; + + return out; + } } } } diff --git a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp index 82c5dddad3..e8f4110be6 100644 --- a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp +++ b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp @@ -72,7 +72,8 @@ namespace hazelcast { live = false; // close connections BOOST_FOREACH(boost::shared_ptr connection , connections.values()) { - connection->close("Hazelcast client is shutting down"); + // prevent any exceptions + util::IOUtil::closeResource(connection.get(), "Hazelcast client is shutting down"); } heartBeater.shutdown(); if (heartBeatThread.get() != NULL) { diff --git a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp index d1dd24b562..0327c3473a 100644 --- a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp +++ b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp @@ -31,6 +31,7 @@ #include "hazelcast/client/ClientProperties.h" #include "hazelcast/client/connection/Connection.h" #include "hazelcast/client/spi/ServerListenerService.h" +#include "hazelcast/client/spi/LifecycleService.h" #include "hazelcast/client/serialization/pimpl/SerializationService.h" #include "hazelcast/client/exception/IllegalStateException.h" #include "hazelcast/client/exception/InstanceNotActiveException.h" @@ -42,7 +43,7 @@ namespace hazelcast { namespace client { namespace spi { InvocationService::InvocationService(spi::ClientContext &clientContext) - : clientContext(clientContext), isOpen(false) { + : clientContext(clientContext) { redoOperation = clientContext.getClientConfig().isRedoOperation(); ClientProperties &properties = clientContext.getClientProperties(); retryWaitTime = properties.getRetryWaitTime().getInteger(); @@ -65,11 +66,10 @@ namespace hazelcast { } bool InvocationService::start() { - return isOpen.compareAndSet(false, true); + return true; } void InvocationService::shutdown() { - isOpen.compareAndSet(true, false); } connection::CallFuture InvocationService::invokeOnRandomTarget( @@ -136,15 +136,18 @@ namespace hazelcast { return retryCount; } - void InvocationService::removeEventHandler(int64_t callId) { + bool InvocationService::removeEventHandler(int64_t callId) { std::vector > connections = clientContext.getConnectionManager().getConnections(); std::vector >::iterator it; for (it = connections.begin(); it != connections.end(); ++it) { boost::shared_ptr &connectionPtr = *it; - if (deRegisterEventHandler(*connectionPtr, callId) != NULL) { - return; + boost::shared_ptr eventPromise = deRegisterEventHandler(*connectionPtr, + callId); + if (eventPromise.get() != (connection::CallPromise *) NULL) { + return true; } } + return false; } @@ -221,12 +224,11 @@ namespace hazelcast { boost::shared_ptr actualConn = registerAndEnqueue(connection, promise); if (NULL != actualConn.get()) { - char msg[300]; - const Address &serverAddr = connection->getRemoteEndpoint(); - hazelcast::util::snprintf(msg, 300, "[InvocationService::resend] Re-sending the request with id %lld " - "originally destined for %s to server [%s:%d] using the new correlation id %lld", correlationId, - lastTriedAddress.c_str(), serverAddr.getHost().c_str(), serverAddr.getPort(), promise->getRequest()->getCorrelationId()); - util::ILogger::getLogger().info(msg); + std::ostringstream out; + out << "[InvocationService::resend] Re-sending the request with id " << correlationId << + " originally destined for " << lastTriedAddress << " on connection " << *actualConn << + " using the new correlation id " << promise->getRequest()->getCorrelationId(); + util::ILogger::getLogger().info(out.str()); } return actualConn; @@ -235,15 +237,13 @@ namespace hazelcast { boost::shared_ptr InvocationService::registerAndEnqueue( boost::shared_ptr &connection, boost::shared_ptr promise) { - if (!isOpen) { + if (!clientContext.getLifecycleService().isRunning()) { char msg[200]; - util::snprintf(msg, 200, "[InvocationService::registerAndEnqueue] InvocationService is shutdown. " - "Did not register the promise for message correlation id:%lld", - promise->getRequest()->getCorrelationId()); - hazelcast::util::ILogger::getLogger().info(msg); + util::snprintf(msg, 200, "Client is not running. Did not register the promise for message " + "correlation id:%lld", promise->getRequest()->getCorrelationId()); std::auto_ptr exception(new exception::IllegalStateException( - "InvocationService::registerAndEnqueue", "Invocation service is not open. Can not process the request.")); + "InvocationService::registerAndEnqueue", msg)); promise->setException(exception); return boost::shared_ptr(); @@ -343,9 +343,13 @@ namespace hazelcast { boost::shared_ptr promise) { client::impl::BaseEventHandler *eventHandler = promise->getEventHandler(); if (eventHandler != NULL) { - if (eventHandler->registrationId.size() == - 0) //if uuid is not set, it means it is first time that we are getting uuid. - return true; // then no need to handle it, just set as normal response + /** + * if uuid is not set, it means it is first time that we are getting uuid. + * then no need to handle it, just treat non-event response. + */ + if (eventHandler->registrationId.empty()) { + return true; + } // result->registrationId is the alias for the original registration clientContext.getServerListenerService().reRegisterListener(eventHandler->registrationId, response); @@ -359,8 +363,8 @@ namespace hazelcast { void InvocationService::tryResend(std::auto_ptr exception, boost::shared_ptr promise, const std::string &lastTriedAddress) { - bool serviceOpen = isOpen; - if (serviceOpen && (promise->getRequest()->isRetryable() || isRedoOperation())) { + if (clientContext.getLifecycleService().isRunning() && + (promise->getRequest()->isRetryable() || isRedoOperation())) { resend(promise, lastTriedAddress); return; } @@ -385,16 +389,19 @@ namespace hazelcast { std::string address = util::IOUtil::to_string(connection.getRemoteEndpoint()); - char msg[200]; - util::snprintf(msg, 200, "[cleanResources] There are %u waiting promises on connection with id:%d (%s) ", - promises.size(), connection.getConnectionId(), address.c_str()); - util::ILogger::getLogger().info(msg); + util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + std::ostringstream out; + out << "[InvocationService::cleanResources] There are " << promises.size() << " waiting promises on " + "connection " << connection; + logger.finest(out.str()); + } for (std::vector > >::iterator it = promises.begin(); it != promises.end(); ++it) { - if (!isOpen) { + if (!clientContext.getLifecycleService().isRunning()) { std::auto_ptr exception(new exception::IllegalStateException( - "InvocationService::cleanResources", "Invocation service is not open.")); + "InvocationService::cleanResources", "Client is not running.")); it->second->setException(exception); } else { std::auto_ptr exception(new exception::IOException( @@ -411,19 +418,20 @@ namespace hazelcast { connection)->clear(); util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + char msg[200]; + util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises " + "on connection with id:%d to be retried", promises.size(), connection.getConnectionId()); + logger.finest(msg); + } - char msg[200]; - util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises on connection with id:%d to be retried", - promises.size(), connection.getConnectionId()); - logger.info(msg); - - if (isOpen) { + if (clientContext.getLifecycleService().isRunning()) { for (std::vector > >::const_iterator it = promises.begin(); it != promises.end(); ++it) { clientContext.getServerListenerService().retryFailedListener(it->second); } } else { - logger.info("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry " + logger.finest("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry " "registering any event handler if exists."); } } diff --git a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp index d86ea798a2..93fe1d26c4 100644 --- a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp +++ b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp @@ -43,15 +43,16 @@ namespace hazelcast { fireLifecycleEvent(LifecycleEvent::STARTING); active = true; - if (!clientContext.getConnectionManager().start()) { + + if (!clientContext.getInvocationService().start()) { return false; } - if (!clientContext.getClusterService().start()) { + if (!clientContext.getConnectionManager().start()) { return false; } - if (!clientContext.getInvocationService().start()) { + if (!clientContext.getClusterService().start()) { return false; } @@ -70,10 +71,10 @@ namespace hazelcast { return; } fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN); + clientContext.getInvocationService().shutdown(); + clientContext.getPartitionService().shutdown(); clientContext.getConnectionManager().shutdown(); clientContext.getClusterService().shutdown(); - clientContext.getPartitionService().shutdown(); - clientContext.getInvocationService().shutdown(); clientContext.getNearCacheManager().destroyAllNearCaches(); fireLifecycleEvent(LifecycleEvent::SHUTDOWN); } diff --git a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp index 9877a35efb..a0244401df 100644 --- a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp +++ b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp @@ -107,7 +107,9 @@ namespace hazelcast { responseMessage = future.get(); } catch (exception::IOException& e) { - util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + } } return responseMessage; } @@ -123,7 +125,9 @@ namespace hazelcast { responseMessage = future.get(); } catch (exception::IOException& e) { - util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what()); + } } return responseMessage; } @@ -170,12 +174,14 @@ namespace hazelcast { } } - if (!result) { - util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!"); - } else { - util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " + - util::IOUtil::to_string(partitionCount) + - " initial partitions successfully."); + if (clientContext.getLifecycleService().isRunning()) { + if (!result) { + util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!"); + } else { + util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " + + util::IOUtil::to_string(partitionCount) + + " initial partitions successfully."); + } } return result; } @@ -195,9 +201,13 @@ namespace hazelcast { processPartitionResponse(*partitionResponse); } } catch (hazelcast::client::exception::IException& e) { - util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what()); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what()); + } } catch (...) { - util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions ")); + if (clientContext.getLifecycleService().isRunning()) { + util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions ")); + } throw; } updating = false; diff --git a/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp b/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp index 0bff66deba..6b31c09a20 100644 --- a/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp +++ b/hazelcast/src/hazelcast/client/spi/ServerListenerService.cpp @@ -54,10 +54,9 @@ namespace hazelcast { handler->beforeListenerRegister(); connection::CallFuture future = clientContext.getInvocationService().invokeOnRandomTarget( addListenerCodec->encodeRequest(), handler); - - std::string registrationId = registerInternal(addListenerCodec, future); + handler->registrationId = registerInternal(addListenerCodec, future); handler->onListenerRegister(); - return registrationId; + return handler->registrationId; } void ServerListenerService::reRegisterListener(std::string registrationId, @@ -84,7 +83,9 @@ namespace hazelcast { boost::shared_ptr registration = registrationIdMap.remove(*uuid); if ((impl::listener::EventRegistration *)NULL != registration.get()) { - clientContext.getInvocationService().removeEventHandler(registration->getCorrelationId()); + if (!clientContext.getInvocationService().removeEventHandler(registration->getCorrelationId())) { + return false; + } // send a remove listener request removeListenerCodec.setRegistrationId(*uuid); @@ -93,10 +94,23 @@ namespace hazelcast { connection::CallFuture future = clientContext.getInvocationService().invokeOnTarget( request, registration->getMemberAddress()); - std::auto_ptr response = future.get(); + future.get(); + } catch (exception::IOException &e) { + //if invocation cannot be done that means connection is broken, the server is shutdown + util::ILogger &logger = util::ILogger::getLogger(); + if (logger.isFinestEnabled()) { + std::ostringstream out; + const Address &memberAddress = registration->getMemberAddress(); + out << "[ServerListenerService::deRegisterListener] Removal of listener with id " << + uuid << " from server " << memberAddress << " failed. " << e.what(); + logger.finest(out.str()); + } } catch (exception::IException &e) { - //if invocation cannot be done that means connection is broken and listener is already removed - (void)e; // suppress the unused variable warning + std::ostringstream out; + const Address &memberAddress = registration->getMemberAddress(); + out << "[ServerListenerService::deRegisterListener] Removal of listener with id " << + uuid << " from server " << memberAddress << " failed. " << e.what(); + util::ILogger::getLogger().warning(out.str()); } return true;