Skip to content

Commit c64e0e9

Browse files
Bump the C++ standard to 17 (#525)
- Replace `boost::optional` with `std::optional` - Replace `boost::any` with `std::any` - Use `std::weak_from_this` to create a `weak_ptr` from a `shared_ptr` - Leverage initializers for if to simplify code
1 parent 93f10ef commit c64e0e9

31 files changed

+154
-150
lines changed

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
8383
find_package(Threads REQUIRED)
8484
MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT})
8585

86+
if (NOT CMAKE_CXX_STANDARD)
87+
set(CMAKE_CXX_STANDARD 17)
88+
endif ()
89+
8690
# Compiler specific configuration:
8791
# https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
8892
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
@@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated)
112116
file(MAKE_DIRECTORY ${AUTOGEN_DIR})
113117

114118
if (INTEGRATE_VCPKG)
115-
if (NOT CMAKE_CXX_STANDARD)
116-
set(CMAKE_CXX_STANDARD 11)
117-
endif ()
118119
set(CMAKE_C_STANDARD 11)
119120
set(Boost_NO_BOOST_CMAKE ON)
120121
find_package(Boost REQUIRED)

LegacyFindPackages.cmake

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ if (MSVC)
270270
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})
271271
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE})
272272
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELWITHDEBINFO ${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
273+
if (NOT CMAKE_CL_64)
274+
# When building with a 32-bit cl.exe, the virtual address space is limited to 2GB, which could be
275+
# reached with /O2 optimization. Use /Os for smaller code size.
276+
string(REGEX REPLACE "/O2" "/Os" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE})
277+
endif ()
273278
message(STATUS "CMAKE_CXX_FLAGS_DEBUG: " ${CMAKE_CXX_FLAGS_DEBUG})
274279
message(STATUS "CMAKE_CXX_FLAGS_RELEASE: " ${CMAKE_CXX_FLAGS_RELEASE})
275280
message(STATUS "CMAKE_CXX_FLAGS_RELWITHDEBINFO: " ${CMAKE_CXX_FLAGS_RELWITHDEBINFO})

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON
5757
cmake --build build -j8
5858
```
5959

60+
> - Before 4.0.0, C++11 is required.
61+
> - Since 4.0.0, C++17 is required.
62+
6063
The 1st step will download vcpkg and then install all dependencies according to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the CMake build directory.
6164

6265
> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have vcpkg installed.

lib/AckGroupingTrackerEnabled.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
198198
std::lock_guard<std::mutex> lock(this->mutexTimer_);
199199
this->timer_ = this->executor_->createDeadlineTimer();
200200
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
201-
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
201+
auto weakSelf = weak_from_this();
202202
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
203-
auto self = weakSelf.lock();
204-
if (self && !ec) {
203+
if (auto self = weakSelf.lock(); self && !ec) {
205204
auto consumer = consumer_.lock();
206205
if (!consumer || consumer->isClosingOrClosed()) {
207206
return;

lib/ClientConnection.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <openssl/x509.h>
2222
#include <pulsar/MessageIdBuilder.h>
2323

24-
#include <boost/optional.hpp>
2524
#include <fstream>
2625

2726
#include "AsioDefines.h"
@@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() {
11271126

11281127
if (--pendingWriteOperations_ > 0) {
11291128
assert(!pendingWriteBuffers_.empty());
1130-
boost::any any = pendingWriteBuffers_.front();
1129+
auto any = pendingWriteBuffers_.front();
11311130
pendingWriteBuffers_.pop_front();
11321131

11331132
auto self = shared_from_this();
11341133
if (any.type() == typeid(SharedBuffer)) {
1135-
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
1134+
SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
11361135
asyncWrite(buffer.const_asio_buffer(),
11371136
customAllocWriteHandler(
11381137
[this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); }));
11391138
} else {
11401139
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
11411140

1142-
auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
1141+
auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
11431142
BaseCommand outgoingCmd;
11441143
PairSharedBuffer buffer =
11451144
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
@@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
17021701
data.schemaVersion = producerSuccess.schema_version();
17031702
}
17041703
if (producerSuccess.has_topic_epoch()) {
1705-
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
1704+
data.topicEpoch = std::make_optional(producerSuccess.topic_epoch());
17061705
} else {
1707-
data.topicEpoch = boost::none;
1706+
data.topicEpoch = std::nullopt;
17081707
}
17091708
requestData.promise.setValue(data);
17101709
cancelTimer(*requestData.timer);
@@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co
18051804
}
18061805
}
18071806

1808-
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
1807+
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
18091808
const proto::CommandCloseProducer& closeProducer) {
18101809
if (tlsSocket_) {
18111810
if (closeProducer.has_assignedbrokerserviceurltls()) {
@@ -1814,10 +1813,10 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
18141813
} else if (closeProducer.has_assignedbrokerserviceurl()) {
18151814
return closeProducer.assignedbrokerserviceurl();
18161815
}
1817-
return boost::none;
1816+
return {};
18181817
}
18191818

1820-
boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
1819+
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
18211820
const proto::CommandCloseConsumer& closeConsumer) {
18221821
if (tlsSocket_) {
18231822
if (closeConsumer.has_assignedbrokerserviceurltls()) {
@@ -1826,7 +1825,7 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
18261825
} else if (closeConsumer.has_assignedbrokerserviceurl()) {
18271826
return closeConsumer.assignedbrokerserviceurl();
18281827
}
1829-
return boost::none;
1828+
return {};
18301829
}
18311830

18321831
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {

lib/ClientConnection.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ClientConfiguration.h>
2323
#include <pulsar/defines.h>
2424

25+
#include <any>
2526
#include <atomic>
2627
#include <cstdint>
2728
#ifdef USE_ASIO
@@ -37,8 +38,6 @@
3738
#include <boost/asio/ssl/stream.hpp>
3839
#include <boost/asio/strand.hpp>
3940
#endif
40-
#include <boost/any.hpp>
41-
#include <boost/optional.hpp>
4241
#include <deque>
4342
#include <functional>
4443
#include <memory>
@@ -53,6 +52,9 @@
5352
#include "SharedBuffer.h"
5453
#include "TimeUtils.h"
5554
#include "UtilAllocator.h"
55+
56+
using std::optional;
57+
5658
namespace pulsar {
5759

5860
class PulsarFriend;
@@ -108,7 +110,7 @@ struct ResponseData {
108110
std::string producerName;
109111
int64_t lastSequenceId;
110112
std::string schemaVersion;
111-
boost::optional<uint64_t> topicEpoch;
113+
optional<uint64_t> topicEpoch;
112114
};
113115

114116
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
@@ -141,10 +143,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
141143
ConnectionPool& pool, size_t poolIndex);
142144
~ClientConnection();
143145

144-
#if __cplusplus < 201703L
145-
std::weak_ptr<ClientConnection> weak_from_this() noexcept { return shared_from_this(); }
146-
#endif
147-
148146
/*
149147
* starts tcp connect_async
150148
* @return future<ConnectionPtr> which is not yet set
@@ -378,7 +376,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
378376
typedef std::unique_lock<std::mutex> Lock;
379377

380378
// Pending buffers to write on the socket
381-
std::deque<boost::any> pendingWriteBuffers_;
379+
std::deque<std::any> pendingWriteBuffers_;
382380
int pendingWriteOperations_ = 0;
383381

384382
SharedBuffer outgoingBuffer_;
@@ -426,8 +424,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
426424
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
427425
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
428426
void handleAckResponse(const proto::CommandAckResponse&);
429-
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
430-
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
427+
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
428+
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
431429
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
432430
// This method must be called when `mutex_` is held
433431
void unsafeRemovePendingRequest(long requestId);

lib/Commands.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "OpSendMsg.h"
3535
#include "PulsarApi.pb.h"
3636
#include "Url.h"
37+
#include "boost/throw_exception.hpp"
3738
#include "checksum/ChecksumProvider.h"
3839

3940
using namespace pulsar;
@@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
329330
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
330331
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
331332
const std::string& consumerName, SubscriptionMode subscriptionMode,
332-
boost::optional<MessageId> startMessageId, bool readCompacted,
333+
optional<MessageId> startMessageId, bool readCompacted,
333334
const std::map<std::string, std::string>& metadata,
334335
const std::map<std::string, std::string>& subscriptionProperties,
335336
const SchemaInfo& schemaInfo,
@@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
416417
const std::map<std::string, std::string>& metadata,
417418
const SchemaInfo& schemaInfo, uint64_t epoch,
418419
bool userProvidedProducerName, bool encrypted,
419-
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
420+
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
420421
const std::string& initialSubscriptionName) {
421422
BaseCommand cmd;
422423
cmd.set_type(BaseCommand::PRODUCER);

lib/Commands.h

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include <pulsar/Schema.h>
2626
#include <pulsar/defines.h>
2727

28-
#include <boost/optional.hpp>
28+
#include <optional>
2929
#include <set>
3030

3131
#include "ProtoApiEnums.h"
@@ -41,6 +41,7 @@ class MessageIdImpl;
4141
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
4242
class BitSet;
4343
struct SendArguments;
44+
using std::optional;
4445

4546
namespace proto {
4647
class BaseCommand;
@@ -102,14 +103,16 @@ class Commands {
102103
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType,
103104
const SendArguments& args);
104105

105-
static SharedBuffer newSubscribe(
106-
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
107-
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
108-
boost::optional<MessageId> startMessageId, bool readCompacted,
109-
const std::map<std::string, std::string>& metadata,
110-
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
111-
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
112-
const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0);
106+
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
107+
uint64_t consumerId, uint64_t requestId,
108+
CommandSubscribe_SubType subType, const std::string& consumerName,
109+
SubscriptionMode subscriptionMode, optional<MessageId> startMessageId,
110+
bool readCompacted, const std::map<std::string, std::string>& metadata,
111+
const std::map<std::string, std::string>& subscriptionProperties,
112+
const SchemaInfo& schemaInfo,
113+
CommandSubscribe_InitialPosition subscriptionInitialPosition,
114+
bool replicateSubscriptionState, const KeySharedPolicy& keySharedPolicy,
115+
int priorityLevel = 0);
113116

114117
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
115118

@@ -118,7 +121,7 @@ class Commands {
118121
const std::map<std::string, std::string>& metadata,
119122
const SchemaInfo& schemaInfo, uint64_t epoch,
120123
bool userProvidedProducerName, bool encrypted,
121-
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
124+
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
122125
const std::string& initialSubscriptionName);
123126

124127
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,

0 commit comments

Comments
 (0)