Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions spectator/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint,
local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) {
buffer_.reserve(bytes_to_buffer_ + 1024);
if (absl::StartsWith(endpoint, "unix:")) {
setup_unix_domain(endpoint.substr(5));
this->unixDomainPath_ = std::string(endpoint.substr(5));
setup_unix_domain();
} else if (absl::StartsWith(endpoint, "udp:")) {
auto pos = 4;
// if the user used udp://foo:1234 instead of udp:foo:1234
Expand Down Expand Up @@ -49,29 +50,68 @@ void SpectatordPublisher::local_reconnect(absl::string_view path) {
}
}

void SpectatordPublisher::setup_unix_domain(absl::string_view path) {
local_reconnect(path);
// get a copy of the file path
std::string local_path{path};
sender_ = [local_path, this](std::string_view msg) {
buffer_.append(msg);
if (buffer_.length() >= bytes_to_buffer_) {
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer_));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length());
break;
} catch (std::exception& e) {
local_reconnect(local_path);
logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i,
e.what());

bool SpectatordPublisher::try_to_send(const std::string& buffer) {
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes,
buffer.length());
return true;
} catch (std::exception& e) {
local_reconnect(this->unixDomainPath_);
logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer, i, e.what());
}
}
return false;
}

void SpectatordPublisher::taskThreadFunction() try {
while (shutdown_.load() == false) {
std::string message {};
{
std::unique_lock<std::mutex> lock(mtx_);
cv_sender_.wait(lock, [this] { return buffer_.size() > bytes_to_buffer_ || shutdown_.load();});
if (shutdown_.load() == true) {
return;
}
message = std::move(buffer_);
buffer_ = std::string();
buffer_.reserve(bytes_to_buffer_);
}
cv_receiver_.notify_one();
try_to_send(message);
}
} catch (const std::exception& e) {
logger_->error("Fatal error in message processing thread: {}", e.what());
}

void SpectatordPublisher::setup_unix_domain(){
// Reset connection to the unix domain socket
local_reconnect(this->unixDomainPath_);
if (bytes_to_buffer_ == 0) {
sender_ = [this](std::string_view msg) {
try_to_send(std::string(msg));
};
return;
}
else {
sender_ = [this](std::string_view msg) {
unsigned int currentBufferSize = buffer_.size();
{
std::unique_lock<std::mutex> lock(mtx_);
cv_receiver_.wait(lock, [this] { return buffer_.size() <= bytes_to_buffer_ || shutdown_.load(); });
if (shutdown_.load()) {
return;
}
buffer_.append(msg.data(), msg.size());
buffer_.append(1, NEW_LINE);
currentBufferSize = buffer_.size();
}
buffer_.clear();
} else {
buffer_.push_back(NEW_LINE);
}
};
currentBufferSize > bytes_to_buffer_ ? cv_sender_.notify_one() : cv_receiver_.notify_one();
};
this->sendingThread_ = std::thread(&SpectatordPublisher::taskThreadFunction, this);
}
}

inline asio::ip::udp::endpoint resolve_host_port(
Expand Down
21 changes: 20 additions & 1 deletion spectator/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@ class SpectatordPublisher {
std::shared_ptr<spdlog::logger> logger = DefaultLogger());
SpectatordPublisher(const SpectatordPublisher&) = delete;

~SpectatordPublisher() {
shutdown_.store(true);
cv_receiver_.notify_all();
cv_sender_.notify_all();
if (sendingThread_.joinable()) {
sendingThread_.join();
}
}

void send(std::string_view measurement) { sender_(measurement); };

void taskThreadFunction();
bool try_to_send(const std::string& buffer);

protected:
using sender_fun = std::function<void(std::string_view)>;
sender_fun sender_;

private:
void setup_nop_sender();
void setup_unix_domain(absl::string_view path);
void setup_unix_domain();
void setup_udp(absl::string_view host_port);
void local_reconnect(absl::string_view path);
void udp_reconnect(const asio::ip::udp::endpoint& endpoint);
Expand All @@ -34,6 +46,13 @@ class SpectatordPublisher {
asio::local::datagram_protocol::socket local_socket_;
std::string buffer_;
uint32_t bytes_to_buffer_;

std::thread sendingThread_;
std::mutex mtx_;
std::condition_variable cv_receiver_;
std::condition_variable cv_sender_;
std::string unixDomainPath_;
std::atomic<bool> shutdown_{false};
};

} // namespace spectator
75 changes: 74 additions & 1 deletion spectator/publisher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "test_server.h"
#include <gtest/gtest.h>
#include <unistd.h>
#include <regex>

namespace {

Expand Down Expand Up @@ -77,7 +78,7 @@ TEST(Publisher, UnixBuffer) {
c.Increment();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
msgs = server.GetMessages();
std::vector<std::string> expected{"c:counter:1\nc:counter:1\nc:counter:1"};
std::vector<std::string> expected{"c:counter:1\nc:counter:1\nc:counter:1\n"};
EXPECT_EQ(msgs, expected);
server.Stop();
unlink(path.c_str());
Expand All @@ -90,4 +91,76 @@ TEST(Publisher, Nop) {
c.Add(2);
}

TEST(Publisher, MultiThreadedCounters) {
auto logger = spectator::DefaultLogger();
const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp");
auto path = fmt::format("{}/testserver.{}", dir, getpid());
TestUnixServer server{path};
server.Start();
logger->info("Unix Server started on path {}", path);

// Create publisher with a small buffer size to ensure flushing
SpectatordPublisher publisher{fmt::format("unix:{}", path), 50};

// Number of threads and counters to create
const int numThreads = 4;
const int countersPerThread = 3;
const int incrementsPerCounter = 5;

// Function for worker threads
auto worker = [&](int threadId) {
// Create several counters per thread with unique names
for (int i = 0; i < countersPerThread; i++) {
std::string counterName = fmt::format("counter.thread{}.{}", threadId, i);
Counter counter(std::make_shared<Id>(counterName, Tags{}), &publisher);

// Increment each counter multiple times
for (int j = 0; j < incrementsPerCounter; j++) {
counter.Increment();
}
}
};

// Start worker threads
std::vector<std::thread> threads;
for (int i = 0; i < numThreads; i++) {
threads.emplace_back(worker, i);
}

// Wait for all threads to complete
for (auto& t : threads) {
t.join();
}

// Give some time for messages to be sent
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// Check messages
auto msgs = server.GetMessages();
EXPECT_FALSE(msgs.empty());

// Verify total number of increments
int expectedIncrements = numThreads * countersPerThread * incrementsPerCounter;
int actualIncrements = 0;

// Verify every string in msgs follows the form counter.thread<digit>.<digit>
std::regex counter_regex(R"(c:counter\.thread\d+\.\d+:1)");
for (const auto& msg : msgs) {
std::stringstream ss(msg);
std::string line;
while (std::getline(ss, line)) {
if (!line.empty()) {
EXPECT_TRUE(std::regex_match(line, counter_regex))
<< "Unexpected counter format: " << line;
actualIncrements++;
}
}
}

EXPECT_EQ(actualIncrements, expectedIncrements);

server.Stop();
unlink(path.c_str());
}

} // namespace
Loading