Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Make some clean up methods thread safe #11762

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,21 @@ void ClientImpl::shutdown() {
}
}

pool_.close();
if (producers.size() + consumers.size() > 0) {
LOG_DEBUG(producers.size() << " producers and " << consumers.size()
<< " consumers have been shutdown.");
}
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
}
LOG_DEBUG("ConnectionPool is closed");
ioExecutorProvider_->close();
LOG_DEBUG("ioExecutorProvider_ is closed");
listenerExecutorProvider_->close();
LOG_DEBUG("listenerExecutorProvider_ is closed");
partitionListenerExecutorProvider_->close();
LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
}

uint64_t ClientImpl::newProducerId() {
Expand Down
14 changes: 13 additions & 1 deletion pulsar-client-cpp/lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
poolConnections_(poolConnections),
mutex_() {}

void ConnectionPool::close() {
bool ConnectionPool::close() {
bool expectedState = false;
if (!closed_.compare_exchange_strong(expectedState, true)) {
return false;
}

std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
Expand All @@ -52,10 +57,17 @@ void ConnectionPool::close() {
}
pool_.clear();
}
return true;
}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
if (closed_) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

std::unique_lock<std::mutex> lock(mutex_);

if (poolConnections_) {
Expand Down
9 changes: 8 additions & 1 deletion pulsar-client-cpp/lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ClientConnection.h"

#include <atomic>
#include <string>
#include <map>
#include <mutex>
Expand All @@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool {
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
const AuthenticationPtr& authentication, bool poolConnections = true);

void close();
/**
* Close the connection pool.
*
* @return false if it has already been closed.
*/
bool close();

/**
* Get a connection from the pool.
Expand Down Expand Up @@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool {
PoolMap pool_;
bool poolConnections_;
std::mutex mutex_;
std::atomic_bool closed_{false};

friend class ConnectionPoolTest;
};
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
}

void ExecutorService::close() {
bool expectedState = false;
if (!closed_.compare_exchange_strong(expectedState, true)) {
return;
}

io_service_->stop();
work_.reset();
// If this thread is attempting to join itself, do not. The destructor's
Expand Down Expand Up @@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
}

void ExecutorServiceProvider::close() {
Lock lock(mutex_);

for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
if (*it != NULL) {
(*it)->close();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
#define _PULSAR_EXECUTOR_SERVICE_HEADER_

#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
Expand Down Expand Up @@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
* io_service
*/
std::thread worker_;

std::atomic_bool closed_{false};
};

typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/LogUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace pulsar {
#endif

#define DECLARE_LOG_OBJECT() \
inline pulsar::Logger* logger() { \
static pulsar::Logger* logger() { \
static thread_local std::unique_ptr<pulsar::Logger> threadSpecificLogPtr; \
pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "ProducerImpl.h"

namespace pulsar {
DECLARE_LOG_OBJECT()

static const std::string EMPTY_STRING;

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,17 @@
#include <boost/property_tree/ptree.hpp>
namespace ptree = boost::property_tree;

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#endif

#include <boost/xpressive/xpressive.hpp>

#if defined(__clang__)
#pragma clang diagnostic pop
#endif

#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>

Expand Down
2 changes: 0 additions & 2 deletions pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
#include <pulsar/Authentication.h>
#include <boost/exception/all.hpp>

DECLARE_LOG_OBJECT()

using namespace pulsar;

TEST(BinaryLookupServiceTest, basicLookup) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/CustomLoggerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
ASSERT_EQ(logLines.size(), 3);
ASSERT_EQ(logLines.size(), 7);
LogUtils::resetLoggerFactory();
});
testThread.join();
Expand All @@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
ASSERT_EQ(logLines.size(), 3);
ASSERT_EQ(logLines.size(), 7);
}

TEST(CustomLoggerTest, testConsoleLoggerFactory) {
Expand Down
2 changes: 0 additions & 2 deletions pulsar-client-cpp/tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <string>
#include <lib/LogUtils.h>

DECLARE_LOG_OBJECT()

using namespace pulsar;
TEST(MessageTest, testMessageContents) {
MessageBuilder msgBuilder1;
Expand Down
3 changes: 0 additions & 3 deletions pulsar-client-cpp/tests/ReaderConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <lib/LogUtils.h>
#include <lib/ReaderImpl.h>
#include "NoOpsCryptoKeyReader.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

static const std::string lookupUrl = "pulsar://localhost:6650";
Expand Down