Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT})

if (NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif ()

# Compiler specific configuration:
# https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
Expand Down Expand Up @@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated)
file(MAKE_DIRECTORY ${AUTOGEN_DIR})

if (INTEGRATE_VCPKG)
if (NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 11)
endif ()
set(CMAKE_C_STANDARD 11)
set(Boost_NO_BOOST_CMAKE ON)
find_package(Boost REQUIRED)
Expand Down
5 changes: 5 additions & 0 deletions LegacyFindPackages.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ if (MSVC)
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_DEBUG ${CMAKE_CXX_FLAGS_DEBUG})
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE})
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELWITHDEBINFO ${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
if (NOT CMAKE_CL_64)
# When building with a 32-bit cl.exe, the virtual address space is limited to 2GB, which could be
# reached with /O2 optimization. Use /Os for smaller code size.
string(REGEX REPLACE "/O2" "/Os" CMAKE_CXX_FLAGS_RELEASE ${CMAKE_CXX_FLAGS_RELEASE})
endif ()
message(STATUS "CMAKE_CXX_FLAGS_DEBUG: " ${CMAKE_CXX_FLAGS_DEBUG})
message(STATUS "CMAKE_CXX_FLAGS_RELEASE: " ${CMAKE_CXX_FLAGS_RELEASE})
message(STATUS "CMAKE_CXX_FLAGS_RELWITHDEBINFO: " ${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON
cmake --build build -j8
```

> - Before 4.0.0, C++11 is required.
> - Since 4.0.0, C++17 is required.

The 1st step will download vcpkg and then install all dependencies according to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the CMake build directory.

> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have vcpkg installed.
Expand Down
5 changes: 2 additions & 3 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
auto weakSelf = weak_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
if (self && !ec) {
if (auto self = weakSelf.lock(); self && !ec) {
auto consumer = consumer_.lock();
if (!consumer || consumer->isClosingOrClosed()) {
return;
Expand Down
19 changes: 9 additions & 10 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>

#include <boost/optional.hpp>
#include <fstream>

#include "AsioDefines.h"
Expand Down Expand Up @@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() {

if (--pendingWriteOperations_ > 0) {
assert(!pendingWriteBuffers_.empty());
boost::any any = pendingWriteBuffers_.front();
auto any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();

auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler(
[this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));

auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
BaseCommand outgoingCmd;
PairSharedBuffer buffer =
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
Expand Down Expand Up @@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
data.topicEpoch = std::make_optional(producerSuccess.topic_epoch());
} else {
data.topicEpoch = boost::none;
data.topicEpoch = std::nullopt;
}
requestData.promise.setValue(data);
cancelTimer(*requestData.timer);
Expand Down Expand Up @@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co
}
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer) {
if (tlsSocket_) {
if (closeProducer.has_assignedbrokerserviceurltls()) {
Expand All @@ -1814,10 +1813,10 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeProducer.has_assignedbrokerserviceurl()) {
return closeProducer.assignedbrokerserviceurl();
}
return boost::none;
return {};
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseConsumer& closeConsumer) {
if (tlsSocket_) {
if (closeConsumer.has_assignedbrokerserviceurltls()) {
Expand All @@ -1826,7 +1825,7 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeConsumer.has_assignedbrokerserviceurl()) {
return closeConsumer.assignedbrokerserviceurl();
}
return boost::none;
return {};
}

void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
Expand Down
18 changes: 8 additions & 10 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pulsar/ClientConfiguration.h>
#include <pulsar/defines.h>

#include <any>
#include <atomic>
#include <cstdint>
#ifdef USE_ASIO
Expand All @@ -37,8 +38,6 @@
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#endif
#include <boost/any.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
Expand All @@ -53,6 +52,9 @@
#include "SharedBuffer.h"
#include "TimeUtils.h"
#include "UtilAllocator.h"

using std::optional;

namespace pulsar {

class PulsarFriend;
Expand Down Expand Up @@ -108,7 +110,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
boost::optional<uint64_t> topicEpoch;
optional<uint64_t> topicEpoch;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down Expand Up @@ -141,10 +143,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ConnectionPool& pool, size_t poolIndex);
~ClientConnection();

#if __cplusplus < 201703L
std::weak_ptr<ClientConnection> weak_from_this() noexcept { return shared_from_this(); }
#endif

/*
* starts tcp connect_async
* @return future<ConnectionPtr> which is not yet set
Expand Down Expand Up @@ -378,7 +376,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::unique_lock<std::mutex> Lock;

// Pending buffers to write on the socket
std::deque<boost::any> pendingWriteBuffers_;
std::deque<std::any> pendingWriteBuffers_;
int pendingWriteOperations_ = 0;

SharedBuffer outgoingBuffer_;
Expand Down Expand Up @@ -426,8 +424,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
// This method must be called when `mutex_` is held
void unsafeRemovePendingRequest(long requestId);
Expand Down
5 changes: 3 additions & 2 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "OpSendMsg.h"
#include "PulsarApi.pb.h"
#include "Url.h"
#include "boost/throw_exception.hpp"
#include "checksum/ChecksumProvider.h"

using namespace pulsar;
Expand Down Expand Up @@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
Expand Down Expand Up @@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
Expand Down
23 changes: 13 additions & 10 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <boost/optional.hpp>
#include <optional>
#include <set>

#include "ProtoApiEnums.h"
Expand All @@ -41,6 +41,7 @@ class MessageIdImpl;
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
class BitSet;
struct SendArguments;
using std::optional;

namespace proto {
class BaseCommand;
Expand Down Expand Up @@ -102,14 +103,16 @@ class Commands {
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType,
const SendArguments& args);

static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0);
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, const KeySharedPolicy& keySharedPolicy,
int priorityLevel = 0);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand All @@ -118,7 +121,7 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName);

static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
Expand Down
Loading
Loading