Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ci-pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,15 @@ jobs:
Pop-Location
}

# Windows only: when the vcpkg checkout at VCPKG_ROOT is a shallow clone, fetch its full history.
# `git rev-parse --is-shallow-repository` prints "true" for a shallow clone; only in that case
# `git fetch --unshallow` is run against the same checkout.
# NOTE(review): presumably vcpkg needs the complete git history to resolve pinned versions — confirm.
- name: Ensure vcpkg has full history(windows)
if: runner.os == 'Windows'
shell: pwsh
run: |
$isShallow = (git -C "${{ env.VCPKG_ROOT }}" rev-parse --is-shallow-repository).Trim()
if ($isShallow -eq "true") {
git -C "${{ env.VCPKG_ROOT }}" fetch --unshallow
}

- name: remove system vcpkg(windows)
if: runner.os == 'Windows'
run: rm -rf "$VCPKG_INSTALLATION_ROOT"
Expand Down
535 changes: 247 additions & 288 deletions lib/ClientConnection.cc

Large diffs are not rendered by default.

52 changes: 37 additions & 15 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <any>
#include <atomic>
#include <cstdint>
#include <future>
#include <optional>
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
#include <asio/io_context.hpp>
Expand All @@ -41,8 +43,10 @@
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "AsioTimer.h"
Expand Down Expand Up @@ -156,11 +160,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* Close the connection.
*
* @param result all pending futures will complete with this result
* @param detach remove it from the pool if it's true
*
* `detach` should only be false when the connection pool is closed.
*/
void close(Result result = ResultConnectError, bool detach = true);
const std::future<void>& close(Result result = ResultConnectError);

bool isClosed() const;

Expand Down Expand Up @@ -193,7 +194,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

const std::string& brokerAddress() const;

const std::string& cnxString() const;
/**
 * Get the connection string.
 *
 * The pointer is read with `std::atomic_load` and the string it refers to is returned by value
 * (`auto` deduces a non-reference type), so the caller receives its own copy.
 * NOTE(review): presumably the pointer can be replaced from another thread — confirm against the writers.
 */
auto cnxString() const {
    const auto snapshot = std::atomic_load(&cnxStringPtr_);
    return *snapshot;
}

int getServerProtocolVersion() const;

Expand All @@ -219,28 +220,48 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
mockingRequests_.store(true, std::memory_order_release);
}

void handleKeepAliveTimeout();
void handleKeepAliveTimeout(const ASIO_ERROR& ec);

private:
// State kept for a request that has not been completed yet.
struct PendingRequestData {
    // Promise of the request, failed by `fail()`
    Promise<Result, ResponseData> promise;
    // Timer of the request, cancelled by `fail()`
    DeadlineTimerPtr timer;
    // Shared flag that starts as false
    // NOTE(review): presumably set once the response arrives — confirm against the users of this flag
    std::shared_ptr<std::atomic_bool> hasGotResponse = std::make_shared<std::atomic_bool>(false);

    // Cancel the timer first, then fail the promise with `result`
    void fail(Result result) {
        auto& pendingTimer = *timer;
        cancelTimer(pendingTimer);
        promise.setFailed(result);
    }
};

// State kept for a lookup request that has not been completed yet.
struct LookupRequestData {
    // Promise of the lookup, failed by `fail()`
    LookupDataResultPromisePtr promise;
    // Timer of the lookup, cancelled by `fail()`
    DeadlineTimerPtr timer;

    // Cancel the timer first, then fail the promise with `result`
    void fail(Result result) {
        auto& lookupTimer = *timer;
        cancelTimer(lookupTimer);
        promise->setFailed(result);
    }
};

// State kept for a "get last message id" request that has not been completed yet.
struct LastMessageIdRequestData {
    // Promise of the request, failed by `fail()`
    GetLastMessageIdResponsePromisePtr promise;
    // Timer of the request, cancelled by `fail()`
    DeadlineTimerPtr timer;

    // Cancel the timer first, then fail the promise with `result`
    void fail(Result result) {
        auto& requestTimer = *timer;
        cancelTimer(requestTimer);
        promise->setFailed(result);
    }
};

// State kept for a "get schema" request that has not been completed yet.
struct GetSchemaRequest {
    // Promise of the request, failed by `fail()`
    Promise<Result, SchemaInfo> promise;
    // Timer of the request, cancelled by `fail()`
    DeadlineTimerPtr timer;

    // Cancel the timer first, then fail the promise with `result`
    void fail(Result result) {
        auto& schemaTimer = *timer;
        cancelTimer(schemaTimer);
        promise.setFailed(result);
    }
};

