From 87487333d20d653f8c9d7bc6fc5b6865054bd4a5 Mon Sep 17 00:00:00 2001 From: ihsan demir Date: Thu, 17 Mar 2016 15:54:29 +0200 Subject: [PATCH] Fixes the problems with client hanging during server restarts on heavy loads. Fixes a memory access issue when re-throwing an exception, correctly clears the call promise maps during connection close. Adds LoadTests while server retarting and solves the problems that the tests creates. --- CMakeLists.txt | 2 +- .../hazelcast/client/HazelcastClient.h | 1 + hazelcast/include/hazelcast/client/IMap.h | 18 +- hazelcast/include/hazelcast/client/IQueue.h | 7 +- .../hazelcast/client/connection/CallPromise.h | 6 - .../client/connection/ClusterListenerThread.h | 3 +- .../hazelcast/client/connection/Connection.h | 6 + .../client/connection/ConnectionManager.h | 22 ++- .../hazelcast/client/connection/IOSelector.h | 2 + .../client/exception/pimpl/ExceptionHandler.h | 2 + .../client/protocol/ClientMessageBuilder.h | 4 +- .../hazelcast/client/proxy/IQueueImpl.h | 2 +- .../client/proxy/TransactionalObject.h | 4 +- .../hazelcast/client/spi/ClusterService.h | 2 +- .../hazelcast/client/spi/InvocationService.h | 58 ++++--- .../hazelcast/client/spi/PartitionService.h | 3 + .../include/hazelcast/util/ConcurrentQueue.h | 37 ++++- hazelcast/include/hazelcast/util/SocketSet.h | 3 +- hazelcast/include/hazelcast/util/Thread.h | 23 ++- hazelcast/src/hazelcast/client/Socket.cpp | 9 +- .../client/connection/CallPromise.cpp | 1 - .../connection/ClusterListenerThread.cpp | 22 ++- .../client/connection/Connection.cpp | 24 ++- .../client/connection/ConnectionManager.cpp | 93 ++++++++--- .../hazelcast/client/connection/IOHandler.cpp | 1 + .../client/connection/IOSelector.cpp | 4 + .../client/connection/ReadHandler.cpp | 5 +- .../client/connection/WriteHandler.cpp | 18 +- .../exception/pimpl/ExceptionHandler.cpp | 6 +- .../client/protocol/ClientMessageBuilder.cpp | 10 +- .../src/hazelcast/client/proxy/IQueueImpl.cpp | 2 +- .../hazelcast/client/spi/ClusterService.cpp | 15 +- .../client/spi/InvocationService.cpp | 141 ++++++++++------ .../hazelcast/client/spi/LifecycleService.cpp | 10 +- .../hazelcast/client/spi/PartitionService.cpp | 8 +- hazelcast/src/hazelcast/util/ByteBuffer.cpp | 2 +- hazelcast/src/hazelcast/util/SocketSet.cpp | 52 ++++-- hazelcast/src/hazelcast/util/Thread.cpp | 49 ++++-- hazelcast/src/hazelcast/util/Util.cpp | 2 +- hazelcast/test/src/ClientTestSupport.h | 2 +- hazelcast/test/src/HazelcastServer.cpp | 3 +- hazelcast/test/src/HazelcastServer.h | 2 + hazelcast/test/src/HazelcastServerFactory.cpp | 70 +++++--- hazelcast/test/src/HazelcastServerFactory.h | 7 +- hazelcast/test/src/SimpleMapTest.h | 9 +- hazelcast/test/src/cluster/ClusterTest.cpp | 4 +- .../test/src/faulttolerance/LoadTest.cpp | 156 ++++++++++++++++++ hazelcast/test/src/issues/IssueTest.cpp | 2 +- hazelcast/test/src/queue/ClientQueueTest.cpp | 37 ++--- hazelcast/test/src/txn/ClientTxnTest.cpp | 6 +- hazelcast/test/src/util/BitsTest.cpp | 3 - hazelcast/test/src/util/ClientUtilTest.cpp | 2 +- .../test/src/util/ConcurrentQueueTest.cpp | 135 +++++++++++++++ java/src/main/java/CppClientListener.java | 54 ++++-- 54 files changed, 872 insertions(+), 299 deletions(-) create mode 100644 hazelcast/test/src/faulttolerance/LoadTest.cpp create mode 100644 hazelcast/test/src/util/ConcurrentQueueTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7d3997c907..9d7730c360 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -130,7 +130,7 @@ ENDIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows") IF(${HZ_BUILD_TESTS} MATCHES "ON") SET(BUILD_GTEST "ON") - SET(DBUILD_GMOCK "OFF") + SET(BUILD_GMOCK "OFF") ADD_SUBDIRECTORY(hazelcast/test) ENDIF(${HZ_BUILD_TESTS} MATCHES "ON") diff --git a/hazelcast/include/hazelcast/client/HazelcastClient.h b/hazelcast/include/hazelcast/client/HazelcastClient.h index 25a8558798..3636cced23 100644 --- a/hazelcast/include/hazelcast/client/HazelcastClient.h +++ b/hazelcast/include/hazelcast/client/HazelcastClient.h @@ -414,6 +414,7 @@ namespace hazelcast { /** * Constructs a hazelcastClient with given ClientConfig. * Note: ClientConfig will be copied. + * @param config client configuration to start the client with */ HazelcastClient(ClientConfig&); diff --git a/hazelcast/include/hazelcast/client/IMap.h b/hazelcast/include/hazelcast/client/IMap.h index aa26ebf69a..6927e22565 100644 --- a/hazelcast/include/hazelcast/client/IMap.h +++ b/hazelcast/include/hazelcast/client/IMap.h @@ -508,7 +508,7 @@ namespace hazelcast { */ std::map getAll(const std::set &keys) { std::vector keySet(keys.size()); - int i = 0; + size_t i = 0; for (typename std::set::iterator it = keys.begin(); it != keys.end(); ++it) { keySet[i++] = toData(*it); } @@ -533,9 +533,9 @@ namespace hazelcast { */ std::vector keySet() { std::vector dataResult = proxy::IMapImpl::keySet(); - int size = dataResult.size(); + size_t size = dataResult.size(); std::vector keys(size); - for (int i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { boost::shared_ptr key = toObject(dataResult[i]); keys[i] = *key; } @@ -554,9 +554,9 @@ namespace hazelcast { */ std::vector keySet(const serialization::IdentifiedDataSerializable &predicate) { std::vector dataResult = proxy::IMapImpl::keySet(predicate); - int size = dataResult.size(); + size_t size = dataResult.size(); std::vector keys(size); - for (int i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { boost::shared_ptr key = toObject(dataResult[i]); keys[i] = *key; } @@ -572,9 +572,9 @@ namespace hazelcast { */ std::vector values() { std::vector dataResult = proxy::IMapImpl::values(); - int size = dataResult.size(); + size_t size = dataResult.size(); std::vector values(size); - for (int i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { boost::shared_ptr value = toObject(dataResult[i]); values[i] = *value; } @@ -591,9 +591,9 @@ namespace hazelcast { */ std::vector values(const serialization::IdentifiedDataSerializable &predicate) { std::vector dataResult = proxy::IMapImpl::values(predicate); - int size = dataResult.size(); + size_t size = dataResult.size(); std::vector values(size); - for (int i = 0; i < size; ++i) { + for (size_t i = 0; i < size; ++i) { boost::shared_ptr value = toObject(dataResult[i]); values[i] = *value; } diff --git a/hazelcast/include/hazelcast/client/IQueue.h b/hazelcast/include/hazelcast/client/IQueue.h index 039a47d4e2..28e50d4d7f 100644 --- a/hazelcast/include/hazelcast/client/IQueue.h +++ b/hazelcast/include/hazelcast/client/IQueue.h @@ -151,7 +151,7 @@ namespace hazelcast { * @param elements the vector that elements will be drained to. * @return number of elements drained. */ - int drainTo(std::vector& elements) { + size_t drainTo(std::vector& elements) { return drainTo(elements, -1); } @@ -162,7 +162,7 @@ namespace hazelcast { * @param elements vector that elements will be drained to. * @return number of elements drained. */ - int drainTo(std::vector& elements, int maxElements) { + size_t drainTo(std::vector& elements, size_t maxElements) { std::vector coll = proxy::IQueueImpl::drainTo(maxElements); for (std::vector::const_iterator it = coll.begin(); it != coll.end(); ++it) { boost::shared_ptr e = context->getSerializationService().template toObject(*it); @@ -266,8 +266,7 @@ namespace hazelcast { } private: IQueue(const std::string& instanceName, spi::ClientContext *context) - : proxy::IQueueImpl(instanceName, context){ - + : proxy::IQueueImpl(instanceName, context) { } }; } diff --git a/hazelcast/include/hazelcast/client/connection/CallPromise.h b/hazelcast/include/hazelcast/client/connection/CallPromise.h index a99338bf53..30a168fb7c 100644 --- a/hazelcast/include/hazelcast/client/connection/CallPromise.h +++ b/hazelcast/include/hazelcast/client/connection/CallPromise.h @@ -16,19 +16,13 @@ // // Created by sancar koyunlu on 03/01/14. // - - - - #ifndef HAZELCAST_ClientCallPromise #define HAZELCAST_ClientCallPromise - #include "hazelcast/util/HazelcastDll.h" #include "hazelcast/util/Future.h" #include "hazelcast/util/AtomicInt.h" -#include "hazelcast/client/impl/BaseEventHandler.h" #include namespace hazelcast { diff --git a/hazelcast/include/hazelcast/client/connection/ClusterListenerThread.h b/hazelcast/include/hazelcast/client/connection/ClusterListenerThread.h index 2491a12595..6556773a0a 100644 --- a/hazelcast/include/hazelcast/client/connection/ClusterListenerThread.h +++ b/hazelcast/include/hazelcast/client/connection/ClusterListenerThread.h @@ -28,6 +28,7 @@ #include "hazelcast/client/MembershipEvent.h" #include +#include #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) #pragma warning(push) @@ -69,7 +70,7 @@ namespace hazelcast { const int32_t &operationType, std::auto_ptr value); - std::vector
getSocketAddresses(); + std::set getSocketAddresses(); util::CountDownLatch startLatch; diff --git a/hazelcast/include/hazelcast/client/connection/Connection.h b/hazelcast/include/hazelcast/client/connection/Connection.h index d45dc6247c..9be78a8e89 100644 --- a/hazelcast/include/hazelcast/client/connection/Connection.h +++ b/hazelcast/include/hazelcast/client/connection/Connection.h @@ -85,6 +85,10 @@ namespace hazelcast { virtual void handleMessage(connection::Connection &connection, std::auto_ptr message); + int getConnectionId() const; + + void setConnectionId(int connectionId); + util::AtomicInt lastRead; util::AtomicBoolean live; private: @@ -101,6 +105,8 @@ namespace hazelcast { protocol::ClientMessageBuilder messageBuilder; protocol::ClientMessage wrapperMessage; std::auto_ptr responseMessage; + + int connectionId; }; } diff --git a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h index 8c4f2d3bd7..24e7149eb4 100644 --- a/hazelcast/include/hazelcast/client/connection/ConnectionManager.h +++ b/hazelcast/include/hazelcast/client/connection/ConnectionManager.h @@ -96,18 +96,35 @@ namespace hazelcast { /** * Tries to connect to an address in member list. * + * @param tryCount The number of times it shall try during connection establishment if not connected * @return authenticated connection * @throws Exception authentication failed or no connection found */ boost::shared_ptr getRandomConnection(int tryCount); + /** + * Tries to connect to an address in member list. + * + * This check is to guarantee that the same server is not retried. This may happen since the + * cluster member update and the partition update may be received a little later and/or the load + * balancer may produce the same address. + * + * @param tryCount The number of times it shall try during connection establishment if not connected + * @param retryWaitTime The number of seconds to wait if the found random address is the same as the last + * tried address comes out to be the same as the randomly selected server address. + * @return authenticated connection + * @throws Exception authentication failed or no connection found + */ + boost::shared_ptr getRandomConnection(int tryCount, const std::string &lastTriedAddress, + int retryWaitTime); + /** * Called when an connection is closed. * Clears related resources of given clientConnection. * * @param clientConnection closed connection */ - void onConnectionClose(const Address &address); + void onConnectionClose(const Address &address, int socketId); /** * Shutdown clientConnectionManager @@ -168,6 +185,8 @@ namespace hazelcast { std::auto_ptr uuid, std::auto_ptr ownerUuid); + boost::shared_ptr getOwnerConnection(); + std::vector PROTOCOL; util::SynchronizedMap connections; util::SynchronizedMap socketConnections; @@ -188,6 +207,7 @@ namespace hazelcast { OwnerConnectionFuture ownerConnectionFuture; util::Atomic callIdGenerator; + util::Atomic connectionIdCounter; }; } } diff --git a/hazelcast/include/hazelcast/client/connection/IOSelector.h b/hazelcast/include/hazelcast/client/connection/IOSelector.h index 30cb4f3327..b2dd6a2e2a 100644 --- a/hazelcast/include/hazelcast/client/connection/IOSelector.h +++ b/hazelcast/include/hazelcast/client/connection/IOSelector.h @@ -60,6 +60,8 @@ namespace hazelcast { void addTask(ListenerTask *listenerTask); + void cancelTask(ListenerTask *listenerTask); + void wakeUp(); void shutdown(); diff --git a/hazelcast/include/hazelcast/client/exception/pimpl/ExceptionHandler.h b/hazelcast/include/hazelcast/client/exception/pimpl/ExceptionHandler.h index 03e30f1564..1bca6f917a 100644 --- a/hazelcast/include/hazelcast/client/exception/pimpl/ExceptionHandler.h +++ b/hazelcast/include/hazelcast/client/exception/pimpl/ExceptionHandler.h @@ -44,6 +44,8 @@ namespace hazelcast { */ static std::string INSTANCE_NOT_ACTIVE; + + static std::string ILLEGAL_STATE; /** * InternalAPI rethrows the exception with appropriate type * diff --git a/hazelcast/include/hazelcast/client/protocol/ClientMessageBuilder.h b/hazelcast/include/hazelcast/client/protocol/ClientMessageBuilder.h index b00d4af5f8..c22546d450 100644 --- a/hazelcast/include/hazelcast/client/protocol/ClientMessageBuilder.h +++ b/hazelcast/include/hazelcast/client/protocol/ClientMessageBuilder.h @@ -47,7 +47,7 @@ namespace hazelcast { class ClientMessageBuilder { public: - ClientMessageBuilder(IMessageHandler *service, connection::Connection &connection); + ClientMessageBuilder(IMessageHandler &service, connection::Connection &connection); virtual ~ClientMessageBuilder(); @@ -77,7 +77,7 @@ namespace hazelcast { std::auto_ptr message; - IMessageHandler *messageHandler; + IMessageHandler &messageHandler; connection::Connection &connection; int32_t frameLen; diff --git a/hazelcast/include/hazelcast/client/proxy/IQueueImpl.h b/hazelcast/include/hazelcast/client/proxy/IQueueImpl.h index cc93028142..5b61158339 100644 --- a/hazelcast/include/hazelcast/client/proxy/IQueueImpl.h +++ b/hazelcast/include/hazelcast/client/proxy/IQueueImpl.h @@ -46,7 +46,7 @@ namespace hazelcast { bool contains(const serialization::pimpl::Data& element); - std::vector drainTo(int maxElements); + std::vector drainTo(size_t maxElements); std::auto_ptr peek(); diff --git a/hazelcast/include/hazelcast/client/proxy/TransactionalObject.h b/hazelcast/include/hazelcast/client/proxy/TransactionalObject.h index 094a8b1813..6776da22c5 100644 --- a/hazelcast/include/hazelcast/client/proxy/TransactionalObject.h +++ b/hazelcast/include/hazelcast/client/proxy/TransactionalObject.h @@ -88,9 +88,9 @@ namespace hazelcast { template std::vector toObjectCollection(const std::vector &keyDataSet) { - int size = keyDataSet.size(); + size_t size = keyDataSet.size(); std::vector keys(size); - for (int i = 0; i < size; i++) { + for (size_t i = 0; i < size; i++) { boost::shared_ptr v = toObject(keyDataSet[i]); keys[i] = *v; } diff --git a/hazelcast/include/hazelcast/client/spi/ClusterService.h b/hazelcast/include/hazelcast/client/spi/ClusterService.h index 736948e838..010d278da4 100644 --- a/hazelcast/include/hazelcast/client/spi/ClusterService.h +++ b/hazelcast/include/hazelcast/client/spi/ClusterService.h @@ -105,7 +105,7 @@ namespace hazelcast { void setMembers(std::auto_ptr > map); - boost::shared_ptr connectToOne(); + boost::shared_ptr connectToOne(const Address *previousConnectionAddr); // ------------------------------------------------------ }; diff --git a/hazelcast/include/hazelcast/client/spi/InvocationService.h b/hazelcast/include/hazelcast/client/spi/InvocationService.h index 86234bfa81..2200d25f24 100644 --- a/hazelcast/include/hazelcast/client/spi/InvocationService.h +++ b/hazelcast/include/hazelcast/client/spi/InvocationService.h @@ -15,8 +15,6 @@ */ // // Created by sancar koyunlu on 5/23/13. - - #ifndef HAZELCAST_INVOCATION_SERVICE #define HAZELCAST_INVOCATION_SERVICE @@ -24,6 +22,7 @@ #include "hazelcast/util/AtomicInt.h" #include "hazelcast/util/SynchronizedMap.h" #include "hazelcast/client/protocol/IMessageHandler.h" +#include "hazelcast/util/AtomicBoolean.h" #include #include @@ -64,7 +63,6 @@ namespace hazelcast { } namespace spi { - class ClientContext; class HAZELCAST_API InvocationService : public protocol::IMessageHandler { @@ -73,21 +71,29 @@ namespace hazelcast { virtual ~InvocationService(); - void start(); + bool start(); + + void shutdown(); connection::CallFuture invokeOnRandomTarget(std::auto_ptr request); - connection::CallFuture invokeOnPartitionOwner(std::auto_ptr request, int partitionId); + connection::CallFuture invokeOnPartitionOwner(std::auto_ptr request, + int partitionId); - connection::CallFuture invokeOnTarget(std::auto_ptr request, const Address& target); + connection::CallFuture invokeOnTarget(std::auto_ptr request, + const Address& target); - connection::CallFuture invokeOnRandomTarget(std::auto_ptr request, hazelcast::client::impl::BaseEventHandler *handler); + connection::CallFuture invokeOnRandomTarget(std::auto_ptr request, + client::impl::BaseEventHandler *handler); - connection::CallFuture invokeOnTarget(std::auto_ptr request, hazelcast::client::impl::BaseEventHandler *handler, const Address& target); + connection::CallFuture invokeOnTarget(std::auto_ptr request, + client::impl::BaseEventHandler *handler, const Address& target); - connection::CallFuture invokeOnPartitionOwner(std::auto_ptr request, hazelcast::client::impl::BaseEventHandler *handler, int partitionId); + connection::CallFuture invokeOnPartitionOwner(std::auto_ptr request, + client::impl::BaseEventHandler *handler, int partitionId); - connection::CallFuture invokeOnConnection(std::auto_ptr request, boost::shared_ptr connection); + connection::CallFuture invokeOnConnection(std::auto_ptr request, + boost::shared_ptr connection); bool isRedoOperation() const; @@ -123,55 +129,65 @@ namespace hazelcast { /** * Retries the given promise on an available connection. */ - boost::shared_ptr resend(boost::shared_ptr promise, const std::string& lastAddress); + boost::shared_ptr resend(boost::shared_ptr promise, + const std::string& lastAddress); private: bool redoOperation; int heartbeatTimeout; int retryWaitTime; int retryCount; spi::ClientContext& clientContext; - util::SynchronizedMap > callPromises; - util::SynchronizedMap > eventHandlerPromises; + // Is not using the Connection* for the key due to a possible ABA problem. + util::SynchronizedMap > callPromises; + util::SynchronizedMap > eventHandlerPromises; + + util::AtomicBoolean isOpen; bool isAllowedToSentRequest(connection::Connection& connection, protocol::ClientMessage const&); - connection::CallFuture doSend(std::auto_ptr request, std::auto_ptr eventHandler, boost::shared_ptr, int); + connection::CallFuture doSend(std::auto_ptr request, + std::auto_ptr eventHandler, + boost::shared_ptr, int); /** * Returns the actual connection that request is send over, * Returns null shared_ptr if request is not send. */ - boost::shared_ptr registerAndEnqueue(boost::shared_ptr,boost::shared_ptr, int partitionId); + boost::shared_ptr registerAndEnqueue(boost::shared_ptr &conn, + boost::shared_ptr); /** CallId Related **/ - void registerCall(connection::Connection& connection, boost::shared_ptr promise); + void registerCall(connection::Connection &connection, boost::shared_ptr promise); - boost::shared_ptr deRegisterCall(connection::Connection& connection, int64_t callId); + boost::shared_ptr deRegisterCall(int connectionId, int64_t callId); /** **/ void registerEventHandler(int64_t correlationId, connection::Connection& connection, boost::shared_ptr promise); - boost::shared_ptr deRegisterEventHandler(connection::Connection& connection, int64_t callId); + boost::shared_ptr deRegisterEventHandler(connection::Connection& connection, + int64_t callId); /***** HANDLE PACKET PART ****/ /* returns shouldSetResponse */ - bool handleException(protocol::ClientMessage *response, boost::shared_ptr promise, const Address& address); + bool handleException(protocol::ClientMessage *response, boost::shared_ptr promise, + const Address& address); /* returns shouldSetResponse */ bool handleEventUuid(protocol::ClientMessage *response, boost::shared_ptr promise); /** CallPromise Map **/ - boost::shared_ptr< util::SynchronizedMap > getCallPromiseMap(connection::Connection& connection); + boost::shared_ptr< util::SynchronizedMap > getCallPromiseMap(int connectionId); /** EventHandler Map **/ // TODO: Put the promise map as a member of the connection object. In this way, we can get the promise map directly from connection object // without a need for a map lookup since we already know the connection and the map is specific to a connection - boost::shared_ptr< util::SynchronizedMap > getEventHandlerPromiseMap(connection::Connection& connection); + boost::shared_ptr< util::SynchronizedMap > getEventHandlerPromiseMap( + connection::Connection& connection); boost::shared_ptr getEventHandlerPromise(connection::Connection& , int64_t callId); }; diff --git a/hazelcast/include/hazelcast/client/spi/PartitionService.h b/hazelcast/include/hazelcast/client/spi/PartitionService.h index b15e5d0a2e..79aa493e5c 100644 --- a/hazelcast/include/hazelcast/client/spi/PartitionService.h +++ b/hazelcast/include/hazelcast/client/spi/PartitionService.h @@ -75,6 +75,9 @@ namespace hazelcast { */ void refreshPartitions(); + // Wakes up the partition thread and triggers a partition refresh + void wakeup(); + private: spi::ClientContext &clientContext; diff --git a/hazelcast/include/hazelcast/util/ConcurrentQueue.h b/hazelcast/include/hazelcast/util/ConcurrentQueue.h index 25251fa9fc..50877c36f9 100644 --- a/hazelcast/include/hazelcast/util/ConcurrentQueue.h +++ b/hazelcast/include/hazelcast/util/ConcurrentQueue.h @@ -23,7 +23,7 @@ #include "hazelcast/util/HazelcastDll.h" #include "hazelcast/util/LockGuard.h" #include "hazelcast/util/Mutex.h" -#include +#include #include #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) @@ -43,7 +43,7 @@ namespace hazelcast { void offer(T *e) { util::LockGuard lg(m); - internalQueue.push(e); + internalQueue.push_back(e); } T *poll() { @@ -51,14 +51,43 @@ namespace hazelcast { util::LockGuard lg(m); if (!internalQueue.empty()) { e = internalQueue.front(); - internalQueue.pop(); + internalQueue.pop_front(); } return e; } + /** + * Note that this method is not very efficient but it is only called very rarely when the connection is closed + * Complexity: N2 + * @param itemToBeRemoved The item to be removed from the queue + * @return number of items removed from the queue + */ + int removeAll(const T *itemToBeRemoved) { + util::LockGuard lg(m); + int numErased = 0; + bool isFound; + do { + isFound = false; + for (typename std::deque::iterator it = internalQueue.begin();it != internalQueue.end(); ++it) { + T *e = *it; + if (itemToBeRemoved == e) { + internalQueue.erase(it); + isFound = true; + ++numErased; + break; + } + } + } while (isFound); + return numErased; + } + private: util::Mutex m; - std::queue internalQueue; + /** + * Did not choose std::list which shall give better removeAll performance since deque is more efficient on + * offer and poll due to data locality (best would be std::vector but it does not allow pop_front). + */ + std::deque internalQueue; }; } } diff --git a/hazelcast/include/hazelcast/util/SocketSet.h b/hazelcast/include/hazelcast/util/SocketSet.h index 1d4a55c3c7..40c86f2c28 100644 --- a/hazelcast/include/hazelcast/util/SocketSet.h +++ b/hazelcast/include/hazelcast/util/SocketSet.h @@ -44,8 +44,7 @@ namespace hazelcast { void removeSocket(client::Socket const *); private: - typedef std::set SocketContainer; - SocketContainer sockets; + std::set sockets; util::Mutex accessLock; }; diff --git a/hazelcast/include/hazelcast/util/Thread.h b/hazelcast/include/hazelcast/util/Thread.h index d931f79c16..c566b85954 100644 --- a/hazelcast/include/hazelcast/util/Thread.h +++ b/hazelcast/include/hazelcast/util/Thread.h @@ -16,12 +16,14 @@ // // Created by sancar koyunlu on 31/03/14. // - #ifndef HAZELCAST_Thread #define HAZELCAST_Thread - #include "hazelcast/util/ThreadArgs.h" +#include "hazelcast/util/ConditionVariable.h" +#include "hazelcast/util/Mutex.h" +#include "hazelcast/util/AtomicBoolean.h" + #include #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) @@ -65,7 +67,9 @@ namespace hazelcast { void interruptibleSleep(int seconds); - void interrupt(); + void wakeup(); + + void cancel(); bool join(); @@ -75,8 +79,8 @@ namespace hazelcast { void init(void (func)(ThreadArgs &), void *arg0, void *arg1, void *arg2, void *arg3 ); std::string threadName; - bool isJoined; - bool isInterrupted; + util::AtomicBoolean isJoined; + util::AtomicBoolean isInterrupted; HANDLE thread; DWORD id; ConditionVariable condition; @@ -114,19 +118,22 @@ namespace hazelcast { void interruptibleSleep(int seconds); - void interrupt(); + void wakeup(); - bool join(); + void cancel(); + bool join(); private: static void *controlledThread(void *args); void init(void (func)(ThreadArgs &), void *arg0, void *arg1, void *arg2, void *arg3 ); std::string threadName; - bool isJoined; + util::AtomicBoolean isJoined; pthread_t thread; pthread_attr_t attr; + ConditionVariable wakeupCondition; + Mutex wakeupMutex; }; } } diff --git a/hazelcast/src/hazelcast/client/Socket.cpp b/hazelcast/src/hazelcast/client/Socket.cpp index cce697cc89..23719fc354 100644 --- a/hazelcast/src/hazelcast/client/Socket.cpp +++ b/hazelcast/src/hazelcast/client/Socket.cpp @@ -126,7 +126,14 @@ namespace hazelcast { int Socket::send(const void *buffer, int len) const { errno = 0; int bytesSend = 0; - if ((bytesSend = ::send(socketId, (char *)buffer, (size_t)len, 0)) == -1) { + /** + * In linux, sometimes SIGBUS may be received during this call when the server closes the connection. + * The returned error code is still error when this flag is set. Hence, it is safe to use. + * MSG_NOSIGNAL (since Linux 2.2) + * Requests not to send SIGPIPE on errors on stream oriented sockets when the other end breaks the connection. + * The EPIPE error is still returned. + */ + if ((bytesSend = ::send(socketId, (char *)buffer, (size_t)len, MSG_NOSIGNAL)) == -1) { if (errno == EAGAIN) { return 0; } diff --git a/hazelcast/src/hazelcast/client/connection/CallPromise.cpp b/hazelcast/src/hazelcast/client/connection/CallPromise.cpp index 3a8d833a25..b24959858b 100644 --- a/hazelcast/src/hazelcast/client/connection/CallPromise.cpp +++ b/hazelcast/src/hazelcast/client/connection/CallPromise.cpp @@ -16,7 +16,6 @@ // // Created by sancar koyunlu on 03/01/14. // - #include "hazelcast/client/spi/InvocationService.h" #include "hazelcast/client/impl/BaseEventHandler.h" #include "hazelcast/client/Address.h" diff --git a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp index 15319b58d9..042fc86b8b 100644 --- a/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp +++ b/hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp @@ -49,11 +49,15 @@ namespace hazelcast { } void ClusterListenerThread::run(util::Thread *currentThread) { + Address previousConnectionAddr; + Address *previousConnectionAddrPtr = NULL; while (clientContext.getLifecycleService().isRunning()) { try { if (conn.get() == NULL) { try { - conn = clientContext.getClusterService().connectToOne(); + conn = clientContext.getClusterService().connectToOne(previousConnectionAddrPtr); + previousConnectionAddr = conn->getRemoteEndpoint(); + previousConnectionAddrPtr = &previousConnectionAddr; } catch (std::exception &e) { util::ILogger::getLogger().severe( std::string("Error while connecting to cluster! =>") + e.what()); @@ -86,9 +90,7 @@ namespace hazelcast { } currentThread->interruptibleSleep(1); } - } - } void ClusterListenerThread::stop() { @@ -97,18 +99,18 @@ namespace hazelcast { conn.reset(); deletingConnection = false; } - clusterListenerThread->interrupt(); + clusterListenerThread->cancel(); clusterListenerThread->join(); } - std::vector
ClusterListenerThread::getSocketAddresses() { - std::vector
addresses; + std::set ClusterListenerThread::getSocketAddresses() { + std::set addresses; if (!members.empty()) { std::vector
clusterAddresses = getClusterAddresses(); - addresses.insert(addresses.begin(), clusterAddresses.begin(), clusterAddresses.end()); + addresses.insert(clusterAddresses.begin(), clusterAddresses.end()); } std::vector
configAddresses = getConfigAddresses(); - addresses.insert(addresses.end(), configAddresses.begin(), configAddresses.end()); + addresses.insert(configAddresses.begin(), configAddresses.end()); return addresses; } @@ -116,6 +118,8 @@ namespace hazelcast { std::auto_ptr request = protocol::codec::ClientAddMembershipListenerCodec::RequestParameters::encode(false); + request->setCorrelationId(clientContext.getConnectionManager().getNextCallId()); + conn->writeBlocking(*request); std::auto_ptr response; @@ -191,7 +195,7 @@ namespace hazelcast { util::snprintf(buf, 50, "Unknown event type :%d", eventType); util::ILogger::getLogger().warning(buf); } - clientContext.getPartitionService().refreshPartitions(); + clientContext.getPartitionService().wakeup(); } void ClusterListenerThread::handleMemberList(const std::vector &initialMembers) { diff --git a/hazelcast/src/hazelcast/client/connection/Connection.cpp b/hazelcast/src/hazelcast/client/connection/Connection.cpp index a3377f9a33..8a6cff922e 100644 --- a/hazelcast/src/hazelcast/client/connection/Connection.cpp +++ b/hazelcast/src/hazelcast/client/connection/Connection.cpp @@ -50,7 +50,8 @@ namespace hazelcast { , _isOwnerConnection(isOwner) , receiveBuffer(new byte[16 << 10]) , receiveByteBuffer((char *)receiveBuffer, 16 << 10) - , messageBuilder(this, *this) { + , messageBuilder(*this, *this) + , connectionId(-1) { wrapperMessage.wrapForDecode(receiveBuffer, (int32_t)16 << 10, false); assert(receiveByteBuffer.remaining() >= protocol::ClientMessage::HEADER_SIZE); // Note: Always make sure that the size >= ClientMessage header size. } @@ -64,11 +65,6 @@ namespace hazelcast { int error = socket.connect(timeoutInMillis); if (error) { throw exception::IOException("Socket::connect", strerror(error)); - } else { - std::stringstream message; - message << "Connected to " << socket.getAddress() << " with socket id " << socket.getSocketId() << - (_isOwnerConnection ? " as the owner connection." : "."); - util::ILogger::getLogger().info(message.str()); } } @@ -82,8 +78,11 @@ namespace hazelcast { return; } + const Address &serverAddr = getRemoteEndpoint(); + int socketId = socket.getSocketId(); + std::stringstream message; - message << "Closing connection to " << getRemoteEndpoint() << " with socket id " << socket.getSocketId() << + message << "Closing connection (id:" << connectionId << ") to " << serverAddr << " with socket id " << socketId << (_isOwnerConnection ? " as the owner connection." : "."); util::ILogger::getLogger().warning(message.str()); if (!_isOwnerConnection) { @@ -94,7 +93,8 @@ namespace hazelcast { return; } - clientContext.getConnectionManager().onConnectionClose(socket.getRemoteEndpoint()); + clientContext.getConnectionManager().onConnectionClose(serverAddr, socketId); + clientContext.getInvocationService().cleanResources(*this); } @@ -200,6 +200,14 @@ namespace hazelcast { responseMessage = message; } + int Connection::getConnectionId() const { + return connectionId; + } + + void Connection::setConnectionId(int connectionId) { + Connection::connectionId = connectionId; + } + bool Connection::isOwnerConnection() const { return _isOwnerConnection; } diff --git a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp index 4468752210..c82ee97e4c 100644 --- a/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp +++ b/hazelcast/src/hazelcast/client/connection/ConnectionManager.cpp @@ -46,7 +46,7 @@ namespace hazelcast { : clientContext(clientContext), inSelector(*this), outSelector(*this), inSelectorThread(NULL), outSelectorThread(NULL), live(true), heartBeater(clientContext), heartBeatThread(NULL), smartRouting(smartRouting), ownerConnectionFuture(clientContext), - callIdGenerator(0) { + callIdGenerator(0), connectionIdCounter(0) { const byte protocol_bytes[3] = {'C', 'B', '2'}; PROTOCOL.insert(PROTOCOL.begin(), &protocol_bytes[0], &protocol_bytes[3]); } @@ -69,19 +69,19 @@ namespace hazelcast { live = false; heartBeater.shutdown(); if (heartBeatThread.get() != NULL) { - heartBeatThread->interrupt(); + heartBeatThread->cancel(); heartBeatThread->join(); heartBeatThread.reset(); } inSelector.shutdown(); outSelector.shutdown(); if (inSelectorThread.get() != NULL) { - inSelectorThread->interrupt(); + inSelectorThread->cancel(); inSelectorThread->join(); inSelectorThread.reset(); } if (outSelectorThread.get() != NULL) { - outSelectorThread->interrupt(); + outSelectorThread->cancel(); outSelectorThread->join(); outSelectorThread.reset(); } @@ -103,31 +103,68 @@ namespace hazelcast { return getOrConnect(address, tryCount); } + boost::shared_ptr ConnectionManager::getRandomConnection(int tryCount, + const std::string &lastTriedAddress, + int retryWaitTime) { + if (!smartRouting) { + boost::shared_ptr conn = getOwnerConnection(); + // Check if the retrieved connection is the same as the last one, if so we need to close it so that + // a connection to a new member is established. + if ((Connection *)NULL != conn.get() && + lastTriedAddress == util::IOUtil::to_string(conn->getRemoteEndpoint())) { + // close the connection + conn->close(); + + // get the connection again + conn = getOwnerConnection(); + } + return conn; + } + + Address address = clientContext.getClientConfig().getLoadBalancer()->next().getAddress(); + std::string newAddr = util::IOUtil::to_string(address); + if (newAddr == lastTriedAddress) { + address = clientContext.getClientConfig().getLoadBalancer()->next().getAddress(); + } + newAddr = util::IOUtil::to_string(address); + if (newAddr == lastTriedAddress) { + util::sleep(retryWaitTime); + } + return getOrConnect(address, tryCount); + } + boost::shared_ptr ConnectionManager::getOrConnect(const Address &target, int tryCount) { checkLive(); - std::auto_ptr lastError; - try { if (clientContext.getClusterService().isMemberExists(target)) { boost::shared_ptr connection = getOrConnect(target); - return connection; + // Only return the live connections + if (connection->live) { + return connection; + } + } + } catch (exception::IException &e) { + if (tryCount <= 0) { + throw e; } - } catch (exception::IOException &e) { - lastError = std::auto_ptr(new exception::IOException(e)); } int count = 0; - while (count < tryCount) { + while (true) { try { - return getRandomConnection(); - } catch (exception::IOException &e) { - lastError = std::auto_ptr(new exception::IOException(e)); + boost::shared_ptr conn = getRandomConnection(); + if (conn.get() != (Connection *) NULL && conn->live) { + return conn; + } + } catch (exception::IException &e) { + ++count; + if (count >= tryCount) { + throw e; + } } - count++; } - throw *lastError; } boost::shared_ptr ConnectionManager::getConnectionIfAvailable(const Address &address) { @@ -146,10 +183,14 @@ namespace hazelcast { checkLive(); if (smartRouting) { return getOrConnectResolved(address); - } else { - boost::shared_ptr ownerConnPtr = ownerConnectionFuture.getOrWaitForCreation(); - return getOrConnectResolved(ownerConnPtr->getRemoteEndpoint()); } + + return getOwnerConnection(); + } + + boost::shared_ptr ConnectionManager::getOwnerConnection() { + boost::shared_ptr ownerConnPtr = ownerConnectionFuture.getOrWaitForCreation(); + return getOrConnectResolved(ownerConnPtr->getRemoteEndpoint()); } @@ -283,13 +324,10 @@ namespace hazelcast { } } - void ConnectionManager::onConnectionClose(const Address &address) { - boost::shared_ptr conn = getConnectionIfAvailable(address); - if (NULL != conn) { - socketConnections.remove(conn->getSocket().getSocketId()); - connections.remove(address); - ownerConnectionFuture.closeIfAddressMatches(address); - } + void ConnectionManager::onConnectionClose(const Address &address, int socketId) { + socketConnections.remove(socketId); + connections.remove(address); + ownerConnectionFuture.closeIfAddressMatches(address); } void ConnectionManager::checkLive() { @@ -347,8 +385,11 @@ namespace hazelcast { std::auto_ptr uuid, std::auto_ptr ownerUuid) { connection->setRemoteEndpoint(*addr); + connection->setConnectionId(++connectionIdCounter); + std::stringstream message; - (message << "client authenticated by " << *addr); + (message << "Connected and authenticated by " << *addr << ". Connection id:" << connection->getConnectionId() + << " , socket id:" << connection->getSocket().getSocketId() << (connection->isOwnerConnection() ? " as owner connection." : ".")); util::ILogger::getLogger().info(message.str()); if (connection->isOwnerConnection()) { principal = std::auto_ptr(new protocol::Principal(uuid, ownerUuid)); diff --git a/hazelcast/src/hazelcast/client/connection/IOHandler.cpp b/hazelcast/src/hazelcast/client/connection/IOHandler.cpp index a9b08764a5..41041cd392 100644 --- a/hazelcast/src/hazelcast/client/connection/IOHandler.cpp +++ b/hazelcast/src/hazelcast/client/connection/IOHandler.cpp @@ -46,6 +46,7 @@ namespace hazelcast { } void IOHandler::deRegisterSocket() { + ioSelector.cancelTask(this); ioSelector.removeSocket(connection.getSocket()); } diff --git a/hazelcast/src/hazelcast/client/connection/IOSelector.cpp b/hazelcast/src/hazelcast/client/connection/IOSelector.cpp index 45c223c1c1..c2e8a05179 100644 --- a/hazelcast/src/hazelcast/client/connection/IOSelector.cpp +++ b/hazelcast/src/hazelcast/client/connection/IOSelector.cpp @@ -103,6 +103,10 @@ namespace hazelcast { listenerTasks.offer(listenerTask); } + void IOSelector::cancelTask(ListenerTask *listenerTask) { + listenerTasks.removeAll(listenerTask); + } + void IOSelector::addSocket(const Socket &socket) { socketSet.insertSocket(&socket); } diff --git a/hazelcast/src/hazelcast/client/connection/ReadHandler.cpp b/hazelcast/src/hazelcast/client/connection/ReadHandler.cpp index eebf2550b3..a331aa4480 100644 --- a/hazelcast/src/hazelcast/client/connection/ReadHandler.cpp +++ b/hazelcast/src/hazelcast/client/connection/ReadHandler.cpp @@ -36,7 +36,7 @@ namespace hazelcast { : IOHandler(connection, iListener) , buffer(new char[bufferSize]) , byteBuffer(buffer, bufferSize) - , builder(&clientContext.getInvocationService(), connection) { + , builder(clientContext.getInvocationService(), connection) { connection.lastRead = (int)time(NULL); } @@ -49,9 +49,6 @@ namespace hazelcast { } void ReadHandler::handle() { - if (!connection.live) { - return; - } connection.lastRead = (int)time(NULL); try { byteBuffer.readFrom(connection.getSocket()); diff --git a/hazelcast/src/hazelcast/client/connection/WriteHandler.cpp b/hazelcast/src/hazelcast/client/connection/WriteHandler.cpp index 2c8872c44a..bcfdb54bb3 100644 --- a/hazelcast/src/hazelcast/client/connection/WriteHandler.cpp +++ b/hazelcast/src/hazelcast/client/connection/WriteHandler.cpp @@ -38,13 +38,15 @@ namespace hazelcast { } void WriteHandler::run() { - informSelector = true; - if (ready) { - handle(); - } else { - registerHandler(); + if (this->connection.live) { + informSelector = true; + if (ready) { + handle(); + } else { + registerHandler(); + } + ready = false; } - ready = false; } // TODO: Add a fragmentation layer here before putting the message into the write queue @@ -57,10 +59,6 @@ namespace hazelcast { } void WriteHandler::handle() { - if (!connection.live) { - return; - } - if (lastMessage == NULL) { lastMessage = writeQueue.poll(); if (lastMessage == NULL) { diff --git a/hazelcast/src/hazelcast/client/exception/pimpl/ExceptionHandler.cpp b/hazelcast/src/hazelcast/client/exception/pimpl/ExceptionHandler.cpp index f367cb77aa..0482c41a8b 100644 --- a/hazelcast/src/hazelcast/client/exception/pimpl/ExceptionHandler.cpp +++ b/hazelcast/src/hazelcast/client/exception/pimpl/ExceptionHandler.cpp @@ -16,7 +16,7 @@ // // Created by sancar koyunlu on 07/04/14. // - +#include "hazelcast/client/exception/IllegalStateException.h" #include "hazelcast/client/exception/InterruptedException.h" #include "hazelcast/client/exception/InstanceNotActiveException.h" #include "hazelcast/client/exception/pimpl/ExceptionHandler.h" @@ -27,6 +27,7 @@ namespace hazelcast { namespace pimpl{ std::string ExceptionHandler::INTERRUPTED = "InterruptedException"; std::string ExceptionHandler::INSTANCE_NOT_ACTIVE = "HazelcastInstanceNotActiveException"; + std::string ExceptionHandler::ILLEGAL_STATE = "IllegalStateException"; void ExceptionHandler::rethrow(const std::string &exceptionName, const std::string &message) { if (INTERRUPTED == exceptionName) { @@ -35,6 +36,9 @@ namespace hazelcast { } else if (INSTANCE_NOT_ACTIVE == exceptionName) { exception::InstanceNotActiveException exception(message); throw exception; + } else if (ILLEGAL_STATE == exceptionName) { + exception::IllegalStateException exception("Client:", message); + throw exception; } else { exception::IException exception("Server:", message); throw exception; diff --git a/hazelcast/src/hazelcast/client/protocol/ClientMessageBuilder.cpp b/hazelcast/src/hazelcast/client/protocol/ClientMessageBuilder.cpp index e2b34f059e..d28d12307f 100644 --- a/hazelcast/src/hazelcast/client/protocol/ClientMessageBuilder.cpp +++ b/hazelcast/src/hazelcast/client/protocol/ClientMessageBuilder.cpp @@ -28,7 +28,7 @@ namespace hazelcast { namespace client { namespace protocol { - ClientMessageBuilder::ClientMessageBuilder(IMessageHandler *service, connection::Connection &connection) + ClientMessageBuilder::ClientMessageBuilder(IMessageHandler &service, connection::Connection &connection) : messageHandler(service), connection(connection) { } @@ -56,9 +56,7 @@ namespace hazelcast { if (offset == frameLen) { if (message->isFlagSet(ClientMessage::BEGIN_AND_END_FLAGS)) { //MESSAGE IS COMPLETE HERE - if (messageHandler) { - messageHandler->handleMessage(connection, message); - } + messageHandler.handleMessage(connection, message); isCompleted = true; } else { if (message->isFlagSet(ClientMessage::BEGIN_FLAG)) { @@ -93,9 +91,7 @@ namespace hazelcast { partialMessages.erase(foundItemIter, foundItemIter); - if (messageHandler) { - messageHandler->handleMessage(connection, foundMessage); - } + messageHandler.handleMessage(connection, foundMessage); result = true; } diff --git a/hazelcast/src/hazelcast/client/proxy/IQueueImpl.cpp b/hazelcast/src/hazelcast/client/proxy/IQueueImpl.cpp index 3ff45c6f66..a30a5418d4 100644 --- a/hazelcast/src/hazelcast/client/proxy/IQueueImpl.cpp +++ b/hazelcast/src/hazelcast/client/proxy/IQueueImpl.cpp @@ -105,7 +105,7 @@ namespace hazelcast { return invokeAndGetResult(request, partitionId); } - std::vector IQueueImpl::drainTo(int maxElements) { + std::vector IQueueImpl::drainTo(size_t maxElements) { std::auto_ptr request = protocol::codec::QueueDrainToMaxSizeCodec::RequestParameters::encode(getName(), maxElements); diff --git a/hazelcast/src/hazelcast/client/spi/ClusterService.cpp b/hazelcast/src/hazelcast/client/spi/ClusterService.cpp index dd801a0fa7..b5028542ed 100644 --- a/hazelcast/src/hazelcast/client/spi/ClusterService.cpp +++ b/hazelcast/src/hazelcast/client/spi/ClusterService.cpp @@ -149,7 +149,7 @@ namespace hazelcast { } //--------- Used by CLUSTER LISTENER THREAD ------------ - boost::shared_ptr ClusterService::connectToOne() { + boost::shared_ptr ClusterService::connectToOne(const Address *previousConnectionAddr) { active = false; const int connectionAttemptLimit = clientContext.getClientConfig().getConnectionAttemptLimit(); int attempt = 0; @@ -162,9 +162,13 @@ namespace hazelcast { } time_t tryStartTime = std::time(NULL); - std::vector
::const_iterator it; - std::vector
socketAddresses = clusterThread.getSocketAddresses(); - for (it = socketAddresses.begin(); it != socketAddresses.end(); it++) { + std::set socketAddresses = clusterThread.getSocketAddresses(); + if ((Address *)NULL != previousConnectionAddr && !socketAddresses.empty()) { + socketAddresses.erase(*previousConnectionAddr); + socketAddresses.insert(*previousConnectionAddr); + } + for (std::set::const_iterator it = socketAddresses.begin(); + it != socketAddresses.end(); it++) { try { boost::shared_ptr pConnection = clientContext.getConnectionManager().createOwnerConnection( *it); @@ -174,10 +178,11 @@ namespace hazelcast { } catch (exception::IException &e) { lastError = e; std::ostringstream errorStream; - errorStream << "IO error during initial connection =>" << e.what(); + errorStream << "IO error during initial connection to " << (*it) << " for owner connection =>" << e.what(); util::ILogger::getLogger().warning(errorStream.str()); } } + if (attempt++ >= connectionAttemptLimit) { break; } diff --git a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp index 03a070b3e7..b137337ef7 100644 --- a/hazelcast/src/hazelcast/client/spi/InvocationService.cpp +++ b/hazelcast/src/hazelcast/client/spi/InvocationService.cpp @@ -30,10 +30,10 @@ #include "hazelcast/client/spi/ClientContext.h" #include "hazelcast/client/connection/CallFuture.h" #include "hazelcast/client/ClientProperties.h" - #include "hazelcast/client/connection/Connection.h" #include "hazelcast/client/spi/ServerListenerService.h" #include "hazelcast/client/serialization/pimpl/SerializationService.h" +#include "hazelcast/client/exception/IllegalStateException.h" #include @@ -41,7 +41,7 @@ namespace hazelcast { namespace client { namespace spi { InvocationService::InvocationService(spi::ClientContext &clientContext) - : clientContext(clientContext) { + : clientContext(clientContext), isOpen(false) { redoOperation = clientContext.getClientConfig().isRedoOperation(); ClientProperties &properties = clientContext.getClientProperties(); retryWaitTime = properties.getRetryWaitTime().getInteger(); @@ -63,8 +63,12 @@ namespace hazelcast { } - void InvocationService::start() { + bool InvocationService::start() { + return isOpen.compareAndSet(false, true); + } + void InvocationService::shutdown() { + isOpen.compareAndSet(true, false); } connection::CallFuture InvocationService::invokeOnRandomTarget( @@ -84,30 +88,30 @@ namespace hazelcast { connection::CallFuture InvocationService::invokeOnRandomTarget( std::auto_ptr request, - hazelcast::client::impl::BaseEventHandler *eventHandler) { - std::auto_ptr managedEventHandler(eventHandler); + client::impl::BaseEventHandler *eventHandler) { + std::auto_ptr managedEventHandler(eventHandler); boost::shared_ptr connection = clientContext.getConnectionManager().getRandomConnection( retryCount); return doSend(request, managedEventHandler, connection, -1); } connection::CallFuture InvocationService::invokeOnTarget(std::auto_ptr request, - hazelcast::client::impl::BaseEventHandler *eventHandler, + client::impl::BaseEventHandler *eventHandler, const Address &address) { - std::auto_ptr managedEventHandler(eventHandler); + std::auto_ptr managedEventHandler(eventHandler); boost::shared_ptr connection = clientContext.getConnectionManager().getOrConnect( address, retryCount); return doSend(request, managedEventHandler, connection, -1); } connection::CallFuture InvocationService::invokeOnPartitionOwner( - std::auto_ptr request, hazelcast::client::impl::BaseEventHandler *handler, + std::auto_ptr request, client::impl::BaseEventHandler *handler, int partitionId) { boost::shared_ptr
owner = clientContext.getPartitionService().getPartitionOwner(partitionId); if (owner.get() != NULL) { boost::shared_ptr connection = clientContext.getConnectionManager().getOrConnect( *owner, retryCount); - std::auto_ptr managedEventHandler(handler); + std::auto_ptr managedEventHandler(handler); return doSend(request, managedEventHandler, connection, partitionId); } return invokeOnRandomTarget(request, handler); @@ -116,7 +120,7 @@ namespace hazelcast { connection::CallFuture InvocationService::invokeOnConnection( std::auto_ptr request, boost::shared_ptr connection) { - return doSend(request, std::auto_ptr(NULL), connection, -1); + return doSend(request, std::auto_ptr(NULL), connection, -1); } bool InvocationService::isRedoOperation() const { @@ -144,7 +148,7 @@ namespace hazelcast { connection::CallFuture InvocationService::doSend(std::auto_ptr request, - std::auto_ptr eventHandler, + std::auto_ptr eventHandler, boost::shared_ptr connection, int partitionId) { request->setPartitionId(partitionId); @@ -152,7 +156,7 @@ namespace hazelcast { promise->setRequest(request); promise->setEventHandler(eventHandler); - boost::shared_ptr conn = registerAndEnqueue(connection, promise, partitionId); + boost::shared_ptr conn = registerAndEnqueue(connection, promise); return connection::CallFuture(promise, conn, heartbeatTimeout, this); } @@ -187,8 +191,6 @@ namespace hazelcast { boost::shared_ptr InvocationService::resend( boost::shared_ptr promise, const std::string &lastTriedAddress) { - util::sleep(getRetryWaitTime()); - if (promise->getRequest()->isBindToSingleConnection()) { promise->setException(exception::pimpl::ExceptionHandler::INSTANCE_NOT_ACTIVE, lastTriedAddress); return boost::shared_ptr(); @@ -201,33 +203,44 @@ namespace hazelcast { boost::shared_ptr connection; try { connection::ConnectionManager &cm = clientContext.getConnectionManager(); - connection = cm.getRandomConnection(getRetryCount()); - } catch (exception::IOException &) { + connection = cm.getRandomConnection(getRetryCount(), lastTriedAddress, getRetryWaitTime()); + } catch (exception::IException &) { promise->setException(exception::pimpl::ExceptionHandler::INSTANCE_NOT_ACTIVE, lastTriedAddress); return boost::shared_ptr(); } - util::ILogger::getLogger().info("[InvocationService::resend] Re-sending the request with id " + - util::IOUtil::to_string( - promise->getRequest()->getCorrelationId()) + - " to connection " + - util::IOUtil::to_string
(connection->getRemoteEndpoint())); - promise->resetFuture(); - return registerAndEnqueue(connection, promise, -1); + int64_t correlationId = promise->getRequest()->getCorrelationId(); + + boost::shared_ptr actualConn = registerAndEnqueue(connection, promise); + + char msg[300]; + const Address &serverAddr = connection->getRemoteEndpoint(); + hazelcast::util::snprintf(msg, 300, "[InvocationService::resend] Re-sending the request with id %lld " + "originally destined for %s to server %s:%d using the new correlation id %lld", correlationId, + lastTriedAddress.c_str(), serverAddr.getHost().c_str(), serverAddr.getPort(), promise->getRequest()->getCorrelationId()); + util::ILogger::getLogger().info(msg); + + return actualConn; } boost::shared_ptr InvocationService::registerAndEnqueue( - boost::shared_ptr connection, - boost::shared_ptr promise, int partitionId) { + boost::shared_ptr &connection, + boost::shared_ptr promise) { + + if (!isOpen) { + promise->setException(exception::pimpl::ExceptionHandler::ILLEGAL_STATE, + "Invocation service is not open. Can not process the request."); + return boost::shared_ptr(); + } registerCall(*connection, promise); //Don't change the order with following line protocol::ClientMessage *request = promise->getRequest(); if (!isAllowedToSentRequest(*connection, *request)) { - deRegisterCall(*connection, request->getCorrelationId()); + deRegisterCall(connection->getConnectionId(), request->getCorrelationId()); std::string address = util::IOUtil::to_string(connection->getRemoteEndpoint()); return resend(promise, address); } @@ -240,10 +253,10 @@ namespace hazelcast { boost::shared_ptr promise) { int64_t callId = clientContext.getConnectionManager().getNextCallId(); promise->getRequest()->setCorrelationId(callId); - if (getCallPromiseMap(connection)->put(callId, promise).get()) { + if (getCallPromiseMap(connection.getConnectionId())->put(callId, promise).get()) { std::ostringstream out; out << "[InvocationService::registerCall] The call id map already contains the promise for call " - "id:" << callId << ". This is unexpected!!!"; + "id:" << callId << ". This is unexpected!!!"; hazelcast::util::ILogger::getLogger().severe(out.str()); assert(0); // just fail in debug mode } @@ -253,8 +266,8 @@ namespace hazelcast { } boost::shared_ptr InvocationService::deRegisterCall( - connection::Connection &connection, int64_t callId) { - return getCallPromiseMap(connection)->remove(callId); + int connectionId, int64_t callId) { + return getCallPromiseMap(connectionId)->remove(callId); } void InvocationService::registerEventHandler(int64_t correlationId, connection::Connection &connection, @@ -274,24 +287,24 @@ namespace hazelcast { return; } - boost::shared_ptr promise = deRegisterCall(connection, correlationId); + int connId = connection.getConnectionId(); + const Address &serverAddr = connection.getRemoteEndpoint(); + boost::shared_ptr promise = deRegisterCall(connId, correlationId); if (NULL == promise.get()) { if (connection.live) { std::ostringstream out; out << "[InvocationService::handleMessage] Could not find the promise for correlation id:" << - correlationId << ". This is unexpected!!! "; - std::vector entries = getCallPromiseMap(connection)->keys(); - out << "There are " << entries.size() << " entries in the call promise map. Entries are:" << std::endl; - for (std::vector::const_iterator it = entries.begin(); it != entries.end(); ++it) { - out << *it << std::endl; - } - hazelcast::util::ILogger::getLogger().severe(out.str()); - assert(0); // fail in debug mode - return; + correlationId << ". It may have been re-sent."; + hazelcast::util::ILogger::getLogger().finest(out.str()); } + + /** Do not proceed if no promise exist for the message, just drop it since it is most probably re-sent + * on another connection. + **/ + return; } - if (!handleException(message.get(), promise, connection.getRemoteEndpoint())) + if (!handleException(message.get(), promise, serverAddr)) return;//if response is exception,then return if (!handleEventUuid(message.get(), promise)) @@ -309,6 +322,11 @@ namespace hazelcast { if (error.className == "com.hazelcast.core.HazelcastInstanceNotActiveException") { std::string addrString = util::IOUtil::to_string(address); + char msg[300]; + util::snprintf(msg, 300, "[InvocationService::handleException] HazelcastInstanceNotActiveException " + "received. Shall retry the request. Response call id: %lld, request call id: %lld, server address:%s.", + response->getCorrelationId(), promise->getRequest()->getCorrelationId(), addrString.c_str()); + util::ILogger::getLogger().finest(msg); tryResend(promise, addrString); } else { promise->setException(error.className, error.toString()); @@ -322,7 +340,7 @@ namespace hazelcast { /* returns shouldSetResponse */ bool InvocationService::handleEventUuid(protocol::ClientMessage *response, boost::shared_ptr promise) { - hazelcast::client::impl::BaseEventHandler *eventHandler = promise->getEventHandler(); + client::impl::BaseEventHandler *eventHandler = promise->getEventHandler(); if (eventHandler != NULL) { if (eventHandler->registrationId.size() == 0) //if uuid is not set, it means it is first time that we are getting uuid. @@ -357,35 +375,54 @@ namespace hazelcast { return getEventHandlerPromiseMap(connection)->remove(callId); } - void InvocationService::cleanResources(connection::Connection& connection) { - std::vector > promises = getCallPromiseMap(connection)->values(); + void InvocationService::cleanResources(connection::Connection &connection) { + std::vector > > promises = getCallPromiseMap( + connection.getConnectionId())->clear(); + std::string address = util::IOUtil::to_string(connection.getRemoteEndpoint()); - for (std::vector >::iterator it = promises.begin(); + char msg[200]; + util::snprintf(msg, 200, "[cleanResources] There are %u waiting promises on connection with id:%d (%s) ", promises.size(), connection.getConnectionId(), address.c_str()); + util::ILogger::getLogger().info(msg); + + for (std::vector > >::iterator it = promises.begin(); it != promises.end(); ++it) { - tryResend(*it, address); + if (!isOpen) { + it->second->setException(exception::pimpl::ExceptionHandler::ILLEGAL_STATE, "Invocation service is not open."); + } else { + tryResend(it->second, address); + } } cleanEventHandlers(connection); } void InvocationService::cleanEventHandlers(connection::Connection &connection) { - std::vector > promises = getEventHandlerPromiseMap(connection)->values(); + std::vector > > promises = getEventHandlerPromiseMap( + connection)->clear(); + + char msg[200]; + util::snprintf(msg, 200, "[cleanEventHandlers] There are %ld event handler promises on connection with id:%d which shall be retried", + promises.size(), connection.getConnectionId()); + util::ILogger::getLogger().info(msg); + - for (std::vector >::iterator it = promises.begin(); + for (std::vector > >::const_iterator it = promises.begin(); it != promises.end(); ++it) { - clientContext.getServerListenerService().retryFailedListener(*it); + if (isOpen) { + clientContext.getServerListenerService().retryFailedListener(it->second); + } } } boost::shared_ptr > InvocationService::getCallPromiseMap( - connection::Connection &connection) { - return callPromises.getOrPutIfAbsent(&connection); + int connectionId) { + return callPromises.getOrPutIfAbsent(connectionId); } boost::shared_ptr > InvocationService::getEventHandlerPromiseMap( connection::Connection &connection) { - return eventHandlerPromises.getOrPutIfAbsent(&connection); + return eventHandlerPromises.getOrPutIfAbsent(connection.getConnectionId()); } } } diff --git a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp index 3d73790549..48cbd0528c 100644 --- a/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp +++ b/hazelcast/src/hazelcast/client/spi/LifecycleService.cpp @@ -50,7 +50,10 @@ namespace hazelcast { if (!clientContext.getClusterService().start()) { return false; } - clientContext.getInvocationService().start(); + + if (!clientContext.getInvocationService().start()) { + return false; + } if (!clientContext.getPartitionService().start()) { return false; @@ -64,9 +67,10 @@ namespace hazelcast { if (!active.compareAndSet(true, false)) return; fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN); - clientContext.getConnectionManager().shutdown(); - clientContext.getClusterService().shutdown(); + clientContext.getInvocationService().shutdown(); clientContext.getPartitionService().shutdown(); + clientContext.getClusterService().shutdown(); + clientContext.getConnectionManager().shutdown(); fireLifecycleEvent(LifecycleEvent::SHUTDOWN); } diff --git a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp index e37b4e0088..f2e1a4bfba 100644 --- a/hazelcast/src/hazelcast/client/spi/PartitionService.cpp +++ b/hazelcast/src/hazelcast/client/spi/PartitionService.cpp @@ -54,7 +54,7 @@ namespace hazelcast { void PartitionService::shutdown() { util::LockGuard lg(lock); if (partitionListenerThread.get() != NULL) { - partitionListenerThread->interrupt(); + partitionListenerThread->cancel(); partitionListenerThread->join(); } } @@ -202,6 +202,12 @@ namespace hazelcast { updating = false; } } + + void PartitionService::wakeup() { + if (NULL != partitionListenerThread.get()) { + partitionListenerThread->wakeup(); + } + } } } } diff --git a/hazelcast/src/hazelcast/util/ByteBuffer.cpp b/hazelcast/src/hazelcast/util/ByteBuffer.cpp index 2d16b42609..db77643443 100644 --- a/hazelcast/src/hazelcast/util/ByteBuffer.cpp +++ b/hazelcast/src/hazelcast/util/ByteBuffer.cpp @@ -134,7 +134,7 @@ namespace hazelcast { } size_t ByteBuffer::readBytes(byte *target, size_t len) { - size_t numBytesToCopy = std::min(capacity - pos, len); + size_t numBytesToCopy = std::min(lim - pos, len); memcpy(target, ix(), numBytesToCopy); pos += len; return numBytesToCopy; diff --git a/hazelcast/src/hazelcast/util/SocketSet.cpp b/hazelcast/src/hazelcast/util/SocketSet.cpp index c39113d108..a9d041f336 100644 --- a/hazelcast/src/hazelcast/util/SocketSet.cpp +++ b/hazelcast/src/hazelcast/util/SocketSet.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "hazelcast/util/SocketSet.h" #include "hazelcast/util/ILogger.h" @@ -30,30 +31,46 @@ namespace hazelcast { void SocketSet::insertSocket(client::Socket const *socket) { assert(NULL != socket); - LockGuard lockGuard(accessLock); - sockets.insert(socket); + int socketId = socket->getSocketId(); + assert(socketId >= 0); + + if (socketId >= 0) { + LockGuard lockGuard(accessLock); + sockets.insert(socketId); + } else { + char msg[200]; + util::snprintf(msg, 200, "[SocketSet::insertSocket] Socket id:%d, Should be 0 or greater than 0.", + socketId); + util::ILogger::getLogger().warning(msg); + } } void SocketSet::removeSocket(client::Socket const *socket) { assert(NULL != socket); - int searchedSocketId = socket->getSocketId(); + int socketId = socket->getSocketId(); + assert(socketId >= 0); + bool found = false; - LockGuard lockGuard(accessLock); + if (socketId >= 0) { + LockGuard lockGuard(accessLock); - for (SocketContainer::const_iterator it = sockets.begin(); it != sockets.end(); it++) { - if (searchedSocketId == (*it)->getSocketId()) { // found - sockets.erase(it); - found = true; - break; + for (std::set::const_iterator it = sockets.begin(); it != sockets.end(); it++) { + if (socketId == *it) { // found + sockets.erase(it); + found = true; + break; + } } } + if (!found) { - ILogger &logger = util::ILogger::getLogger(); - std::ostringstream out; - out << "[SocketSet::removeSocket] Trying to remove an already removed socket with id " << searchedSocketId << "."; - logger.warning(out.str()); + char msg[200]; + util::snprintf(msg, 200, + "[SocketSet::removeSocket] Socket with id %d was not found among the sockets.", + socketId); + util::ILogger::getLogger().finest(msg); } } @@ -66,13 +83,12 @@ namespace hazelcast { LockGuard lockGuard(accessLock); if (!sockets.empty()) { - for (SocketContainer::const_iterator it = sockets.begin(); it != sockets.end(); it++) { - int socketId = (*it)->getSocketId(); - FD_SET(socketId, &resultSet); + for (std::set::const_iterator it = sockets.begin(); it != sockets.end(); it++) { + FD_SET(*it, &resultSet); } - result.max = (*(sockets.begin()))->getSocketId(); - result.min = (*(sockets.rbegin()))->getSocketId(); + result.max = *sockets.rbegin(); + result.min = *sockets.begin(); } return result; diff --git a/hazelcast/src/hazelcast/util/Thread.cpp b/hazelcast/src/hazelcast/util/Thread.cpp index 392dfe7575..c0d9241d4b 100644 --- a/hazelcast/src/hazelcast/util/Thread.cpp +++ b/hazelcast/src/hazelcast/util/Thread.cpp @@ -57,8 +57,10 @@ namespace hazelcast { } Thread::~Thread() { - interrupt(); - join(); + if (!isJoined) { + cancel(); + join(); + } CloseHandle(thread); } @@ -69,21 +71,25 @@ namespace hazelcast { throw thread_interrupted(); } bool wokenUpbyInterruption = condition.waitFor(mutex, seconds); - if(wokenUpbyInterruption){ + if(wokenUpbyInterruption && isInterrupted){ isInterrupted = false; throw thread_interrupted(); } + } + void Thread::wakeup() { + LockGuard lock(mutex); + condition.notify_all(); } - void Thread::interrupt() { + void Thread::cancel() { LockGuard lock(mutex); - isInterrupted = true; - condition.notify_all(); + isInterrupted = true; + condition.notify_all(); } bool Thread::join() { - if (isJoined) { + if (!isJoined.compareAndSet(false, true)) { return true; } DWORD err = WaitForSingleObject(thread, INFINITE); @@ -124,11 +130,10 @@ namespace hazelcast { } } - - #else #include +#include "hazelcast/util/LockGuard.h" namespace hazelcast { namespace util { @@ -158,29 +163,43 @@ namespace hazelcast { } Thread::~Thread() { - interrupt(); - join(); + if (!isJoined) { + cancel(); + join(); + } pthread_attr_destroy(&attr); } void Thread::interruptibleSleep(int seconds) { - util::sleep((unsigned int) seconds); + LockGuard guard(wakeupMutex); + wakeupCondition.waitFor(wakeupMutex, seconds); } std::string Thread::getThreadName() const { return threadName; } - void Thread::interrupt() { - pthread_cancel(thread); + void Thread::wakeup() { + LockGuard guard(wakeupMutex); + wakeupCondition.notify(); + } + + void Thread::cancel() { + if (!isJoined) { + LockGuard guard(wakeupMutex); + wakeupCondition.notify(); + + pthread_cancel(thread); + } } bool Thread::join() { - if (isJoined) { + if (!isJoined.compareAndSet(false, true)) { return true; } int err = pthread_join(thread, NULL); if (EINVAL == err || ESRCH == err || EDEADLK == err) { + isJoined = false; return false; } isJoined = true; diff --git a/hazelcast/src/hazelcast/util/Util.cpp b/hazelcast/src/hazelcast/util/Util.cpp index d5e0b1e74a..629aa1cfd0 100644 --- a/hazelcast/src/hazelcast/util/Util.cpp +++ b/hazelcast/src/hazelcast/util/Util.cpp @@ -81,7 +81,7 @@ namespace hazelcast { va_start(args, format); #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) - return vsnprintf_s(str, len, len-1, format, args); + return vsnprintf_s(str, len, _TRUNCATE, format, args); #else return vsnprintf(str, len, format, args); #endif diff --git a/hazelcast/test/src/ClientTestSupport.h b/hazelcast/test/src/ClientTestSupport.h index 1617790c0c..425aa3bcb4 100644 --- a/hazelcast/test/src/ClientTestSupport.h +++ b/hazelcast/test/src/ClientTestSupport.h @@ -35,7 +35,7 @@ namespace hazelcast { class ClientTestSupport : public ::testing::Test { public: - std::auto_ptr getConfig(); + virtual std::auto_ptr getConfig(); std::auto_ptr getNewClient(); }; diff --git a/hazelcast/test/src/HazelcastServer.cpp b/hazelcast/test/src/HazelcastServer.cpp index d8ce32b842..11ce6a4a3b 100644 --- a/hazelcast/test/src/HazelcastServer.cpp +++ b/hazelcast/test/src/HazelcastServer.cpp @@ -28,10 +28,9 @@ namespace hazelcast { namespace client { namespace test { - HazelcastServer::HazelcastServer(HazelcastServerFactory& factory) :factory(factory) - , id(factory.getInstanceId()) + , id(factory.getInstanceId(DEFAULT_RETRY_COUNT)) , isShutDown(false) { } diff --git a/hazelcast/test/src/HazelcastServer.h b/hazelcast/test/src/HazelcastServer.h index 6bef07b64b..613f9836b2 100644 --- a/hazelcast/test/src/HazelcastServer.h +++ b/hazelcast/test/src/HazelcastServer.h @@ -34,6 +34,8 @@ namespace hazelcast { class HazelcastServer { public: + static const int DEFAULT_RETRY_COUNT = 3; + HazelcastServer(HazelcastServerFactory &); /** diff --git a/hazelcast/test/src/HazelcastServerFactory.cpp b/hazelcast/test/src/HazelcastServerFactory.cpp index 8148583e0b..ff125b047b 100644 --- a/hazelcast/test/src/HazelcastServerFactory.cpp +++ b/hazelcast/test/src/HazelcastServerFactory.cpp @@ -18,11 +18,14 @@ -#include "HazelcastServerFactory.h" -#include "HazelcastServer.h" #include #include +#include "HazelcastServerFactory.h" +#include "HazelcastServer.h" +#include "hazelcast/util/ILogger.h" +#include "hazelcast/util/Util.h" + #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) #pragma warning(push) #pragma warning(disable: 4996) //for strerror @@ -32,23 +35,26 @@ namespace hazelcast { namespace client { namespace test { - HazelcastServerFactory::HazelcastServerFactory(const char* hostAddress) - : address(hostAddress, 6543) - , socket(address) - , outputSocketStream(socket) - , inputSocketStream(socket) { - if (int error = socket.connect(5000)) - std::cout << "HazelcastServerFactory " << strerror(error) << std::endl; - + HazelcastServerFactory::HazelcastServerFactory(const char *hostAddress) + : address(hostAddress, 6543), socket(address), outputSocketStream(socket), + inputSocketStream(socket), logger(util::ILogger::getLogger()) { + if (int error = socket.connect(5000)) { + char msg[200]; + util::snprintf(msg, 200, + "[HazelcastServerFactory] Could not connect to socket %s:6543. Errno:%d, %s", + hostAddress, error, strerror(error)); + logger.severe(msg); + } } HazelcastServerFactory::~HazelcastServerFactory() { try { - outputSocketStream.writeInt(END); + outputSocketStream.writeInt(END); inputSocketStream.readInt(); - } catch(std::exception &e) { - std::cout << e.what() << std::endl; - std::cout.flush(); + } catch (std::exception &e) { + char msg[200]; + util::snprintf(msg, 200, "[HazelcastServerFactory] ~HazelcastServerFactory() exception:%s", e.what()); + logger.severe(msg); } } @@ -57,7 +63,9 @@ namespace hazelcast { outputSocketStream.writeInt(id); int i = inputSocketStream.readInt(); if (i != OK) { - std::cout << "void HazelcastServerFactory::shutdownInstance(int id):" << i << std::endl; + char msg[200]; + util::snprintf(msg, 200, "[HazelcastServerFactory] shutdownInstance(int id): %d", i); + logger.info(msg); } } @@ -66,22 +74,40 @@ namespace hazelcast { try { int i = inputSocketStream.readInt(); if (i != OK) { - std::cout << "void HazelcastServerFactory::shutdownAll():" << i << std::endl; - std::cout.flush(); + char msg[200]; + util::snprintf(msg, 200, "[HazelcastServerFactory] shutdownAll(): %d", i); + logger.info(msg); } - } catch(std::exception &e) { - std::cout << e.what() << std::endl; + } catch (std::exception &e) { + char msg[200]; + util::snprintf(msg, 200, "[HazelcastServerFactory] shutdownAll exception:%s", e.what()); + logger.severe(msg); } } - int HazelcastServerFactory::getInstanceId() { + int HazelcastServerFactory::getInstanceId(int retryNumber) { outputSocketStream.writeInt(START); - return inputSocketStream.readInt(); + int id = inputSocketStream.readInt(); + if (FAIL == id) { + char msg[200]; + util::snprintf(msg, 200, "[HazelcastServerFactory::getInstanceId] Failed to start server"); + logger.warning(msg); + + while (id == FAIL && retryNumber > 0) { + util::snprintf(msg, 200, "[HazelcastServerFactory::getInstanceId] Retrying to start server. Retry number:%d", retryNumber); + logger.warning(msg); + outputSocketStream.writeInt(START); + id = inputSocketStream.readInt(); + --retryNumber; + } + } + + return id; } - const std::string& HazelcastServerFactory::getServerAddress() const{ + const std::string &HazelcastServerFactory::getServerAddress() const { return address.getHost(); } } diff --git a/hazelcast/test/src/HazelcastServerFactory.h b/hazelcast/test/src/HazelcastServerFactory.h index 9d8e76ce01..f6301016f1 100644 --- a/hazelcast/test/src/HazelcastServerFactory.h +++ b/hazelcast/test/src/HazelcastServerFactory.h @@ -26,6 +26,9 @@ #include "hazelcast/client/connection/InputSocketStream.h" namespace hazelcast { + namespace util { + class ILogger; + } namespace client { namespace test { @@ -36,6 +39,7 @@ namespace hazelcast { enum { OK = 5678, + FAIL = -1, END = 1, START = 2, SHUTDOWN = 3, @@ -48,7 +52,7 @@ namespace hazelcast { void shutdownAll(); - int getInstanceId(); + int getInstanceId(int retryNumber = 0); ~HazelcastServerFactory(); @@ -57,6 +61,7 @@ namespace hazelcast { Socket socket; connection::OutputSocketStream outputSocketStream; connection::InputSocketStream inputSocketStream; + util::ILogger &logger; void shutdownInstance(int id); }; diff --git a/hazelcast/test/src/SimpleMapTest.h b/hazelcast/test/src/SimpleMapTest.h index 24f8cc10c0..584cca0d68 100644 --- a/hazelcast/test/src/SimpleMapTest.h +++ b/hazelcast/test/src/SimpleMapTest.h @@ -125,15 +125,16 @@ class SimpleMapTest { } updateStats(updateIntervalCount, getCount, putCount, removeCount); } catch(hazelcast::client::exception::IOException &e) { - std::cerr << ">hz " << e.what() << std::endl; + hazelcast::util::ILogger::getLogger().warning(std::string("[SimpleMapTest IOException] ") + e.what()); } catch(hazelcast::client::exception::InstanceNotActiveException &e) { - std::cerr << ">std " << e.what() << std::endl; + hazelcast::util::ILogger::getLogger().warning(std::string("[SimpleMapTest InstanceNotActiveException] ") + e.what()); + } catch(hazelcast::client::exception::IException &e) { + hazelcast::util::ILogger::getLogger().warning(std::string("[SimpleMapTest IException] ") + e.what()); } catch(...) { - std::cerr << ">unkown exception" << std::endl; + hazelcast::util::ILogger::getLogger().warning("[SimpleMapTest unknown exception]"); running = false; throw; } - } } diff --git a/hazelcast/test/src/cluster/ClusterTest.cpp b/hazelcast/test/src/cluster/ClusterTest.cpp index 19e57ed242..1307261a4d 100644 --- a/hazelcast/test/src/cluster/ClusterTest.cpp +++ b/hazelcast/test/src/cluster/ClusterTest.cpp @@ -18,8 +18,8 @@ // #include "ClientTestSupport.h" -#include -#include +#include "hazelcast/util/CountDownLatch.h" +#include "hazelcast/client/MembershipListener.h" #include "hazelcast/client/InitialMembershipEvent.h" #include "hazelcast/client/InitialMembershipListener.h" #include "hazelcast/client/MemberAttributeEvent.h" diff --git a/hazelcast/test/src/faulttolerance/LoadTest.cpp b/hazelcast/test/src/faulttolerance/LoadTest.cpp new file mode 100644 index 0000000000..9219d31a9e --- /dev/null +++ b/hazelcast/test/src/faulttolerance/LoadTest.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// +// Created by İhsan Demir on Mar 6 2016. +// +#include +#include "hazelcast/util/Thread.h" +#include "hazelcast/util/CountDownLatch.h" +#include "hazelcast/util/ILogger.h" +#include "ClientTestSupport.h" +#include "HazelcastServer.h" +#include "hazelcast/client/IMap.h" +#include "hazelcast/client/HazelcastClient.h" +#include "hazelcast/client/ClientConfig.h" + +namespace hazelcast { + namespace client { + namespace test { + namespace faulttolerance { + class LoadTest : public ClientTestSupport { + public: + virtual std::auto_ptr getConfig() { + std::auto_ptr config = ClientTestSupport::getConfig(); + config->setRedoOperation(true); + config->setLogLevel(FINEST); + return config; + } + + static void loadClient(hazelcast::util::ThreadArgs &args) { + IMap *map = (IMap *) args.arg0; + int numberOfOps = *((int *) args.arg1); + util::CountDownLatch *latch = (util::CountDownLatch *) args.arg2; + + latch->countDown(); + + latch->await(20); + + for (int i = 0; i < numberOfOps; ++i) { + int mod = rand() % 3; + switch (mod) { + case 0: + ASSERT_NO_THROW(map->put(i, i)); + break; + case 1: + ASSERT_NO_THROW(map->remove(i)); + case 2: { + boost::shared_ptr val; + ASSERT_NO_THROW(val = map->get(i)); + if ((int *) NULL != val.get()) { + ASSERT_EQ(*val, i); + } + break; + } + default: + abort(); + } + } + } + + void addThread(util::Thread *thr) { + threads.push_back(thr); + } + + util::Thread *getThread(size_t i) const { + return threads[i]; + } + + ~LoadTest() { + for (std::vector::const_iterator it = threads.begin(); + it != threads.end(); ++it) { + delete *it; + } + } + + protected: + std::vector threads; + }; + + void loadIntMapTestWithConfig(ClientConfig &config, LoadTest &test) { + HazelcastServer instance1(*g_srvFactory); + HazelcastServer instance2(*g_srvFactory); + HazelcastServer instance3(*g_srvFactory); + HazelcastServer instance4(*g_srvFactory); + HazelcastClient client(config); + IMap imap = client.getMap("loadtest"); + + int numThreads = 40; + int numOps = 5000; + + util::CountDownLatch startLatch(numThreads); + + for (int i = 0; i < numThreads; ++i) { + test.addThread(new util::Thread(LoadTest::loadClient, &imap, &numOps, &startLatch)); + } + + startLatch.await(20); + + util::ILogger::getLogger().info( + "[LoadTest::loadIntMapTestWithConfig] Shutting down server instance 1"); + instance1.shutdown(); + util::ILogger::getLogger().info( + "[LoadTest::loadIntMapTestWithConfig] Shutting down server instance 2"); + instance2.shutdown(); + util::ILogger::getLogger().info( + "[LoadTest::loadIntMapTestWithConfig] Shutting down server instance 3"); + instance3.shutdown(); + + util::ILogger::getLogger().info("[LoadTest::loadIntMapTestWithConfig] Starting server instance 5"); + HazelcastServer instance5(*g_srvFactory); + + /*Note: Could not shutdown instance 5 here, since there may be some incomplete synchronization + * between instance 5 and instance 4. This caused problems in Linux environment. */ + + for (int i = 0; i < numThreads; ++i) { + util::Thread *thr = test.getThread(i); + char msg[100]; + util::snprintf(msg, 100, "[LoadTest::loadIntMapTestWithConfig] Waiting to join for thread %ld", + thr->getThreadID()); + util::ILogger::getLogger().info(msg); + ASSERT_TRUE(thr->join()); + } + + util::ILogger::getLogger().info( + "[LoadTest::loadIntMapTestWithConfig] Finished the test successfully :)"); + } + + TEST_F(LoadTest, testIntMapSmartClientServerRestart) { + std::auto_ptr config = getConfig(); + config->setSmart(true); + + loadIntMapTestWithConfig(*config, *this); + } + + TEST_F(LoadTest, testIntMapDummyClientServerRestart) { + std::auto_ptr config = getConfig(); + config->setSmart(false); + + loadIntMapTestWithConfig(*config, *this); + } + } + } + } +} diff --git a/hazelcast/test/src/issues/IssueTest.cpp b/hazelcast/test/src/issues/IssueTest.cpp index a72385b7f4..df03467098 100644 --- a/hazelcast/test/src/issues/IssueTest.cpp +++ b/hazelcast/test/src/issues/IssueTest.cpp @@ -114,7 +114,7 @@ namespace hazelcast { // 8. Verify that the 2nd entry is received by the listener ASSERT_EQ(true, latch.await(20, 0)); // timeout of 20 seconds - t.interrupt(); + t.cancel(); t.join(); // 9. Shut down the server diff --git a/hazelcast/test/src/queue/ClientQueueTest.cpp b/hazelcast/test/src/queue/ClientQueueTest.cpp index 7417790dbc..b1474af6dd 100644 --- a/hazelcast/test/src/queue/ClientQueueTest.cpp +++ b/hazelcast/test/src/queue/ClientQueueTest.cpp @@ -26,27 +26,26 @@ namespace hazelcast { namespace client { namespace test { ClientQueueTest::ClientQueueTest() - : instance(*g_srvFactory) - , client(getNewClient()) - , q(new IQueue< std::string>(client->getQueue< std::string >("clientQueueTest"))) { + : instance(*g_srvFactory), client(getNewClient()), + q(new IQueue(client->getQueue("clientQueueTest"))) { } ClientQueueTest::~ClientQueueTest() { } - class QueueTestItemListener : public ItemListener { + class QueueTestItemListener : public ItemListener { public: QueueTestItemListener(util::CountDownLatch &latch) - :latch(latch) { + : latch(latch) { } - void itemAdded(const ItemEvent& itemEvent) { + void itemAdded(const ItemEvent &itemEvent) { latch.countDown(); } - void itemRemoved(const ItemEvent& item) { + void itemRemoved(const ItemEvent &item) { } private: @@ -75,7 +74,7 @@ namespace hazelcast { IQueue *q = (IQueue *) args.arg0; util::sleep(2); q->offer("item1"); - std::cout << "item1 is offered" << std::endl; + std::cout << "item1 is offered" << std::endl; } TEST_F(ClientQueueTest, testOfferPoll) { @@ -89,14 +88,14 @@ namespace hazelcast { ASSERT_TRUE(result); for (int i = 0; i < 10; i++) { - ASSERT_NE(q->poll().get(), (std::string *)NULL); + ASSERT_NE(q->poll().get(), (std::string *) NULL); } ASSERT_EQ(0, q->size()); util::Thread t2(testOfferPollThread2, q.get()); - - boost::shared_ptr item = q->poll(30 * 1000); - ASSERT_NE(item.get(), (std::string *)NULL); + + boost::shared_ptr item = q->poll(30 * 1000); + ASSERT_NE(item.get(), (std::string *) NULL); ASSERT_EQ("item1", *item); t2.join(); } @@ -155,14 +154,14 @@ namespace hazelcast { ASSERT_TRUE(q->offer("item5")); std::vector list; - int result = q->drainTo(list, 2); - ASSERT_EQ(2, result); + size_t result = q->drainTo(list, 2); + ASSERT_EQ(2U, result); ASSERT_EQ("item1", list[0]); ASSERT_EQ("item2", list[1]); std::vector list2; result = q->drainTo(list2); - ASSERT_EQ(3, result); + ASSERT_EQ(3U, result); ASSERT_EQ("item3", list2[0]); ASSERT_EQ("item4", list2[1]); ASSERT_EQ("item5", list2[2]); @@ -176,15 +175,15 @@ namespace hazelcast { ASSERT_TRUE(q->offer("item5")); std::vector array = q->toArray(); - int size = array.size(); - for (int i = 0; i < size; i++) { + size_t size = array.size(); + for (size_t i = 0; i < size; i++) { ASSERT_EQ(std::string("item") + util::IOUtil::to_string(i + 1), array[i]); } } TEST_F(ClientQueueTest, testAddAll) { - std::vector coll; + std::vector coll; coll.push_back("item1"); coll.push_back("item2"); coll.push_back("item3"); @@ -235,7 +234,7 @@ namespace hazelcast { q->clear(); ASSERT_EQ(0, q->size()); - ASSERT_EQ(q->poll().get(), (std::string *)NULL); + ASSERT_EQ(q->poll().get(), (std::string *) NULL); } } } diff --git a/hazelcast/test/src/txn/ClientTxnTest.cpp b/hazelcast/test/src/txn/ClientTxnTest.cpp index 7efb6c8f50..5bf47fe545 100644 --- a/hazelcast/test/src/txn/ClientTxnTest.cpp +++ b/hazelcast/test/src/txn/ClientTxnTest.cpp @@ -28,11 +28,11 @@ namespace hazelcast { public: const Member next() { std::vector members = getMembers(); - long len = members.size(); + size_t len = members.size(); if (len == 0) { throw exception::IException("const Member& RoundRobinLB::next()", "No member in member list!!"); } - for (int i = 0; i < len; i++) { + for (size_t i = 0; i < len; i++) { if (members[i].getAddress().getPort() == 5701) { return members[i]; } @@ -80,7 +80,7 @@ namespace hazelcast { ClientTxnTest::~ClientTxnTest() { g_srvFactory->shutdownAll(); client->shutdown(); - client.reset(); + client.reset(); } TEST_F(ClientTxnTest, testTxnRollback) { diff --git a/hazelcast/test/src/util/BitsTest.cpp b/hazelcast/test/src/util/BitsTest.cpp index 727c2207ea..c248cbae1d 100644 --- a/hazelcast/test/src/util/BitsTest.cpp +++ b/hazelcast/test/src/util/BitsTest.cpp @@ -22,9 +22,6 @@ namespace hazelcast { namespace client { - - class HazelcastClient; - namespace test { namespace util { class BitsTest : public ::testing::Test diff --git a/hazelcast/test/src/util/ClientUtilTest.cpp b/hazelcast/test/src/util/ClientUtilTest.cpp index 40e0c19d26..9a5aa27cd6 100644 --- a/hazelcast/test/src/util/ClientUtilTest.cpp +++ b/hazelcast/test/src/util/ClientUtilTest.cpp @@ -152,7 +152,7 @@ namespace hazelcast { time_t beg = time(NULL); util::Thread thread(sleepyThread, &sleepTime); util::sleep(wakeUpTime); - thread.interrupt(); + thread.cancel(); thread.join(); time_t end = time(NULL); ASSERT_NEAR((int)(end - beg), wakeUpTime , 1); diff --git a/hazelcast/test/src/util/ConcurrentQueueTest.cpp b/hazelcast/test/src/util/ConcurrentQueueTest.cpp new file mode 100644 index 0000000000..5c901abba9 --- /dev/null +++ b/hazelcast/test/src/util/ConcurrentQueueTest.cpp @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// +// Created by İhsan Demir on Mar 6 2016. +// +#include +#include "hazelcast/util/ConcurrentQueue.h" +#include "hazelcast/util/Thread.h" +#include "hazelcast/util/CountDownLatch.h" +#include "hazelcast/util/ILogger.h" +#include "hazelcast/util/Util.h" + +namespace hazelcast { + namespace client { + namespace test { + namespace util { + class ConcurentQueueTest : public ::testing::Test + { + protected: + static void ConcurrentQueueTask(hazelcast::util::ThreadArgs &args) { + hazelcast::util::ConcurrentQueue *q = (hazelcast::util::ConcurrentQueue *)args.arg0; + hazelcast::util::CountDownLatch *startLatch = (hazelcast::util::CountDownLatch *)args.arg1; + hazelcast::util::CountDownLatch *startRemoveLatch = (hazelcast::util::CountDownLatch *)args.arg2; + int *removalValue = (int *)args.arg3; + + int numItems = 1000; + + std::vector values((size_t)numItems); + + startLatch->countDown(); + + ASSERT_TRUE(startLatch->await(10)); + + // insert items + for (int i = 0; i < numItems; ++i) { + values[i] = i; + q->offer(&values[i]); + } + + q->offer(removalValue); + startRemoveLatch->countDown(); + + // poll items + for (int i = 0; i < numItems; ++i) { + values[i] = i; + ASSERT_NE((int *)NULL, q->poll()); + } + } + }; + + TEST_F(ConcurentQueueTest, testSingleThread) { + hazelcast::util::ConcurrentQueue q; + + ASSERT_EQ((int *)NULL, q.poll()); + + int val1, val2; + + q.offer(&val1); + + ASSERT_EQ(&val1, q.poll()); + + ASSERT_EQ((int *)NULL, q.poll()); + + q.offer(&val1); + q.offer(&val2); + q.offer(&val2); + q.offer(&val1); + + ASSERT_EQ(2, q.removeAll(&val2)); + ASSERT_EQ(0, q.removeAll(&val2)); + + ASSERT_EQ(&val1, q.poll()); + ASSERT_EQ(&val1, q.poll()); + + ASSERT_EQ((int *)NULL, q.poll()); + } + + TEST_F(ConcurentQueueTest, testMultiThread) { + int numThreads = 40; + + hazelcast::util::CountDownLatch startLatch(numThreads); + + hazelcast::util::CountDownLatch startRemoveLatch(numThreads); + + hazelcast::util::ConcurrentQueue q; + + int removalValue = 10; + + std::vector threads((size_t)numThreads); + for (int i = 0; i < numThreads; ++i) { + // I would prefer using scoped_ptr or boost:::scoped_array for array if there was one available + threads[i] = new hazelcast::util::Thread(ConcurentQueueTest::ConcurrentQueueTask, &q, &startLatch, &startRemoveLatch, &removalValue); + } + + // wait for the remove start + ASSERT_TRUE(startRemoveLatch.await(30)); + + int numRemoved = q.removeAll(&removalValue); + + int numRemaining = numThreads - numRemoved; + + char msg[200]; + hazelcast::util::snprintf(msg, 200, "Was able to remove %d items and left %d items", (numThreads - numRemaining), numRemaining); + hazelcast::util::ILogger::getLogger().info(msg); + + for (int j = 0; j < numRemaining; ++j) { + ASSERT_NE((int *)NULL, q.poll()); + } + ASSERT_EQ(0, q.removeAll(&removalValue)); + + for (int i = 0; i < numThreads; ++i) { + ASSERT_TRUE(threads[i]->join()); + } + + for (int i = 0; i < numThreads; ++i) { + delete threads[i]; + } + } + } + } + } +} diff --git a/java/src/main/java/CppClientListener.java b/java/src/main/java/CppClientListener.java index ff4d9185cb..789b8e7402 100644 --- a/java/src/main/java/CppClientListener.java +++ b/java/src/main/java/CppClientListener.java @@ -19,6 +19,8 @@ import com.hazelcast.config.XmlConfigBuilder; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; import com.hazelcast.map.EntryBackupProcessor; import com.hazelcast.map.EntryProcessor; import com.hazelcast.nio.ObjectDataInput; @@ -206,6 +208,7 @@ public EntryBackupProcessor getBackupProcessor() { public class CppClientListener { static final int OK = 5678; + static final int FAIL = -1; static final int END = 1; static final int START = 2; static final int SHUTDOWN = 3; @@ -221,29 +224,54 @@ public static void main(String args[]) throws IOException { final DataInputStream dataInputStream = new DataInputStream(socket.getInputStream()); final DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream()); + ILogger logger = Logger.getLogger("CppClientListener"); while (true) { final int command = dataInputStream.readInt(); switch (command) { case START: - System.out.println("NEW INSTANCE OPEN "); - final int id = atomicInteger.incrementAndGet(); - map.put(id, getInstance(config)); - dataOutputStream.writeInt(id); + System.out.println("START command received: NEW INSTANCE OPEN "); + try { + final int id = atomicInteger.incrementAndGet(); + map.put(id, getInstance(config)); + dataOutputStream.writeInt(id); + } catch (Exception e) { + logger.warning("START command failed. Error:" + e); + dataOutputStream.writeInt(FAIL); + } break; case SHUTDOWN: - final int id2 = dataInputStream.readInt(); - final HazelcastInstance instance = map.get(id2); - if (instance == null) { + logger.info("SHUTDOWN command received"); + int id2 = -1; + try { + id2 = dataInputStream.readInt(); + + logger.info("SHUTDOWN command for instance " + id2); + + final HazelcastInstance instance = map.get(id2); + if (instance == null) { + dataOutputStream.writeInt(OK); + continue; + } + instance.getLifecycleService().shutdown(); dataOutputStream.writeInt(OK); - continue; + + logger.info("SHUTDOWN for instance " + id2 + " is completed."); + } catch (Exception e) { + logger.warning("SHUTDOWN failed for instance " + id2 + ". Error:" + e); + dataOutputStream.writeInt(FAIL); } - instance.getLifecycleService().shutdown(); - dataOutputStream.writeInt(OK); break; case SHUTDOWN_ALL: - Hazelcast.shutdownAll(); - map.clear(); - dataOutputStream.writeInt(OK); + logger.info("SHUTDOWN_ALL command received"); + try { + Hazelcast.shutdownAll(); + map.clear(); + dataOutputStream.writeInt(OK); + } catch (Exception e) { + logger.warning("SHUTDOWN_ALL command failed. Error:" + e); + dataOutputStream.writeInt(FAIL); + } + break; case END: System.exit(0);