Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace hazelcast {

void connect(int timeoutInMillis);

void close();
void close(const char *closeReason = NULL);

void write(protocol::ClientMessage *message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ namespace hazelcast {
namespace exception {
class HAZELCAST_API ProtocolException : public IException {
public:
ProtocolException(const std::string& source, const std::string& message, int32_t errorNo, int32_t causeCode)
: IException(source, message), errorCode(errorNo), causeErrorCode(causeCode) {
ProtocolException(const std::string& message, const std::string& details, int32_t errorNo,
int32_t causeCode)
: IException("Cluster", details), errorCode(errorNo), causeErrorCode(causeCode) {
}

ProtocolException(const std::string& source, const std::string& message)
Expand All @@ -56,8 +57,8 @@ namespace hazelcast {
#define DEFINE_PROTOCOL_EXCEPTION(ClassName) \
class HAZELCAST_API ClassName : public ProtocolException {\
public:\
ClassName(const std::string& source, const std::string& message, int32_t errorCode, int32_t causeCode) \
: ProtocolException(source, message, errorCode, causeCode) {\
ClassName(const std::string& message, const std::string& details, int32_t errorNo, int32_t causeCode) \
: ProtocolException(message, details, errorNo, causeCode) {\
}\
ClassName(const std::string& source, const std::string& message) \
: ProtocolException(source, message) {\
Expand Down
5 changes: 1 addition & 4 deletions hazelcast/include/hazelcast/client/spi/LifecycleService.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
//
// Created by sancar koyunlu on 6/17/13.




#ifndef HAZELCAST_LIFECYCLE_SERVICE
#define HAZELCAST_LIFECYCLE_SERVICE

Expand Down Expand Up @@ -67,7 +64,7 @@ namespace hazelcast {
std::set<LifecycleListener *> listeners;
util::Mutex listenerLock;
util::AtomicBoolean active;

util::Mutex shutdownLock;
};

}
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/include/hazelcast/util/Closeable.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace hazelcast {
public:
virtual ~Closeable();

virtual void close() = 0;
virtual void close(const char *closeReason) = 0;
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/include/hazelcast/util/IOUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace hazelcast {
return value;
}

static void closeResource(Closeable *closable);
static void closeResource(Closeable *closable, const char *closeReason = NULL);

};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ namespace hazelcast {
util::ILogger::getLogger().severe(
std::string("Error while connecting to cluster! =>") + e.what());
isStartedSuccessfully = false;
clientContext.getLifecycleService().shutdown();
startLatch.countDown();
return;
}
Expand All @@ -105,7 +106,7 @@ namespace hazelcast {

clientContext.getConnectionManager().onCloseOwnerConnection();
if (deletingConnection.compareAndSet(false, true)) {
util::IOUtil::closeResource(conn.get());
util::IOUtil::closeResource(conn.get(), "Error while listening cluster events");
conn.reset();
deletingConnection = false;
clientContext.getLifecycleService().fireLifecycleEvent(LifecycleEvent::CLIENT_DISCONNECTED);
Expand All @@ -117,7 +118,7 @@ namespace hazelcast {

void ClusterListenerThread::stop() {
if (deletingConnection.compareAndSet(false, true)) {
util::IOUtil::closeResource(conn.get());
util::IOUtil::closeResource(conn.get(), "Cluster listener thread is stopping");
conn.reset();
deletingConnection = false;
}
Expand Down
8 changes: 5 additions & 3 deletions hazelcast/src/hazelcast/client/connection/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ namespace hazelcast {
outputSocketStream.write(PROTOCOL);
}

void Connection::close() {
void Connection::close(const char *closeReason) {
if (!live.compareAndSet(true, false)) {
return;
}
Expand All @@ -98,8 +98,10 @@ namespace hazelcast {
int socketId = socket->getSocketId();

std::stringstream message;
message << "Closing connection (id:" << connectionId << ") to " << serverAddr << " with socket id " << socketId <<
(_isOwnerConnection ? " as the owner connection." : ".");
message << "Closing connection (id:" << connectionId << ") to " << serverAddr <<
" with socket id " << socketId <<
(_isOwnerConnection ? " as the owner connection." : ". ") <<
(NULL != closeReason ? closeReason : "");
util::ILogger::getLogger().warning(message.str());
if (!_isOwnerConnection) {
readHandler.deRegisterSocket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//
// Created by sancar koyunlu on 8/21/13.

#include <boost/foreach.hpp>
#include "hazelcast/util/Util.h"
#include "hazelcast/client/protocol/AuthenticationStatus.h"
#include "hazelcast/client/exception/AuthenticationException.h"
Expand Down Expand Up @@ -69,6 +70,10 @@ namespace hazelcast {

void ConnectionManager::shutdown() {
live = false;
// close connections
BOOST_FOREACH(boost::shared_ptr<Connection> connection , connections.values()) {
connection->close("Hazelcast client is shutting down");
}
heartBeater.shutdown();
if (heartBeatThread.get() != NULL) {
heartBeatThread->cancel();
Expand Down Expand Up @@ -350,7 +355,7 @@ namespace hazelcast {

void ConnectionManager::checkLive() {
if (!live) {
throw exception::HazelcastException("ConnectionManager is not active!");
throw exception::HazelcastException("ConnectionManager", "ConnectionManager is not active!");
}
}

Expand Down Expand Up @@ -384,7 +389,7 @@ namespace hazelcast {
} else {
ownerConnectionFuture.close();
}
util::IOUtil::closeResource(&connection);
util::IOUtil::closeResource(&connection, "Heartbeat failed");
}

void ConnectionManager::removeEndpoint(const Address &address) {
Expand Down
15 changes: 13 additions & 2 deletions hazelcast/src/hazelcast/client/connection/IOSelector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ namespace hazelcast {
:connectionManager(connectionManager) {
t.tv_sec = 5;
t.tv_usec = 0;
isAlive = true;
}

void IOSelector::staticListen(util::ThreadArgs &args) {
Expand All @@ -55,6 +54,10 @@ namespace hazelcast {
}

void IOSelector::wakeUp() {
if (!wakeUpSocket.get()) {
return;
}

int wakeUpSignal = 9;
try {
wakeUpSocket->send(&wakeUpSignal, sizeof(int));
Expand Down Expand Up @@ -91,6 +94,7 @@ namespace hazelcast {
sleepingSocket->setBlocking(false);
wakeUpSocketSet.insertSocket(sleepingSocket.get());
wakeUpListenerSocketId = sleepingSocket->getSocketId();
isAlive = true;
return true;
} else {
util::ILogger::getLogger().severe("IOSelector::initListenSocket " + std::string(strerror(errno)));
Expand All @@ -99,7 +103,14 @@ namespace hazelcast {
}

void IOSelector::shutdown() {
isAlive = false;
if (!isAlive.compareAndSet(true, false)) {
return;
}
try {
wakeUp();
} catch (exception::IOException &) {
// suppress io exception
}
}

void IOSelector::addTask(ListenerTask *listenerTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ namespace hazelcast {
std::stringstream message;
message << "Closing owner connection to " << currentOwnerConnection->getRemoteEndpoint();
util::ILogger::getLogger().finest(message.str());
util::IOUtil::closeResource(currentOwnerConnection.get());
util::IOUtil::closeResource(currentOwnerConnection.get(), message.str().c_str());
markAsClosed();
}
}
Expand Down
4 changes: 3 additions & 1 deletion hazelcast/src/hazelcast/client/spi/ClusterService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ namespace hazelcast {
}

void ClusterService::shutdown() {
active = false;
if (!active.compareAndSet(true, false)) {
return;
}
if (NULL != clusterThread.getThread()) {
// avoid anyone waiting on the start latch to get stuck
clusterThread.startLatch.countDown();
Expand Down
11 changes: 7 additions & 4 deletions hazelcast/src/hazelcast/client/spi/LifecycleService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ namespace hazelcast {
}

void LifecycleService::shutdown() {
if (!active.compareAndSet(true, false))
util::LockGuard guard(shutdownLock);

if (!active.compareAndSet(true, false)) {
return;
}
fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN);
clientContext.getInvocationService().shutdown();
clientContext.getPartitionService().shutdown();
clientContext.getClusterService().shutdown();
clientContext.getConnectionManager().shutdown();
clientContext.getClusterService().shutdown();
clientContext.getPartitionService().shutdown();
clientContext.getInvocationService().shutdown();
clientContext.getNearCacheManager().destroyAllNearCaches();
fireLifecycleEvent(LifecycleEvent::SHUTDOWN);
}
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/src/hazelcast/util/IOUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

namespace hazelcast {
namespace util {
void IOUtil::closeResource(Closeable *closable) {
void IOUtil::closeResource(Closeable *closable, const char *closeReason) {
if (closable != NULL) {
try {
closable->close();
closable->close(closeReason);
} catch (client::exception::IException& e) {
std::stringstream message;
message << "closeResource failed" << e.what();
Expand Down
12 changes: 12 additions & 0 deletions hazelcast/src/hazelcast/util/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace hazelcast {
, isJoined(false)
, isInterrupted(false){
init(func, arg0, arg1, arg2, arg3);

}

long Thread::getThreadID() {
Expand Down Expand Up @@ -92,6 +93,11 @@ namespace hazelcast {
if (!isJoined.compareAndSet(false, true)) {
return true;
}
if (id == getThreadID()) {
// called from inside the thread, deadlock possibility
return false;
}

DWORD err = WaitForSingleObject(thread, INFINITE);
if (err != WAIT_OBJECT_0) {
return false;
Expand Down Expand Up @@ -197,6 +203,12 @@ namespace hazelcast {
if (!isJoined.compareAndSet(false, true)) {
return true;
}

if (pthread_equal(thread, pthread_self())) {
// called from inside the thread, deadlock possibility
return false;
}

int err = pthread_join(thread, NULL);
if (EINVAL == err || ESRCH == err || EDEADLK == err) {
isJoined = false;
Expand Down
7 changes: 0 additions & 7 deletions hazelcast/test/src/cluster/ClientConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ namespace hazelcast {
ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException);
}

TEST_F(ClientConnectionTest, testTcpSocketConnectionTimeout_withIntMax) {
HazelcastServer instance(*g_srvFactory, true);
ClientConfig config;
config.addAddress(Address("8.8.8.8", 5701));
ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException);
}

#ifdef HZ_BUILD_WITH_SSL
TEST_F(ClientConnectionTest, testSslSocketTimeoutToOutsideNetwork) {
HazelcastServer instance(*g_srvFactory, true);
Expand Down
Loading