/*
Expand Down Expand Up @@ -297,26 +318,26 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
}

template <typename ConstBufferSequence, typename WriteHandler>
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& handler) {
if (isClosed()) {
return;
}
if (tlsSocket_) {
ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler));
ASIO::async_write(*tlsSocket_, buffers, std::forward<WriteHandler>(handler));
} else {
ASIO::async_write(*socket_, buffers, handler);
ASIO::async_write(*socket_, buffers, std::forward<WriteHandler>(handler));
}
}

template <typename MutableBufferSequence, typename ReadHandler>
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler&& handler) {
if (isClosed()) {
return;
}
if (tlsSocket_) {
tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler));
tlsSocket_->async_read_some(buffers, std::forward<ReadHandler>(handler));
} else {
socket_->async_receive(buffers, handler);
socket_->async_receive(buffers, std::forward<ReadHandler>(handler));
}
}

Expand All @@ -337,7 +358,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
ASIO::strand<ASIO::io_context::executor_type> strand_;

const std::string logicalAddress_;
/*
Expand All @@ -350,7 +370,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ClientConfiguration::ProxyProtocol proxyProtocol_;

// Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
std::string cnxString_;
std::shared_ptr<std::string> cnxStringPtr_;

/*
* indicates if async connection establishment failed
Expand All @@ -360,7 +380,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
SharedBuffer incomingBuffer_;

Promise<Result, ClientConnectionWeakPtr> connectPromise_;
std::shared_ptr<PeriodicTask> connectTimeoutTask_;
const std::chrono::milliseconds connectTimeout_;
const DeadlineTimerPtr connectTimer_;

typedef std::map<long, PendingRequestData> PendingRequestsMap;
PendingRequestsMap pendingRequests_;
Expand Down Expand Up @@ -419,6 +440,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
const std::string clientVersion_;
ConnectionPool& pool_;
const size_t poolIndex_;
std::optional<std::future<void>> closeFuture_;

friend class PulsarFriend;
friend class ConsumerTest;
Expand Down
37 changes: 32 additions & 5 deletions lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,43 @@ bool ConnectionPool::close() {
return false;
}

std::vector<ClientConnectionPtr> connectionsToClose;
// ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating
// over a map, so we store the connections to close in a vector first and don't iterate the pool when
// closing the connections.
std::unique_lock<std::recursive_mutex> lock(mutex_);
connectionsToClose.reserve(pool_.size());
for (auto&& kv : pool_) {
connectionsToClose.emplace_back(kv.second);
}
pool_.clear();
lock.unlock();

for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
auto& cnx = cnxIt->second;
for (auto&& cnx : connectionsToClose) {
if (cnx) {
// The 2nd argument is false because removing a value during the iteration will cause segfault
cnx->close(ResultDisconnected, false);
// Close with a fatal error to not let client retry
auto& future = cnx->close(ResultAlreadyClosed);
using namespace std::chrono_literals;
if (auto status = future.wait_for(5s); status != std::future_status::ready) {
LOG_WARN("Connection close timed out for " << cnx.get()->cnxString());
}
if (cnx.use_count() > 1) {
// There are some asynchronous operations that hold a reference to the connection, so we should
// wait for them to finish. Otherwise, `io_context::stop()` will be called in
// `ClientImpl::shutdown()` when closing the `ExecutorServiceProvider`. Then
// `io_context::run()` will return and the `io_context` object will be destroyed. In this
// case, if there is any pending handler, it will crash.
for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
std::this_thread::sleep_for(10ms);
}
if (cnx.use_count() > 1) {
LOG_WARN("Connection still has " << (cnx.use_count() - 1)
<< " references after waiting for 5 seconds for "
<< cnx.get()->cnxString());
}
}
}
}
pool_.clear();
return true;
}

Expand Down
2 changes: 0 additions & 2 deletions lib/ExecutorService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ void ExecutorService::close(long timeoutMs) {
}
}

void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, std::move(task)); }

/////////////////////

ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
Expand Down
19 changes: 18 additions & 1 deletion lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@

#include <atomic>
#ifdef USE_ASIO
#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/post.hpp>
#include <asio/ssl.hpp>
#else
#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/ssl.hpp>
#endif
#include <chrono>
Expand All @@ -37,6 +41,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

#include "AsioTimer.h"

Expand All @@ -62,7 +67,19 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
TcpResolverPtr createTcpResolver();
// throws std::runtime_error if failed
DeadlineTimerPtr createDeadlineTimer();
void postWork(std::function<void(void)> task);

// Hand the task over to the event loop with `ASIO::post`: it is put into the event loop queue and
// executed later in the event loop thread, i.e. asynchronously to the caller.
template <typename Task>
void postWork(Task &&task) {
    ASIO::post(io_context_, std::forward<Task>(task));
}

// Hand the task over to the event loop with `ASIO::dispatch`. Unlike `postWork`, the task is executed
// immediately when the caller is already running in the event loop.
template <typename Task>
void dispatch(Task &&task) {
    ASIO::dispatch(io_context_, std::forward<Task>(task));
}

// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);
Expand Down
2 changes: 1 addition & 1 deletion lib/PeriodicTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {

void stop() noexcept;

void setCallback(CallbackType callback) noexcept { callback_ = callback; }
// Replace the stored callback by moving `callback` into `callback_`. Only rvalues are accepted.
void setCallback(CallbackType&& callback) noexcept {
    callback_ = std::move(callback);
}

State getState() const noexcept { return state_; }
int getPeriodMs() const noexcept { return periodMs_; }
Expand Down
44 changes: 37 additions & 7 deletions tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3188,7 +3188,17 @@ static void expectTimeoutOnRecv(Consumer &consumer) {
ASSERT_EQ(ResultTimeout, res);
}

void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
/**
 * Build the payloads "test-0" ... "test-<numMessages - 1>" that `testNegativeAcks` expects to receive.
 *
 * The payloads are returned in lexicographic order because the result is compared against a
 * `std::sort`-ed list of received payloads. Numeric order would only match that list while
 * `numMessages` is at most 10 ("test-10" sorts before "test-2").
 *
 * @param numMessages the number of payloads to generate
 * @return the payloads, sorted lexicographically
 */
