Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hazelcast/include/hazelcast/client/connection/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ namespace hazelcast {

int connectionId;
};

std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ namespace hazelcast {
void onConnectionClose(const Address &address, int socketId);

/**
* Shutdown clientConnectionManager
* Shutdown clientConnectionManager. It does not throw any excpetion.
*/
void shutdown();

Expand Down
5 changes: 2 additions & 3 deletions hazelcast/include/hazelcast/client/spi/InvocationService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "hazelcast/util/AtomicInt.h"
#include "hazelcast/util/SynchronizedMap.h"
#include "hazelcast/client/protocol/IMessageHandler.h"
#include "hazelcast/util/AtomicBoolean.h"
#include "hazelcast/client/protocol/ClientExceptionFactory.h"

#include <boost/shared_ptr.hpp>
Expand Down Expand Up @@ -65,6 +64,7 @@ namespace hazelcast {

namespace spi {
class ClientContext;
class LifecycleService;

class HAZELCAST_API InvocationService : public protocol::IMessageHandler {
public:
Expand Down Expand Up @@ -110,7 +110,7 @@ namespace hazelcast {
* @param callId of event handler registration request
* @return true if found and removed, false otherwise
*/
void removeEventHandler(int64_t callId);
bool removeEventHandler(int64_t callId);

/**
* Clean all promises (both request and event handlers). Retries requests on available connections if applicable.
Expand Down Expand Up @@ -142,7 +142,6 @@ namespace hazelcast {
// Is not using the Connection* for the key due to a possible ABA problem.
util::SynchronizedMap<int , util::SynchronizedMap<int64_t, connection::CallPromise > > callPromises;
util::SynchronizedMap<int, util::SynchronizedMap<int64_t, connection::CallPromise > > eventHandlerPromises;
util::AtomicBoolean isOpen;
protocol::ClientExceptionFactory exceptionFactory;

bool isAllowedToSentRequest(connection::Connection& connection, protocol::ClientMessage const&);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ namespace hazelcast {
previousConnectionAddr = conn->getRemoteEndpoint();
previousConnectionAddrPtr = &previousConnectionAddr;
} catch (std::exception &e) {
util::ILogger::getLogger().severe(
std::string("Error while connecting to cluster! =>") + e.what());
if (clientContext.getLifecycleService().isRunning()) {
util::ILogger::getLogger().severe(
std::string("Error while connecting to cluster! =>") + e.what());
}
isStartedSuccessfully = false;
clientContext.getLifecycleService().shutdown();
startLatch.countDown();
Expand Down
15 changes: 15 additions & 0 deletions hazelcast/src/hazelcast/client/connection/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,21 @@ namespace hazelcast {
bool Connection::isOwnerConnection() const {
return _isOwnerConnection;
}

std::ostream HAZELCAST_API &operator << (std::ostream &out, const Connection &connection) {
Connection &conn = const_cast<Connection &>(connection);
time_t lastRead = conn.lastRead;
bool live = conn.live;
out << "ClientConnection{"
<< "alive=" << live
<< ", connectionId=" << connection.getConnectionId()
<< ", remoteEndpoint=" << connection.getRemoteEndpoint()
<< ", lastReadTime=" << lastRead
<< ", isHeartbeating=" << conn.isHeartBeating()
<< '}';

return out;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ namespace hazelcast {
live = false;
// close connections
BOOST_FOREACH(boost::shared_ptr<Connection> connection , connections.values()) {
connection->close("Hazelcast client is shutting down");
// prevent any exceptions
util::IOUtil::closeResource(connection.get(), "Hazelcast client is shutting down");
}
heartBeater.shutdown();
if (heartBeatThread.get() != NULL) {
Expand Down
80 changes: 44 additions & 36 deletions hazelcast/src/hazelcast/client/spi/InvocationService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "hazelcast/client/ClientProperties.h"
#include "hazelcast/client/connection/Connection.h"
#include "hazelcast/client/spi/ServerListenerService.h"
#include "hazelcast/client/spi/LifecycleService.h"
#include "hazelcast/client/serialization/pimpl/SerializationService.h"
#include "hazelcast/client/exception/IllegalStateException.h"
#include "hazelcast/client/exception/InstanceNotActiveException.h"
Expand All @@ -42,7 +43,7 @@ namespace hazelcast {
namespace client {
namespace spi {
InvocationService::InvocationService(spi::ClientContext &clientContext)
: clientContext(clientContext), isOpen(false) {
: clientContext(clientContext) {
redoOperation = clientContext.getClientConfig().isRedoOperation();
ClientProperties &properties = clientContext.getClientProperties();
retryWaitTime = properties.getRetryWaitTime().getInteger();
Expand All @@ -65,11 +66,10 @@ namespace hazelcast {
}

bool InvocationService::start() {
return isOpen.compareAndSet(false, true);
return true;
}

void InvocationService::shutdown() {
isOpen.compareAndSet(true, false);
}

connection::CallFuture InvocationService::invokeOnRandomTarget(
Expand Down Expand Up @@ -136,15 +136,18 @@ namespace hazelcast {
return retryCount;
}

void InvocationService::removeEventHandler(int64_t callId) {
bool InvocationService::removeEventHandler(int64_t callId) {
std::vector<boost::shared_ptr<connection::Connection> > connections = clientContext.getConnectionManager().getConnections();
std::vector<boost::shared_ptr<connection::Connection> >::iterator it;
for (it = connections.begin(); it != connections.end(); ++it) {
boost::shared_ptr<connection::Connection> &connectionPtr = *it;
if (deRegisterEventHandler(*connectionPtr, callId) != NULL) {
return;
boost::shared_ptr<connection::CallPromise> eventPromise = deRegisterEventHandler(*connectionPtr,
callId);
if (eventPromise.get() != (connection::CallPromise *) NULL) {
return true;
}
}
return false;
}


Expand Down Expand Up @@ -221,12 +224,11 @@ namespace hazelcast {
boost::shared_ptr<connection::Connection> actualConn = registerAndEnqueue(connection, promise);

if (NULL != actualConn.get()) {
char msg[300];
const Address &serverAddr = connection->getRemoteEndpoint();
hazelcast::util::snprintf(msg, 300, "[InvocationService::resend] Re-sending the request with id %lld "
"originally destined for %s to server [%s:%d] using the new correlation id %lld", correlationId,
lastTriedAddress.c_str(), serverAddr.getHost().c_str(), serverAddr.getPort(), promise->getRequest()->getCorrelationId());
util::ILogger::getLogger().info(msg);
std::ostringstream out;
out << "[InvocationService::resend] Re-sending the request with id " << correlationId <<
" originally destined for " << lastTriedAddress << " on connection " << *actualConn <<
" using the new correlation id " << promise->getRequest()->getCorrelationId();
util::ILogger::getLogger().info(out.str());
}

return actualConn;
Expand All @@ -235,15 +237,13 @@ namespace hazelcast {
boost::shared_ptr<connection::Connection> InvocationService::registerAndEnqueue(
boost::shared_ptr<connection::Connection> &connection,
boost::shared_ptr<connection::CallPromise> promise) {
if (!isOpen) {
if (!clientContext.getLifecycleService().isRunning()) {
char msg[200];
util::snprintf(msg, 200, "[InvocationService::registerAndEnqueue] InvocationService is shutdown. "
"Did not register the promise for message correlation id:%lld",
promise->getRequest()->getCorrelationId());
hazelcast::util::ILogger::getLogger().info(msg);
util::snprintf(msg, 200, "Client is not running. Did not register the promise for message "
"correlation id:%lld", promise->getRequest()->getCorrelationId());

std::auto_ptr<exception::IException> exception(new exception::IllegalStateException(
"InvocationService::registerAndEnqueue", "Invocation service is not open. Can not process the request."));
"InvocationService::registerAndEnqueue", msg));
promise->setException(exception);

return boost::shared_ptr<connection::Connection>();
Expand Down Expand Up @@ -343,9 +343,13 @@ namespace hazelcast {
boost::shared_ptr<connection::CallPromise> promise) {
client::impl::BaseEventHandler *eventHandler = promise->getEventHandler();
if (eventHandler != NULL) {
if (eventHandler->registrationId.size() ==
0) //if uuid is not set, it means it is first time that we are getting uuid.
return true; // then no need to handle it, just set as normal response
/**
* if uuid is not set, it means it is first time that we are getting uuid.
* then no need to handle it, just treat non-event response.
*/
if (eventHandler->registrationId.empty()) {
return true;
}

// result->registrationId is the alias for the original registration
clientContext.getServerListenerService().reRegisterListener(eventHandler->registrationId, response);
Expand All @@ -359,8 +363,8 @@ namespace hazelcast {
void InvocationService::tryResend(std::auto_ptr<exception::IException> exception,
boost::shared_ptr<connection::CallPromise> promise,
const std::string &lastTriedAddress) {
bool serviceOpen = isOpen;
if (serviceOpen && (promise->getRequest()->isRetryable() || isRedoOperation())) {
if (clientContext.getLifecycleService().isRunning() &&
(promise->getRequest()->isRetryable() || isRedoOperation())) {
resend(promise, lastTriedAddress);
return;
}
Expand All @@ -385,16 +389,19 @@ namespace hazelcast {

std::string address = util::IOUtil::to_string(connection.getRemoteEndpoint());

char msg[200];
util::snprintf(msg, 200, "[cleanResources] There are %u waiting promises on connection with id:%d (%s) ",
promises.size(), connection.getConnectionId(), address.c_str());
util::ILogger::getLogger().info(msg);
util::ILogger &logger = util::ILogger::getLogger();
if (logger.isFinestEnabled()) {
std::ostringstream out;
out << "[InvocationService::cleanResources] There are " << promises.size() << " waiting promises on "
"connection " << connection;
logger.finest(out.str());
}

for (std::vector<std::pair<int64_t, boost::shared_ptr<connection::CallPromise> > >::iterator it = promises.begin();
it != promises.end(); ++it) {
if (!isOpen) {
if (!clientContext.getLifecycleService().isRunning()) {
std::auto_ptr<exception::IException> exception(new exception::IllegalStateException(
"InvocationService::cleanResources", "Invocation service is not open."));
"InvocationService::cleanResources", "Client is not running."));
it->second->setException(exception);
} else {
std::auto_ptr<exception::IException> exception(new exception::IOException(
Expand All @@ -411,19 +418,20 @@ namespace hazelcast {
connection)->clear();

util::ILogger &logger = util::ILogger::getLogger();
if (logger.isFinestEnabled()) {
char msg[200];
util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises "
"on connection with id:%d to be retried", promises.size(), connection.getConnectionId());
logger.finest(msg);
}

char msg[200];
util::snprintf(msg, 200, "[InvocationService::cleanEventHandlers] There are %ld event handler promises on connection with id:%d to be retried",
promises.size(), connection.getConnectionId());
logger.info(msg);

if (isOpen) {
if (clientContext.getLifecycleService().isRunning()) {
for (std::vector<std::pair<int64_t, boost::shared_ptr<connection::CallPromise> > >::const_iterator it = promises.begin();
it != promises.end(); ++it) {
clientContext.getServerListenerService().retryFailedListener(it->second);
}
} else {
logger.info("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry "
logger.finest("[InvocationService::cleanEventHandlers] The service is closed. Shall not retry "
"registering any event handler if exists.");
}
}
Expand Down
11 changes: 6 additions & 5 deletions hazelcast/src/hazelcast/client/spi/LifecycleService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ namespace hazelcast {
fireLifecycleEvent(LifecycleEvent::STARTING);
active = true;

if (!clientContext.getConnectionManager().start()) {

if (!clientContext.getInvocationService().start()) {
return false;
}

if (!clientContext.getClusterService().start()) {
if (!clientContext.getConnectionManager().start()) {
return false;
}

if (!clientContext.getInvocationService().start()) {
if (!clientContext.getClusterService().start()) {
return false;
}

Expand All @@ -70,10 +71,10 @@ namespace hazelcast {
return;
}
fireLifecycleEvent(LifecycleEvent::SHUTTING_DOWN);
clientContext.getInvocationService().shutdown();
clientContext.getPartitionService().shutdown();
clientContext.getConnectionManager().shutdown();
clientContext.getClusterService().shutdown();
clientContext.getPartitionService().shutdown();
clientContext.getInvocationService().shutdown();
clientContext.getNearCacheManager().destroyAllNearCaches();
fireLifecycleEvent(LifecycleEvent::SHUTDOWN);
}
Expand Down
30 changes: 20 additions & 10 deletions hazelcast/src/hazelcast/client/spi/PartitionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ namespace hazelcast {
responseMessage = future.get();

} catch (exception::IOException& e) {
util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what());
if (clientContext.getLifecycleService().isRunning()) {
util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what());
}
}
return responseMessage;
}
Expand All @@ -123,7 +125,9 @@ namespace hazelcast {
responseMessage = future.get();

} catch (exception::IOException& e) {
util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what());
if (clientContext.getLifecycleService().isRunning()) {
util::ILogger::getLogger().severe(std::string("Error while fetching cluster partition table => ") + e.what());
}
}
return responseMessage;
}
Expand Down Expand Up @@ -170,12 +174,14 @@ namespace hazelcast {
}
}

if (!result) {
util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!");
} else {
util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " +
util::IOUtil::to_string<int>(partitionCount) +
" initial partitions successfully.");
if (clientContext.getLifecycleService().isRunning()) {
if (!result) {
util::ILogger::getLogger().severe("PartitionService::getInitialPartitions Cannot get initial partitions!");
} else {
util::ILogger::getLogger().finest("PartitionService::getInitialPartitions Got " +
util::IOUtil::to_string<int>(partitionCount) +
" initial partitions successfully.");
}
}
return result;
}
Expand All @@ -195,9 +201,13 @@ namespace hazelcast {
processPartitionResponse(*partitionResponse);
}
} catch (hazelcast::client::exception::IException& e) {
util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what());
if (clientContext.getLifecycleService().isRunning()) {
util::ILogger::getLogger().finest(std::string("Exception in partitionService::refreshPartitions ") + e.what());
}
} catch (...) {
util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions "));
if (clientContext.getLifecycleService().isRunning()) {
util::ILogger::getLogger().finest(std::string("Unkown exception in partitionService::refreshPartitions "));
}
throw;
}
updating = false;
Expand Down
Loading