Skip to content

Commit

Permalink
[C++] Make some clean up methods thread safe (#11762)
Browse files Browse the repository at this point in the history
* Make some close methods thread safe

* Restore shutdown() in ClientImpl's destructor and check whether connection pool is closed
  • Loading branch information
BewareMyPower committed Aug 25, 2021
1 parent 241de4b commit 098ba16
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 14 deletions.
13 changes: 12 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,21 @@ void ClientImpl::shutdown() {
}
}

pool_.close();
if (producers.size() + consumers.size() > 0) {
LOG_DEBUG(producers.size() << " producers and " << consumers.size()
<< " consumers have been shutdown.");
}
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
}
LOG_DEBUG("ConnectionPool is closed");
ioExecutorProvider_->close();
LOG_DEBUG("ioExecutorProvider_ is closed");
listenerExecutorProvider_->close();
LOG_DEBUG("listenerExecutorProvider_ is closed");
partitionListenerExecutorProvider_->close();
LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
}

uint64_t ClientImpl::newProducerId() {
Expand Down
14 changes: 13 additions & 1 deletion pulsar-client-cpp/lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
poolConnections_(poolConnections),
mutex_() {}

void ConnectionPool::close() {
bool ConnectionPool::close() {
bool expectedState = false;
if (!closed_.compare_exchange_strong(expectedState, true)) {
return false;
}

std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
Expand All @@ -52,10 +57,17 @@ void ConnectionPool::close() {
}
pool_.clear();
}
return true;
}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
if (closed_) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

std::unique_lock<std::mutex> lock(mutex_);

if (poolConnections_) {
Expand Down
9 changes: 8 additions & 1 deletion pulsar-client-cpp/lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ClientConnection.h"

#include <atomic>
#include <string>
#include <map>
#include <mutex>
Expand All @@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool {
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
const AuthenticationPtr& authentication, bool poolConnections = true);

void close();
/**
* Close the connection pool.
*
* @return false if it has already been closed.
*/
bool close();

/**
* Get a connection from the pool.
Expand Down Expand Up @@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool {
PoolMap pool_;
bool poolConnections_;
std::mutex mutex_;
std::atomic_bool closed_{false};

friend class ConnectionPoolTest;
};
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
}

void ExecutorService::close() {
bool expectedState = false;
if (!closed_.compare_exchange_strong(expectedState, true)) {
return;
}

io_service_->stop();
work_.reset();
// If this thread is attempting to join itself, do not. The destructor's
Expand Down Expand Up @@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
}

void ExecutorServiceProvider::close() {
Lock lock(mutex_);

for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
if (*it != NULL) {
(*it)->close();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
#define _PULSAR_EXECUTOR_SERVICE_HEADER_

#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
Expand Down Expand Up @@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
* io_service
*/
std::thread worker_;

std::atomic_bool closed_{false};
};

typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/LogUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace pulsar {
#endif

#define DECLARE_LOG_OBJECT() \
inline pulsar::Logger* logger() { \
static pulsar::Logger* logger() { \
static thread_local std::unique_ptr<pulsar::Logger> threadSpecificLogPtr; \
pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "ProducerImpl.h"

namespace pulsar {
DECLARE_LOG_OBJECT()

static const std::string EMPTY_STRING;

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,17 @@
#include <boost/property_tree/ptree.hpp>
namespace ptree = boost::property_tree;

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#endif

#include <boost/xpressive/xpressive.hpp>

#if defined(__clang__)
#pragma clang diagnostic pop
#endif

#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>

Expand Down
2 changes: 0 additions & 2 deletions pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
#include <pulsar/Authentication.h>
#include <boost/exception/all.hpp>

DECLARE_LOG_OBJECT()

using namespace pulsar;

TEST(BinaryLookupServiceTest, basicLookup) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/CustomLoggerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
ASSERT_EQ(logLines.size(), 3);
ASSERT_EQ(logLines.size(), 7);
LogUtils::resetLoggerFactory();
});
testThread.join();
Expand All @@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
ASSERT_EQ(logLines.size(), 3);
ASSERT_EQ(logLines.size(), 7);
}

TEST(CustomLoggerTest, testConsoleLoggerFactory) {
Expand Down
2 changes: 0 additions & 2 deletions pulsar-client-cpp/tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <string>
#include <lib/LogUtils.h>

DECLARE_LOG_OBJECT()

using namespace pulsar;
TEST(MessageTest, testMessageContents) {
MessageBuilder msgBuilder1;
Expand Down
3 changes: 0 additions & 3 deletions pulsar-client-cpp/tests/ReaderConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <lib/LogUtils.h>
#include <lib/ReaderImpl.h>
#include "NoOpsCryptoKeyReader.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

static const std::string lookupUrl = "pulsar://localhost:6650";
Expand Down

0 comments on commit 098ba16

Please sign in to comment.