static std::vector<std::string> expectedNegativeAckMessages(size_t numMessages) {
    std::vector<std::string> expected;
    expected.reserve(numMessages);
    for (size_t i = 0; i < numMessages; i++) {
        expected.emplace_back("test-" + std::to_string(i));
    }
    // Does not change the order when numMessages <= 10, where numeric and lexicographic orders coincide
    std::sort(expected.begin(), expected.end());
    return expected;
}

void testNegativeAcks(const std::string &topic, bool batchingEnabled, bool expectOrdered = true) {
constexpr size_t numMessages = 10;
Client client(lookupUrl);
Consumer consumer;
ConsumerConfiguration conf;
Expand All @@ -3202,22 +3212,32 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
result = client.createProducer(topic, producerConf, producer);
ASSERT_EQ(ResultOk, result);

for (int i = 0; i < 10; i++) {
for (size_t i = 0; i < numMessages; i++) {
Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
producer.sendAsync(msg, nullptr);
}

producer.flush();

std::vector<std::string> receivedMessages;
receivedMessages.reserve(numMessages);
std::vector<MessageId> toNeg;
for (int i = 0; i < 10; i++) {
for (size_t i = 0; i < numMessages; i++) {
Message msg;
consumer.receive(msg);

LOG_INFO("Received message " << msg.getDataAsString());
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
if (expectOrdered) {
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
}
receivedMessages.emplace_back(msg.getDataAsString());
toNeg.push_back(msg.getMessageId());
}
if (!expectOrdered) {
auto expectedMessages = expectedNegativeAckMessages(numMessages);
std::sort(receivedMessages.begin(), receivedMessages.end());
ASSERT_EQ(expectedMessages, receivedMessages);
}
// No more messages expected
expectTimeoutOnRecv(consumer);

Expand All @@ -3228,15 +3248,25 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
}
PulsarFriend::setNegativeAckEnabled(consumer, true);

for (int i = 0; i < 10; i++) {
std::vector<std::string> redeliveredMessages;
redeliveredMessages.reserve(numMessages);
for (size_t i = 0; i < numMessages; i++) {
Message msg;
consumer.receive(msg);
LOG_INFO("-- Redelivery -- Received message " << msg.getDataAsString());

ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
if (expectOrdered) {
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
}
redeliveredMessages.emplace_back(msg.getDataAsString());

consumer.acknowledge(msg);
}
if (!expectOrdered) {
auto expectedMessages = expectedNegativeAckMessages(numMessages);
std::sort(redeliveredMessages.begin(), redeliveredMessages.end());
ASSERT_EQ(expectedMessages, redeliveredMessages);
}

// No more messages expected
expectTimeoutOnRecv(consumer);
Expand All @@ -3262,7 +3292,7 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

testNegativeAcks(topicName, true);
testNegativeAcks(topicName, true, false);
}

void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) {
Expand Down
Loading
Loading