diff --git a/CMakeLists.txt b/CMakeLists.txt index c2610317..0a0bd910 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE) find_package(Threads REQUIRED) MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT}) +if (NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif () + # Compiler specific configuration: # https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") @@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated) file(MAKE_DIRECTORY ${AUTOGEN_DIR}) if (INTEGRATE_VCPKG) - if (NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 11) - endif () set(CMAKE_C_STANDARD 11) set(Boost_NO_BOOST_CMAKE ON) find_package(Boost REQUIRED) diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 5004545b..3fa78d65 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -270,6 +270,11 @@ if (MSVC) string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG}) string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE}) string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELWITHDEBINFO ${CMAKE_CXX_FLAGS_RELWITHDEBINFO}) + if (NOT CMAKE_CL_64) + # When building with a 32-bit cl.exe, the virtual address space is limited to 2GB, which could be + # reached with /O2 optimization. Use /Os for smaller code size. + string(REGEX REPLACE "/O2" "/Os" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE}) + endif () message(STATUS "CMAKE_CXX_FLAGS_DEBUG: " ${CMAKE_CXX_FLAGS_DEBUG}) message(STATUS "CMAKE_CXX_FLAGS_RELEASE: " ${CMAKE_CXX_FLAGS_RELEASE}) message(STATUS "CMAKE_CXX_FLAGS_RELWITHDEBINFO: " ${CMAKE_CXX_FLAGS_RELWITHDEBINFO}) diff --git a/README.md b/README.md index 4c86b631..0bcae7be 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON cmake --build build -j8 ``` +> - Before 4.0.0, C++11 is required. +> - Since 4.0.0, C++17 is required. + The 1st step will download vcpkg and then install all dependencies according to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the CMake build directory. > You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have vcpkg installed. diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc index 3a2a35d7..faeb0ad8 100644 --- a/lib/AckGroupingTrackerEnabled.cc +++ b/lib/AckGroupingTrackerEnabled.cc @@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() { std::lock_guard lock(this->mutexTimer_); this->timer_ = this->executor_->createDeadlineTimer(); this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); - std::weak_ptr weakSelf = shared_from_this(); + auto weakSelf = weak_from_this(); this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { - auto self = weakSelf.lock(); - if (self && !ec) { + if (auto self = weakSelf.lock(); self && !ec) { auto consumer = consumer_.lock(); if (!consumer || consumer->isClosingOrClosed()) { return; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 49b2cbc2..0bd935d4 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -21,7 +21,6 @@ #include #include -#include #include #include "AsioDefines.h" @@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() { if (--pendingWriteOperations_ > 0) { assert(!pendingWriteBuffers_.empty()); - boost::any any = pendingWriteBuffers_.front(); + auto any = pendingWriteBuffers_.front(); pendingWriteBuffers_.pop_front(); auto self = shared_from_this(); if (any.type() == typeid(SharedBuffer)) { - SharedBuffer buffer = boost::any_cast(any); + SharedBuffer buffer = std::any_cast(any); asyncWrite(buffer.const_asio_buffer(), customAllocWriteHandler( [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); })); } else { assert(any.type() == typeid(std::shared_ptr)); - auto args = boost::any_cast>(any); + auto args = std::any_cast>(any); BaseCommand outgoingCmd; PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); @@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess data.schemaVersion = producerSuccess.schema_version(); } if (producerSuccess.has_topic_epoch()) { - data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch()); + data.topicEpoch = std::make_optional(producerSuccess.topic_epoch()); } else { - data.topicEpoch = boost::none; + data.topicEpoch = std::nullopt; } requestData.promise.setValue(data); cancelTimer(*requestData.timer); @@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co } } -boost::optional ClientConnection::getAssignedBrokerServiceUrl( +optional ClientConnection::getAssignedBrokerServiceUrl( const proto::CommandCloseProducer& closeProducer) { if (tlsSocket_) { if (closeProducer.has_assignedbrokerserviceurltls()) { @@ -1814,10 +1813,10 @@ boost::optional ClientConnection::getAssignedBrokerServiceUrl( } else if (closeProducer.has_assignedbrokerserviceurl()) { return closeProducer.assignedbrokerserviceurl(); } - return boost::none; + return {}; } -boost::optional ClientConnection::getAssignedBrokerServiceUrl( +optional ClientConnection::getAssignedBrokerServiceUrl( const proto::CommandCloseConsumer& closeConsumer) { if (tlsSocket_) { if (closeConsumer.has_assignedbrokerserviceurltls()) { @@ -1826,7 +1825,7 @@ boost::optional ClientConnection::getAssignedBrokerServiceUrl( } else if (closeConsumer.has_assignedbrokerserviceurl()) { return closeConsumer.assignedbrokerserviceurl(); } - return boost::none; + return {}; } void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index cf6be650..18a7d846 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #ifdef USE_ASIO @@ -37,8 +38,6 @@ #include #include #endif -#include -#include #include #include #include @@ -53,6 +52,9 @@ #include "SharedBuffer.h" #include "TimeUtils.h" #include "UtilAllocator.h" + +using std::optional; + namespace pulsar { class PulsarFriend; @@ -108,7 +110,7 @@ struct ResponseData { std::string producerName; int64_t lastSequenceId; std::string schemaVersion; - boost::optional topicEpoch; + optional topicEpoch; }; typedef std::shared_ptr> NamespaceTopicsPtr; @@ -141,10 +143,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this weak_from_this() noexcept { return shared_from_this(); } -#endif - /* * starts tcp connect_async * @return future which is not yet set @@ -378,7 +376,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this Lock; // Pending buffers to write on the socket - std::deque pendingWriteBuffers_; + std::deque pendingWriteBuffers_; int pendingWriteOperations_ = 0; SharedBuffer outgoingBuffer_; @@ -426,8 +424,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); - boost::optional getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); + optional getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); + optional getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&); // This method must be called when `mutex_` is held void unsafeRemovePendingRequest(long requestId); diff --git a/lib/Commands.cc b/lib/Commands.cc index dd62b217..3c687c0a 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -34,6 +34,7 @@ #include "OpSendMsg.h" #include "PulsarApi.pb.h" #include "Url.h" +#include "boost/throw_exception.hpp" #include "checksum/ChecksumProvider.h" using namespace pulsar; @@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, - boost::optional startMessageId, bool readCompacted, + optional startMessageId, bool readCompacted, const std::map& metadata, const std::map& subscriptionProperties, const SchemaInfo& schemaInfo, @@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch, + ProducerAccessMode accessMode, optional topicEpoch, const std::string& initialSubscriptionName) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); diff --git a/lib/Commands.h b/lib/Commands.h index 15c31662..8403d6e2 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -25,7 +25,7 @@ #include #include -#include +#include #include #include "ProtoApiEnums.h" @@ -41,6 +41,7 @@ class MessageIdImpl; using MessageIdImplPtr = std::shared_ptr; class BitSet; struct SendArguments; +using std::optional; namespace proto { class BaseCommand; @@ -102,14 +103,16 @@ class Commands { static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType, const SendArguments& args); - static SharedBuffer newSubscribe( - const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, - CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, - boost::optional startMessageId, bool readCompacted, - const std::map& metadata, - const std::map& subscriptionProperties, const SchemaInfo& schemaInfo, - CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState, - const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0); + static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription, + uint64_t consumerId, uint64_t requestId, + CommandSubscribe_SubType subType, const std::string& consumerName, + SubscriptionMode subscriptionMode, optional startMessageId, + bool readCompacted, const std::map& metadata, + const std::map& subscriptionProperties, + const SchemaInfo& schemaInfo, + CommandSubscribe_InitialPosition subscriptionInitialPosition, + bool replicateSubscriptionState, const KeySharedPolicy& keySharedPolicy, + int priorityLevel = 0); static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId); @@ -118,7 +121,7 @@ class Commands { const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch, + ProducerAccessMode accessMode, optional topicEpoch, const std::string& initialSubscriptionName); static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet, diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 3d5d294b..4781e966 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -59,8 +59,7 @@ DECLARE_LOG_OBJECT() using std::chrono::milliseconds; using std::chrono::seconds; -static boost::optional getStartMessageId(const boost::optional& startMessageId, - bool inclusive) { +static optional getStartMessageId(const optional& startMessageId, bool inclusive) { if (!inclusive || !startMessageId) { return startMessageId; } @@ -69,7 +68,7 @@ static boost::optional getStartMessageId(const boost::optional(Commands::getMessageIdImpl(startMessageId.value()).get()); if (chunkMsgIdImpl) { - return boost::optional{chunkMsgIdImpl->getChunkedMessageIds().front()}; + return optional{chunkMsgIdImpl->getChunkedMessageIds().front()}; } return startMessageId; } @@ -97,7 +96,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic bool hasParent /* = false by default */, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, Commands::SubscriptionMode subscriptionMode, - const boost::optional& startMessageId) + const optional& startMessageId) : ConsumerImplBase( client, topic, Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()), @@ -243,7 +242,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c Lock lockForMessageId(mutexForMessageId_); clearReceiveQueue(); const auto subscribeMessageId = - (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : boost::none; + (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : std::nullopt; lockForMessageId.unlock(); unAckedMessageTrackerPtr_->clear(); @@ -433,7 +432,7 @@ void ConsumerImpl::discardChunkMessages(const std::string& uuid, const MessageId void ConsumerImpl::triggerCheckExpiredChunkedTimer() { checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { auto self = weakSelf.lock(); if (!self) { @@ -464,11 +463,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { }); } -boost::optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, - const proto::MessageMetadata& metadata, - const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx, - MessageId& messageId) { +optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, + const proto::MessageMetadata& metadata, + const proto::MessageIdData& messageIdData, + const ClientConnectionPtr& cnx, + MessageId& messageId) { const auto chunkId = metadata.chunk_id(); const auto& uuid = metadata.uuid(); LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid @@ -521,14 +520,14 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff lock.unlock(); increaseAvailablePermits(cnx); trackMessage(messageId); - return boost::none; + return {}; } chunkedMsgCtx.appendChunk(messageId, payload); if (!chunkedMsgCtx.isCompleted()) { lock.unlock(); increaseAvailablePermits(cnx); - return boost::none; + return {}; } messageId = std::make_shared(chunkedMsgCtx.moveChunkedMessageIds())->build(); @@ -541,7 +540,7 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) { return wholePayload; } else { - return boost::none; + return {}; } } @@ -1124,7 +1123,7 @@ void ConsumerImpl::clearReceiveQueue() { if (hasSoughtByTimestamp()) { // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. - startMessageId_ = boost::none; + startMessageId_ = std::nullopt; } else { startMessageId_ = seekMessageId_.get(); } @@ -1313,11 +1312,11 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) { negativeAcksTracker_->add(messageId); } -void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); } +void ConsumerImpl::disconnectConsumer() { disconnectConsumer(std::nullopt); } -void ConsumerImpl::disconnectConsumer(const boost::optional& assignedBrokerUrl) { +void ConsumerImpl::disconnectConsumer(const optional& assignedBrokerUrl) { LOG_INFO("Broker notification of Closed consumer: " - << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + assignedBrokerUrl.get()) : "")); + << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + *assignedBrokerUrl) : "")); resetCnx(); scheduleReconnection(assignedBrokerUrl); } @@ -1745,7 +1744,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekCallback_ = callback; LOG_INFO(getName() << " Seeking subscription to " << seekArg); - std::weak_ptr weakSelf{get_shared_this_ptr()}; + auto weakSelf = weak_from_this(); cnx->sendRequestWithId(seek, requestId) .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, @@ -1851,9 +1850,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, const Proces } for (const auto& message : messages.value()) { - std::weak_ptr weakSelf{get_shared_this_ptr()}; - deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res, - Producer producer) { + auto weakSelf = weak_from_this(); + deadLetterProducer_->getFuture().addListener([this, weakSelf, message, messageId, cb]( + Result res, Producer producer) { auto self = weakSelf.lock(); if (!self) { return; @@ -1872,30 +1871,29 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, const Proces if (message.hasOrderingKey()) { msgBuilder.setOrderingKey(message.getOrderingKey()); } - producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb]( + producer.sendAsync(msgBuilder.build(), [this, weakSelf, originMessageId, messageId, cb]( Result res, const MessageId& messageIdInDLQ) { auto self = weakSelf.lock(); if (!self) { return; } if (res == ResultOk) { - if (self->state_ != Ready) { + if (state_ != Ready) { LOG_WARN( "Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : " - << self->state_); + << state_); cb(false); return; } - self->possibleSendToDeadLetterTopicMessages_.remove(messageId); - self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) { + possibleSendToDeadLetterTopicMessages_.remove(messageId); + acknowledgeAsync(originMessageId, [this, weakSelf, originMessageId, cb](Result result) { auto self = weakSelf.lock(); if (!self) { return; } if (result != ResultOk) { - LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" - << self->getConsumerName() << "} Failed to acknowledge the message {" - << originMessageId + LOG_WARN("{" << topic() << "} {" << subscription_ << "} {" << getConsumerName() + << "} Failed to acknowledge the message {" << originMessageId << "} of the original topic but send to the DLQ successfully : " << result); cb(false); @@ -1906,9 +1904,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, const Proces } }); } else { - LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" - << self->getConsumerName() << "} Failed to send DLQ message to {" - << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " + LOG_WARN("{" << topic() << "} {" << subscription_ << "} {" << getConsumerName() + << "} Failed to send DLQ message to {" + << deadLetterPolicy_.getDeadLetterTopic() << "} for message id " << "{" << originMessageId << "} : " << res); cb(false); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 5e06723b..c1df0804 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,7 +21,6 @@ #include -#include #include #include #include @@ -92,7 +91,7 @@ class ConsumerImpl : public ConsumerImplBase { const ExecutorServicePtr& listenerExecutor = ExecutorServicePtr(), bool hasParent = false, const ConsumerTopicType consumerTopicType = NonPartitioned, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, - const boost::optional& startMessageId = boost::none); + const optional& startMessageId = optional()); ~ConsumerImpl(); void setPartitionIndex(int partitionIndex); int getPartitionIndex(); @@ -146,7 +145,7 @@ class ConsumerImpl : public ConsumerImplBase { void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) override; virtual void disconnectConsumer(); - virtual void disconnectConsumer(const boost::optional& assignedBrokerUrl); + virtual void disconnectConsumer(const optional& assignedBrokerUrl); Result fetchSingleMessageFromBroker(Message& msg); virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType); @@ -270,7 +269,7 @@ class ConsumerImpl : public ConsumerImplBase { std::atomic seekStatus_{SeekStatus::NOT_STARTED}; Synchronized seekCallback_{[](Result) {}}; - Synchronized> startMessageId_; + Synchronized> startMessageId_; Synchronized seekMessageId_{MessageId::earliest()}; std::atomic hasSoughtByTimestamp_{false}; @@ -368,10 +367,10 @@ class ConsumerImpl : public ConsumerImplBase { * @return the concatenated payload if chunks are concatenated into a completed message payload * successfully, else Optional::empty() */ - boost::optional processMessageChunk(const SharedBuffer& payload, - const proto::MessageMetadata& metadata, - const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx, MessageId& messageId); + optional processMessageChunk(const SharedBuffer& payload, + const proto::MessageMetadata& metadata, + const proto::MessageIdData& messageIdData, + const ClientConnectionPtr& cnx, MessageId& messageId); bool hasMoreMessages() const { std::lock_guard lock{mutexForMessageId_}; diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 171256d5..55d44d33 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -50,11 +50,10 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr& client, const std::strin void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { if (timeoutMs > 0) { batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs)); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); - if (self && !ec) { - self->doBatchReceiveTimeTask(); + if (auto self = weakSelf.lock(); self && !ec) { + std::static_pointer_cast(self)->doBatchReceiveTimeTask(); } }); } diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index ffc4e2c0..37c6e2d5 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -61,10 +61,9 @@ void HandlerBase::start() { grabCnx(); } creationTimer_->expires_after(operationTimeut_); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { - auto self = weakSelf.lock(); - if (self && !error) { + if (auto self = weakSelf.lock(); self && !error) { LOG_WARN("Cancel the pending reconnection due to the start timeout"); connectionFailed(ResultTimeout); cancelTimer(*timer_); @@ -86,18 +85,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) { connection_ = cnx; } -void HandlerBase::grabCnx() { grabCnx(boost::none); } +void HandlerBase::grabCnx() { grabCnx(std::nullopt); } Future HandlerBase::getConnection( - const ClientImplPtr& client, const boost::optional& assignedBrokerUrl) { + const ClientImplPtr& client, const optional& assignedBrokerUrl) { if (assignedBrokerUrl && client->getLookupCount() > 0) { - return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_); + return client->connect(getRedirectedClusterURI(), *assignedBrokerUrl, connectionKeySuffix_); } else { return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_); } } -void HandlerBase::grabCnx(const boost::optional& assignedBrokerUrl) { +void HandlerBase::grabCnx(const optional& assignedBrokerUrl) { bool expectedState = false; if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); @@ -177,8 +176,8 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& break; } } -void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); } -void HandlerBase::scheduleReconnection(const boost::optional& assignedBrokerUrl) { +void HandlerBase::scheduleReconnection() { scheduleReconnection(std::nullopt); } +void HandlerBase::scheduleReconnection(const optional& assignedBrokerUrl) { const auto state = state_.load(); if (state == Pending || state == Ready) { @@ -189,10 +188,9 @@ void HandlerBase::scheduleReconnection(const boost::optional& assig // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled // so we will not run into the case where grabCnx is invoked on out of scope handler auto name = getName(); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->async_wait([name, weakSelf, assignedBrokerUrl](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); - if (self) { + if (auto self = weakSelf.lock()) { self->handleTimeout(ec, assignedBrokerUrl); } else { LOG_WARN(name << "Cancel the reconnection since the handler is destroyed"); @@ -201,7 +199,7 @@ void HandlerBase::scheduleReconnection(const boost::optional& assig } } -void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const boost::optional& assignedBrokerUrl) { +void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const optional& assignedBrokerUrl) { if (ec) { LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec << "]"); } else { diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 967322fe..acce15d9 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -20,9 +20,9 @@ #define _PULSAR_HANDLER_BASE_HEADER_ #include -#include #include #include +#include #include #include "AsioTimer.h" @@ -30,6 +30,8 @@ #include "Future.h" #include "TimeUtils.h" +using std::optional; + namespace pulsar { class ClientImpl; @@ -60,7 +62,7 @@ class HandlerBase : public std::enable_shared_from_this { * tries reconnection and sets connection_ to valid object * @param assignedBrokerUrl assigned broker url to directly connect to without lookup */ - void grabCnx(const boost::optional& assignedBrokerUrl); + void grabCnx(const optional& assignedBrokerUrl); /* * tries reconnection and sets connection_ to valid object @@ -71,7 +73,7 @@ class HandlerBase : public std::enable_shared_from_this { * Schedule reconnection after backoff time * @param assignedBrokerUrl assigned broker url to directly connect to without lookup */ - void scheduleReconnection(const boost::optional& assignedBrokerUrl); + void scheduleReconnection(const optional& assignedBrokerUrl); /* * Schedule reconnection after backoff time */ @@ -108,11 +110,11 @@ class HandlerBase : public std::enable_shared_from_this { const std::shared_ptr topic_; Future getConnection(const ClientImplPtr& client, - const boost::optional& assignedBrokerUrl); + const optional& assignedBrokerUrl); void handleDisconnection(Result result, const ClientConnectionPtr& cnx); - void handleTimeout(const ASIO_ERROR& ec, const boost::optional& assignedBrokerUrl); + void handleTimeout(const ASIO_ERROR& ec, const optional& assignedBrokerUrl); protected: ClientImplWeakPtr client_; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 7d734036..6e0ba86c 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -46,7 +46,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const ClientImplPtr& client, co const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode subscriptionMode, - const boost::optional& startMessageId) + const optional& startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, lookupServicePtr, interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; @@ -56,7 +56,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl( const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, - Commands::SubscriptionMode subscriptionMode, const boost::optional& startMessageId) + Commands::SubscriptionMode subscriptionMode, const optional& startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -448,7 +448,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( } void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& originalCallback) { - std::weak_ptr weakSelf{get_shared_this_ptr()}; + auto weakSelf = weak_from_this(); auto callback = [weakSelf, originalCallback](Result result) { auto self = weakSelf.lock(); if (self) { @@ -935,7 +935,7 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, const ResultCall beforeSeek(); auto weakSelf = weak_from_this(); - optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) { + optConsumer.value()->seekAsync(msgId, [this, weakSelf, callback](Result result) { auto self = weakSelf.lock(); if (self) { afterSeek(); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index e92ec0ec..b22227e3 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -58,14 +58,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, - const boost::optional& startMessageId = boost::none); + const optional& startMessageId = optional{}); MultiTopicsConsumerImpl(const ClientImplPtr& client, const std::vector& topics, const std::string& subscriptionName, const TopicNamePtr& topicName, const ConsumerConfiguration& conf, const LookupServicePtr& lookupServicePtr_, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, - const boost::optional& startMessageId = boost::none); + const optional& startMessageId = optional{}); ~MultiTopicsConsumerImpl(); // overrided methods from ConsumerImplBase @@ -131,7 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { const std::vector topics_; std::queue pendingReceives_; const Commands::SubscriptionMode subscriptionMode_; - boost::optional startMessageId_; + optional startMessageId_; ConsumerInterceptorsPtr interceptors_; std::atomic_bool duringSeek_{false}; diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index b691b18c..cdff6173 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -55,7 +55,7 @@ void NegativeAcksTracker::scheduleTimer() { if (closed_) { return; } - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->expires_after(timerInterval_); timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { if (auto self = weakSelf.lock()) { diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc index c68d23a2..9cc62153 100644 --- a/lib/PeriodicTask.cc +++ b/lib/PeriodicTask.cc @@ -28,11 +28,10 @@ void PeriodicTask::start() { } state_ = Ready; if (periodMs_ >= 0) { - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->expires_after(std::chrono::milliseconds(periodMs_)); timer_->async_wait([weakSelf](const ErrorCode& ec) { - auto self = weakSelf.lock(); - if (self) { + if (auto self = weakSelf.lock()) { self->handleTimeout(ec); } }); diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc index 278871c4..f2ac5926 100644 --- a/lib/ProducerConfiguration.cc +++ b/lib/ProducerConfiguration.cc @@ -38,7 +38,7 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat } ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) { - impl_->producerName = boost::make_optional(producerName); + impl_->producerName = std::make_optional(producerName); return *this; } @@ -47,7 +47,7 @@ const std::string& ProducerConfiguration::getProducerName() const { } ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) { - impl_->initialSequenceId = boost::make_optional(initialSequenceId); + impl_->initialSequenceId = std::make_optional(initialSequenceId); return *this; } diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index c3240209..1ca2ce4f 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -21,15 +21,16 @@ #include -#include -#include +#include + +using std::optional; namespace pulsar { struct ProducerConfigurationImpl { SchemaInfo schemaInfo; - boost::optional producerName; - boost::optional initialSequenceId; + optional producerName; + optional initialSequenceId; int sendTimeoutMs{30000}; CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 21c38c40..9d6a9a08 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -974,14 +974,14 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer encryptedPayload); } -void ProducerImpl::disconnectProducer(const boost::optional& assignedBrokerUrl) { +void ProducerImpl::disconnectProducer(const optional& assignedBrokerUrl) { LOG_INFO("Broker notification of Closed producer: " - << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + assignedBrokerUrl.get()) : "")); + << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + *assignedBrokerUrl) : "")); resetCnx(); scheduleReconnection(assignedBrokerUrl); } -void ProducerImpl::disconnectProducer() { disconnectProducer(boost::none); } +void ProducerImpl::disconnectProducer() { disconnectProducer(std::nullopt); } void ProducerImpl::start() { HandlerBase::start(); diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 77bd6d1a..26207f80 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -26,7 +26,6 @@ #include #endif #include -#include #include #include @@ -99,7 +98,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { bool ackReceived(uint64_t sequenceId, MessageId& messageId); - virtual void disconnectProducer(const boost::optional& assignedBrokerUrl); + virtual void disconnectProducer(const optional& assignedBrokerUrl); virtual void disconnectProducer(); uint64_t getProducerId() const; @@ -209,7 +208,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { MemoryLimitController& memoryLimitController_; const bool chunkingEnabled_; - boost::optional topicEpoch; + optional topicEpoch; ProducerInterceptorsPtr interceptors_; diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index f2d390dc..fa4c8fc1 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -80,7 +80,7 @@ class RetryableOperationCache : public std::enable_shared_from_this weakSelf{this->shared_from_this()}; + auto weakSelf = this->weak_from_this(); future.addListener([this, weakSelf, key, operation](Result, const T&) { auto self = weakSelf.lock(); if (!self) { diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h index dacaf45c..5f51cfd1 100644 --- a/lib/SynchronizedHashMap.h +++ b/lib/SynchronizedHashMap.h @@ -19,14 +19,16 @@ #pragma once #include -#include #include #include #include +#include #include #include #include +using std::optional; + namespace pulsar { class SharedFuture { @@ -46,7 +48,7 @@ class SynchronizedHashMap { using Lock = std::lock_guard; public: - using OptValue = boost::optional; + using OptValue = optional; using PairVector = std::vector>; using MapType = std::unordered_map; using Iterator = typename MapType::iterator; @@ -60,12 +62,12 @@ class SynchronizedHashMap { } // Put a new key-value pair if the key does not exist. - // Return boost::none if the key already exists or the existing value. + // Return an empty optional if the key already exists or the existing value. OptValue putIfAbsent(const K& key, const V& value) { Lock lock(mutex_); auto pair = data_.emplace(key, value); if (pair.second) { - return boost::none; + return {}; } else { return pair.first->second; } @@ -157,7 +159,7 @@ class SynchronizedHashMap { if (it != data_.end()) { return it->second; } else { - return boost::none; + return {}; } } @@ -168,18 +170,18 @@ class SynchronizedHashMap { return kv.second; } } - return boost::none; + return {}; } OptValue remove(const K& key) { Lock lock(mutex_); auto it = data_.find(key); if (it != data_.end()) { - auto result = boost::make_optional(std::move(it->second)); + auto result = std::make_optional(std::move(it->second)); data_.erase(it); return result; } else { - return boost::none; + return {}; } } diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc index e283a6fc..f634fa52 100644 --- a/lib/TableViewImpl.cc +++ b/lib/TableViewImpl.cc @@ -70,7 +70,7 @@ bool TableViewImpl::getValue(const std::string& key, std::string& value) const { return false; } -bool TableViewImpl::containsKey(const std::string& key) const { return data_.find(key) != boost::none; } +bool TableViewImpl::containsKey(const std::string& key) const { return static_cast(data_.find(key)); } std::unordered_map TableViewImpl::snapshot() { return data_.move(); } @@ -120,7 +120,7 @@ void TableViewImpl::handleMessage(const Message& msg) { void TableViewImpl::readAllExistingMessages(const Promise& promise, long startTime, long messagesRead) { - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); reader_->hasMessageAvailableAsync( [weakSelf, promise, startTime, messagesRead](Result result, bool hasMessage) { auto self = weakSelf.lock(); diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index 3e9ce0ed..e5bd3d21 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -39,10 +39,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { ExecutorServicePtr executorService = client->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_)); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); - if (self && !ec) { + if (auto self = weakSelf.lock(); self && !ec) { self->timeoutHandler(); } }); diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 3dd1a730..ab4edd61 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -81,7 +81,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy void ConsumerStatsImpl::scheduleTimer() { timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc index a42532d4..84bd1e28 100644 --- a/lib/stats/ProducerStatsImpl.cc +++ b/lib/stats/ProducerStatsImpl.cc @@ -110,7 +110,7 @@ ProducerStatsImpl::~ProducerStatsImpl() { cancelTimer(*timer_); } void ProducerStatsImpl::scheduleTimer() { timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); - std::weak_ptr weakSelf{shared_from_this()}; + auto weakSelf = weak_from_this(); timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { diff --git a/pkg/apk/build-apk.sh b/pkg/apk/build-apk.sh index 68fdf893..4b802288 100755 --- a/pkg/apk/build-apk.sh +++ b/pkg/apk/build-apk.sh @@ -45,7 +45,7 @@ cp -r /root/packages/pkg ./build apk add --allow-untrusted build/$PLATFORM/*.apk cd $ROOT_DIR/win-examples -g++ -o dynamic.out -std=c++11 ./example.cc -Wl,-rpath=/usr/lib -lpulsar +g++ -o dynamic.out -std=c++17 ./example.cc -Wl,-rpath=/usr/lib -lpulsar ./dynamic.out -g++ -o static.out -std=c++11 ./example.cc /usr/lib/libpulsarwithdeps.a -lpthread -ldl +g++ -o static.out -std=c++17 ./example.cc /usr/lib/libpulsarwithdeps.a -lpthread -ldl ./static.out diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh index 449222b1..9190b860 100755 --- a/pkg/mac/build-static-library.sh +++ b/pkg/mac/build-static-library.sh @@ -70,7 +70,7 @@ cmake --build build-osx -j16 --target install cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/ # Test the libraries -clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar +clang++ win-examples/example.cc -o dynamic.out -std=c++17 -arch $ARCH -I $INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar ./dynamic.out -clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a +clang++ win-examples/example.cc -o static.out -std=c++17 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a ./static.out diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc index 9bc1c52b..f690fd40 100644 --- a/tests/SynchronizedHashMapTest.cc +++ b/tests/SynchronizedHashMapTest.cc @@ -20,7 +20,6 @@ #include #include -#include #include #include #include @@ -101,8 +100,8 @@ TEST(SynchronizedHashMapTest, testForEach) { ASSERT_TRUE(values.empty()); ASSERT_EQ(result, 1); - ASSERT_EQ(m.putIfAbsent(1, 100), boost::none); - ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional(100)); + ASSERT_EQ(m.putIfAbsent(1, 100), optional{}); + ASSERT_EQ(m.putIfAbsent(1, 101), optional(100)); m.forEachValue([&values](int value, const SharedFuture&) { values.emplace_back(value); }, [&result] { result = 2; }); ASSERT_EQ(values, (std::vector({100}))); @@ -116,8 +115,8 @@ TEST(SynchronizedHashMapTest, testForEach) { ASSERT_EQ(result, 1); values.clear(); - ASSERT_EQ(m.putIfAbsent(2, 200), boost::none); - ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional(200)); + ASSERT_EQ(m.putIfAbsent(2, 200), optional{}); + ASSERT_EQ(m.putIfAbsent(2, 201), optional(200)); m.forEachValue([&values](int value, const SharedFuture&) { values.emplace_back(value); }, [&result] { result = 2; }); std::sort(values.begin(), values.end()); diff --git a/version.txt b/version.txt index 24a67297..8578fb2f 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -3.8.0-pre +4.0.0-pre