diff --git a/hazelcast/include/hazelcast/client/connection/Connection.h b/hazelcast/include/hazelcast/client/connection/Connection.h index 8dc345a099..d990c32ff5 100644 --- a/hazelcast/include/hazelcast/client/connection/Connection.h +++ b/hazelcast/include/hazelcast/client/connection/Connection.h @@ -68,7 +68,7 @@ namespace hazelcast { void connect(int timeoutInMillis); - void close(); + void close(const char *closeReason = NULL); void write(protocol::ClientMessage *message); diff --git a/hazelcast/include/hazelcast/client/exception/ProtocolExceptions.h b/hazelcast/include/hazelcast/client/exception/ProtocolExceptions.h index 2fcf0127d8..f1ac82705c 100644 --- a/hazelcast/include/hazelcast/client/exception/ProtocolExceptions.h +++ b/hazelcast/include/hazelcast/client/exception/ProtocolExceptions.h @@ -33,8 +33,9 @@ namespace hazelcast { namespace exception { class HAZELCAST_API ProtocolException : public IException { public: - ProtocolException(const std::string& source, const std::string& message, int32_t errorNo, int32_t causeCode) - : IException(source, message), errorCode(errorNo), causeErrorCode(causeCode) { + ProtocolException(const std::string& message, const std::string& details, int32_t errorNo, + int32_t causeCode) + : IException("Cluster", details), errorCode(errorNo), causeErrorCode(causeCode) { } ProtocolException(const std::string& source, const std::string& message) @@ -56,8 +57,8 @@ namespace hazelcast { #define DEFINE_PROTOCOL_EXCEPTION(ClassName) \ class HAZELCAST_API ClassName : public ProtocolException {\ public:\ - ClassName(const std::string& source, const std::string& message, int32_t errorCode, int32_t causeCode) \ - : ProtocolException(source, message, errorCode, causeCode) {\ + ClassName(const std::string& message, const std::string& details, int32_t errorNo, int32_t causeCode) \ + : ProtocolException(message, details, errorNo, causeCode) {\ }\ ClassName(const std::string& source, const std::string& message) \ : ProtocolException(source, message) {\ diff --git a/hazelcast/include/hazelcast/client/spi/LifecycleService.h b/hazelcast/include/hazelcast/client/spi/LifecycleService.h index 048fa2a929..698c4d4259 100644 --- a/hazelcast/include/hazelcast/client/spi/LifecycleService.h +++ b/hazelcast/include/hazelcast/client/spi/LifecycleService.h @@ -16,9 +16,6 @@ // // Created by sancar koyunlu on 6/17/13. - - - #ifndef HAZELCAST_LIFECYCLE_SERVICE #define HAZELCAST_LIFECYCLE_SERVICE @@ -67,7 +64,7 @@ namespace hazelcast { std::set listeners; util::Mutex listenerLock; util::AtomicBoolean active; - + util::Mutex shutdownLock; }; } diff --git a/hazelcast/include/hazelcast/util/Closeable.h b/hazelcast/include/hazelcast/util/Closeable.h index 0bae25493d..911bede073 100644 --- a/hazelcast/include/hazelcast/util/Closeable.h +++ b/hazelcast/include/hazelcast/util/Closeable.h @@ -27,7 +27,7 @@ namespace hazelcast { public: virtual ~Closeable(); - virtual void close() = 0; + virtual void close(const char *closeReason) = 0; }; } } diff --git a/hazelcast/include/hazelcast/util/IOUtil.h b/hazelcast/include/hazelcast/util/IOUtil.h index f8ede479ec..91c88ecf19 100644 --- a/hazelcast/include/hazelcast/util/IOUtil.h +++ b/hazelcast/include/hazelcast/util/IOUtil.h @@ -46,7 +46,7 @@ namespace hazelcast { return value; } - static void closeResource(Closeable *closable); + static void closeResource(Closeable *closable, const char *closeReason = NULL); }; } diff --git a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp index 5f0e777306..1c031dedd7 100644 --- a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp +++ b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp @@ -84,6 +84,7 @@ namespace hazelcast { util::ILogger::getLogger().severe( std::string("Error while connecting to cluster! =>") + e.what()); isStartedSuccessfully = false; + clientContext.getLifecycleService().shutdown(); startLatch.countDown(); return; } @@ -105,7 +106,7 @@ namespace hazelcast { clientContext.getConnectionManager().onCloseOwnerConnection(); if (deletingConnection.compareAndSet(false, true)) { - util::IOUtil::closeResource(conn.get()); + util::IOUtil::closeResource(conn.get(), "Error while listening cluster events"); conn.reset(); deletingConnection = false; clientContext.getLifecycleService().fireLifecycleEvent(LifecycleEvent::CLIENT_DISCONNECTED); @@ -117,7 +118,7 @@ namespace hazelcast { void ClusterListenerThread::stop() { if (deletingConnection.compareAndSet(false, true)) { - util::IOUtil::closeResource(conn.get()); + util::IOUtil::closeResource(conn.get(), "Cluster listener thread is stopping"); conn.reset(); deletingConnection = false; } diff --git a/hazelcast/src/hazelcast/client/connection/Connection.cpp b/hazelcast/src/hazelcast/client/connection/Connection.cpp index ab6c82f1de..250066b5b0 100644 --- a/hazelcast/src/hazelcast/client/connection/Connection.cpp +++ b/hazelcast/src/hazelcast/client/connection/Connection.cpp @@ -89,7 +89,7 @@ namespace hazelcast { outputSocketStream.write(PROTOCOL); } - void Connection::close() { + void Connection::close(const char *closeReason) { if (!live.compareAndSet(true, false)) { return; } @@ -98,8 +98,10 @@ namespace hazelcast { int socketId = socket->getSocketId(); std::stringstream message; - message << "Closing connection (id:" << connectionId << ") to " << serverAddr << " with socket id " << socketId << - (_isOwnerConnection ? " as the owner connection." : "."); + message << "Closing connection (id:" << connectionId << ") to " << serverAddr << + " with socket id " << socketId << + (_isOwnerConnection ? " as the owner connection." : ". ") << + (NULL != closeReason ? closeReason : ""); util::ILogger::getLogger().warning(message.str()); if (!_isOwnerConnection) { readHandler.deRegisterSocket(); diff --git a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp index d776ab2b85..82c5dddad3 100644 --- a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp +++ b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp @@ -16,6 +16,7 @@ // // Created by sancar koyunlu on 8/21/13. +#include #include "hazelcast/util/Util.h" #include "hazelcast/client/protocol/AuthenticationStatus.h" #include "hazelcast/client/exception/AuthenticationException.h" @@ -69,6 +70,10 @@ namespace hazelcast { void ConnectionManager::shutdown() { live = false; + // close connections + BOOST_FOREACH(boost::shared_ptr connection , connections.values()) { + connection->close("Hazelcast client is shutting down"); + } heartBeater.shutdown(); if (heartBeatThread.get() != NULL) { heartBeatThread->cancel(); @@ -350,7 +355,7 @@ namespace hazelcast { void ConnectionManager::checkLive() { if (!live) { - throw exception::HazelcastException("ConnectionManager is not active!"); + throw exception::HazelcastException("ConnectionManager", "ConnectionManager is not active!"); } } @@ -384,7 +389,7 @@ namespace hazelcast { } else { ownerConnectionFuture.close(); } - util::IOUtil::closeResource(&connection); + util::IOUtil::closeResource(&connection, "Heartbeat failed"); } void ConnectionManager::removeEndpoint(const Address &address) { diff --git a/hazelcast/src/hazelcast/client/connection/IOSelector.cpp b/hazelcast/src/hazelcast/client/connection/IOSelector.cpp index 8e3187483f..10145e8f2c 100644 --- a/hazelcast/src/hazelcast/client/connection/IOSelector.cpp +++ b/hazelcast/src/hazelcast/client/connection/IOSelector.cpp @@ -42,7 +42,6 @@ namespace hazelcast { :connectionManager(connectionManager) { t.tv_sec = 5; t.tv_usec = 0; - isAlive = true; } void IOSelector::staticListen(util::ThreadArgs &args) { @@ -55,6 +54,10 @@ namespace hazelcast { } void IOSelector::wakeUp() { + if (!wakeUpSocket.get()) { + return; + } + int wakeUpSignal = 9; try { wakeUpSocket->send(&wakeUpSignal, sizeof(int)); @@ -91,6 +94,7 @@ namespace hazelcast { sleepingSocket->setBlocking(false); wakeUpSocketSet.insertSocket(sleepingSocket.get()); wakeUpListenerSocketId = sleepingSocket->getSocketId(); + isAlive = true; return true; } else { util::ILogger::getLogger().severe("IOSelector::initListenSocket " + std::string(strerror(errno))); @@ -99,7 +103,14 @@ namespace hazelcast { } void IOSelector::shutdown() { - isAlive = false; + if (!isAlive.compareAndSet(true, false)) { + return; + } + try { + wakeUp(); + } catch (exception::IOException &) { + // suppress io exception + } } void IOSelector::addTask(ListenerTask *listenerTask) { diff --git a/hazelcast/src/hazelcast/client/connection/OwnerConnectionFuture.cpp b/hazelcast/src/hazelcast/client/connection/OwnerConnectionFuture.cpp index c9c7c4da54..79b30b7a93 100644 --- a/hazelcast/src/hazelcast/client/connection/OwnerConnectionFuture.cpp +++ b/hazelcast/src/hazelcast/client/connection/OwnerConnectionFuture.cpp @@ -91,7 +91,7 @@ namespace hazelcast { std::stringstream message; message << "Closing owner connection to " << currentOwnerConnection->getRemoteEndpoint(); util::ILogger::getLogger().finest(message.str()); - util::IOUtil::closeResource(currentOwnerConnection.get()); + util::IOUtil::closeResource(currentOwnerConnection.get(), message.str().c_str()); markAsClosed(); } } diff --git a/hazelcast/src/hazelcast/client/spi/ClusterService.cpp b/hazelcast/src/hazelcast/client/spi/ClusterService.cpp index 0769f8d85b..80b455b0a6 100644 --- a/hazelcast/src/hazelcast/client/spi/ClusterService.cpp +++ b/hazelcast/src/hazelcast/client/spi/ClusterService.cpp @@ -69,7 +69,9 @@ namespace hazelcast { } void ClusterService::shutdown() { - active = false; + if (!active.compareAndSet(true, false)) { + return; + } if (NULL != clusterThread.getThread()) { // avoid anyone waiting on the start latch to get stuck clusterThread.startLatch.countDown(); diff --git a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp index e827030366..d86ea798a2 100644 --- a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp +++ b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp @@ -64,13 +64,16 @@ namespace hazelcast { } void LifecycleService::shutdown() { - if (!active.compareAndSet(true, false)) + util::LockGuard guard(shutdownLock); + + if (!active.compareAndSet(true, false)) { return; + } fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN); - clientContext.getInvocationService().shutdown(); - clientContext.getPartitionService().shutdown(); - clientContext.getClusterService().shutdown(); clientContext.getConnectionManager().shutdown(); + clientContext.getClusterService().shutdown(); + clientContext.getPartitionService().shutdown(); + clientContext.getInvocationService().shutdown(); clientContext.getNearCacheManager().destroyAllNearCaches(); fireLifecycleEvent(LifecycleEvent::SHUTDOWN); } diff --git a/hazelcast/src/hazelcast/util/IOUtil.cpp b/hazelcast/src/hazelcast/util/IOUtil.cpp index 7b84e1a1da..0f73dd3e9a 100644 --- a/hazelcast/src/hazelcast/util/IOUtil.cpp +++ b/hazelcast/src/hazelcast/util/IOUtil.cpp @@ -22,10 +22,10 @@ namespace hazelcast { namespace util { - void IOUtil::closeResource(Closeable *closable) { + void IOUtil::closeResource(Closeable *closable, const char *closeReason) { if (closable != NULL) { try { - closable->close(); + closable->close(closeReason); } catch (client::exception::IException& e) { std::stringstream message; message << "closeResource failed" << e.what(); diff --git a/hazelcast/src/hazelcast/util/Thread.cpp b/hazelcast/src/hazelcast/util/Thread.cpp index 56beb5c396..caef24d6e5 100644 --- a/hazelcast/src/hazelcast/util/Thread.cpp +++ b/hazelcast/src/hazelcast/util/Thread.cpp @@ -50,6 +50,7 @@ namespace hazelcast { , isJoined(false) , isInterrupted(false){ init(func, arg0, arg1, arg2, arg3); + } long Thread::getThreadID() { @@ -92,6 +93,11 @@ namespace hazelcast { if (!isJoined.compareAndSet(false, true)) { return true; } + if (id == getThreadID()) { + // called from inside the thread, deadlock possibility + return false; + } + DWORD err = WaitForSingleObject(thread, INFINITE); if (err != WAIT_OBJECT_0) { return false; @@ -197,6 +203,12 @@ namespace hazelcast { if (!isJoined.compareAndSet(false, true)) { return true; } + + if (pthread_equal(thread, pthread_self())) { + // called from inside the thread, deadlock possibility + return false; + } + int err = pthread_join(thread, NULL); if (EINVAL == err || ESRCH == err || EDEADLK == err) { isJoined = false; diff --git a/hazelcast/test/src/cluster/ClientConnectionTest.cpp b/hazelcast/test/src/cluster/ClientConnectionTest.cpp index 40c0252848..6c070d0280 100644 --- a/hazelcast/test/src/cluster/ClientConnectionTest.cpp +++ b/hazelcast/test/src/cluster/ClientConnectionTest.cpp @@ -48,13 +48,6 @@ namespace hazelcast { ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException); } - TEST_F(ClientConnectionTest, testTcpSocketConnectionTimeout_withIntMax) { - HazelcastServer instance(*g_srvFactory, true); - ClientConfig config; - config.addAddress(Address("8.8.8.8", 5701)); - ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException); - } - #ifdef HZ_BUILD_WITH_SSL TEST_F(ClientConnectionTest, testSslSocketTimeoutToOutsideNetwork) { HazelcastServer instance(*g_srvFactory, true); diff --git a/hazelcast/test/src/cluster/ClusterTest.cpp b/hazelcast/test/src/cluster/ClusterTest.cpp index e51540b04b..48286b89b4 100644 --- a/hazelcast/test/src/cluster/ClusterTest.cpp +++ b/hazelcast/test/src/cluster/ClusterTest.cpp @@ -58,6 +58,64 @@ namespace hazelcast { } }; + class ClientAllStatesListener : public LifecycleListener { + public: + + ClientAllStatesListener(util::CountDownLatch *startingLatch, util::CountDownLatch *startedLatch = NULL, + util::CountDownLatch *connectedLatch = NULL, + util::CountDownLatch *disconnectedLatch = NULL, + util::CountDownLatch *shuttingDownLatch = NULL, + util::CountDownLatch *shutdownLatch = NULL) + : startingLatch(startingLatch), startedLatch(startedLatch), connectedLatch(connectedLatch), + disconnectedLatch(disconnectedLatch), shuttingDownLatch(shuttingDownLatch), + shutdownLatch(shutdownLatch) { } + + virtual void stateChanged(const LifecycleEvent &lifecycleEvent) { + switch (lifecycleEvent.getState()) { + case LifecycleEvent::STARTING: + if (startingLatch) { + startingLatch->countDown(); + } + break; + case LifecycleEvent::STARTED: + if (startedLatch) { + startedLatch->countDown(); + } + break; + case LifecycleEvent::CLIENT_CONNECTED: + if (connectedLatch) { + connectedLatch->countDown(); + } + break; + case LifecycleEvent::CLIENT_DISCONNECTED: + if (disconnectedLatch) { + disconnectedLatch->countDown(); + } + break; + case LifecycleEvent::SHUTTING_DOWN: + if (shuttingDownLatch) { + shuttingDownLatch->countDown(); + } + break; + case LifecycleEvent::SHUTDOWN: + if (shutdownLatch) { + shutdownLatch->countDown(); + } + break; + default: + FAIL() << "No such state expected:" << lifecycleEvent.getState(); + } + } + + private: + util::CountDownLatch *startingLatch; + util::CountDownLatch *startedLatch; + util::CountDownLatch *connectedLatch; + util::CountDownLatch *disconnectedLatch; + util::CountDownLatch *shuttingDownLatch; + util::CountDownLatch *shutdownLatch; + }; + class SampleInitialListener : public InitialMembershipListener { public: SampleInitialListener(util::CountDownLatch &_memberAdded, util::CountDownLatch &_attributeLatch, @@ -249,10 +307,39 @@ namespace hazelcast { } TEST_P(ClusterTest, testBehaviourWhenClusterNotFound) { - ClientConfig clientConfig; + ClientConfig &clientConfig = *const_cast(GetParam()); ASSERT_THROW(HazelcastClient client(clientConfig), exception::IllegalStateException); } + TEST_P(ClusterTest, testAllClientStates) { + HazelcastServer instance(*g_srvFactory); + + ClientConfig clientConfig; + clientConfig.setAttemptPeriod(1000); + clientConfig.setConnectionAttemptLimit(1); + util::CountDownLatch startingLatch(1); + util::CountDownLatch startedLatch(1); + util::CountDownLatch connectedLatch(1); + util::CountDownLatch disconnectedLatch(1); + util::CountDownLatch shuttingDownLatch(1); + util::CountDownLatch shutdownLatch(1); + ClientAllStatesListener listener(&startingLatch, &startedLatch, &connectedLatch, &disconnectedLatch, + &shuttingDownLatch, &shutdownLatch); + clientConfig.addListener(&listener); + + HazelcastClient client(clientConfig); + + ASSERT_TRUE(startingLatch.await(0)); + ASSERT_TRUE(startedLatch.await(0)); + ASSERT_TRUE(connectedLatch.await(0)); + + instance.shutdown(); + + ASSERT_TRUE(disconnectedLatch.await(3)); + ASSERT_TRUE(shuttingDownLatch.await(5)); + ASSERT_TRUE(shutdownLatch.await(500)); + } + #ifdef HZ_BUILD_WITH_SSL INSTANTIATE_TEST_CASE_P(All, ClusterTest, diff --git a/hazelcast/test/src/issues/IssueTest.cpp b/hazelcast/test/src/issues/IssueTest.cpp index c613624a93..7d03d40705 100644 --- a/hazelcast/test/src/issues/IssueTest.cpp +++ b/hazelcast/test/src/issues/IssueTest.cpp @@ -138,7 +138,10 @@ namespace hazelcast { } catch (exception::IOException &) { // this is the expected exception, test passes, do nothing } catch (exception::IException &e) { - FAIL() << "IException is received while we expect IOException. Received exception:" << e.what(); + std::string msg = e.what(); + if (msg.find("ConnectionManager is not active") == std::string::npos) { + FAIL() << "Unexpected exception. Received exception:" << msg; + } } } diff --git a/hazelcast/test/src/util/ClientUtilTest.cpp b/hazelcast/test/src/util/ClientUtilTest.cpp index fda6c5c0b1..6b8a2c1c0b 100644 --- a/hazelcast/test/src/util/ClientUtilTest.cpp +++ b/hazelcast/test/src/util/ClientUtilTest.cpp @@ -20,6 +20,7 @@ #include "hazelcast/util/Util.h" #include "hazelcast/util/Future.h" #include "hazelcast/util/Thread.h" +#include "hazelcast/util/CountDownLatch.h" #include #include @@ -29,7 +30,7 @@ namespace hazelcast { namespace client { namespace test { class ClientUtilTest : public ::testing::Test { - public: + protected: static void wakeTheConditionUp(util::ThreadArgs& args) { util::Mutex *mutex = (util::Mutex *)args.arg0; util::ConditionVariable *cv = (util::ConditionVariable *)args.arg1; @@ -55,6 +56,19 @@ namespace hazelcast { std::auto_ptr exception(new exception::IException("exceptionName", "details")); future->set_exception(exception); } + + static void cancelJoinFromRunningThread(util::ThreadArgs& args) { + util::Thread *currentThread = args.currentThread; + util::CountDownLatch *latch = (util::CountDownLatch *) args.arg0; + currentThread->cancel(); + ASSERT_FALSE(currentThread->join()); + latch->countDown(); + } + + static void notifyExitingThread(util::ThreadArgs& args) { + util::CountDownLatch *latch = (util::CountDownLatch *) args.arg0; + latch->countDown(); + } }; TEST_F(ClientUtilTest, testConditionWaitTimeout) { @@ -152,6 +166,25 @@ namespace hazelcast { ASSERT_EQ(threadName, thread.getThreadName()); } + TEST_F (ClientUtilTest, testThreadJoinAfterThreadExited) { + std::string threadName = "myThreadName"; + util::CountDownLatch latch(1); + util::Thread thread(threadName, notifyExitingThread, &latch); + ASSERT_TRUE(latch.await(2)); + // guarantee that the thread exited + util::sleep(1); + + // call join after thread exit + thread.join(); + } + + TEST_F (ClientUtilTest, testCancelJoinItselfFromTheRunningThread) { + std::string threadName = "myThreadName"; + util::CountDownLatch latch(1); + util::Thread thread(threadName, cancelJoinFromRunningThread, &latch); + ASSERT_TRUE(latch.await(1000)); + } + void sleepyThread(util::ThreadArgs& args) { int sleepTime = *(int *)args.arg0; args.currentThread->interruptibleSleep(sleepTime);