Skip to content

Commit

Permalink
Merge pull request #7430 from rgacogne/rec41-redo-remotelogger
Browse files Browse the repository at this point in the history
rec-4.1.x: Reduce systemcall usage in protobuf logging
  • Loading branch information
ahupowerdns committed Jan 30, 2019
2 parents 73fe1da + c32ea3c commit c925ec3
Show file tree
Hide file tree
Showing 8 changed files with 627 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pdns/namespaces.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#ifndef PDNS_NAMESPACES_HH
#define PDNS_NAMESPACES_HH
#include <boost/tuple/tuple.hpp>

#include <boost/shared_array.hpp>
#include <boost/scoped_array.hpp>
#include <boost/optional.hpp>
Expand Down Expand Up @@ -78,4 +77,5 @@ using boost::trim_right_copy_if;
using boost::equals;
using boost::ends_with;
using boost::iends_with;

#endif
4 changes: 2 additions & 2 deletions pdns/rec-lua-conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
try {
ComboAddress server(server_);
if (!lci.protobufServer) {
lci.protobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
lci.protobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, 100*(maxQueuedEntries ? *maxQueuedEntries : 100), reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);

if (maskV4) {
lci.protobufMaskV4 = *maskV4;
Expand Down Expand Up @@ -310,7 +310,7 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
try {
ComboAddress server(server_);
if (!lci.outgoingProtobufServer) {
lci.outgoingProtobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
lci.outgoingProtobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, 100*(maxQueuedEntries ? *maxQueuedEntries : 100), reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
}
else {
theL()<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
Expand Down
145 changes: 105 additions & 40 deletions pdns/remote_logger.cc
Original file line number Diff line number Diff line change
@@ -1,13 +1,77 @@
#include <unistd.h>
#include "remote_logger.hh"

#include <sys/uio.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef PDNS_CONFIG_ARGS
#include "logger.hh"
#define WE_ARE_RECURSOR
#else
#include "dolog.hh"
#endif

void CircularWriteBuffer::write(const std::string& str)
{
if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
flush();

if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
throw std::runtime_error("Full!");

uint16_t len = htons(str.size());
char* ptr = (char*)&len;
d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
d_buffer.insert(d_buffer.end(), str.begin(), str.end());
}

void CircularWriteBuffer::flush()
{
if(d_buffer.empty()) // not optional, we report EOF otherwise
return;

auto arr1 = d_buffer.array_one();
auto arr2 = d_buffer.array_two();

struct iovec iov[2];
int pos=0;
size_t total=0;
for(const auto& arr : {arr1, arr2}) {
if(arr.second) {
iov[pos].iov_base = arr.first;
iov[pos].iov_len = arr.second;
total += arr.second;
++pos;
}
}

int res = writev(d_fd, iov, pos);
if(res < 0) {
throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
}
if(!res) {
throw std::runtime_error("EOF");
}
// cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
if((size_t)res == d_buffer.size())
d_buffer.clear();
else {
while(res--)
d_buffer.pop_front();
}
}

RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
{
if (!d_asyncConnect) {
if(reconnect())
d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
}
d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}

bool RemoteLogger::reconnect()
{
if (d_socket >= 0) {
Expand All @@ -30,58 +94,59 @@ bool RemoteLogger::reconnect()
return true;
}

void RemoteLogger::worker()
void RemoteLogger::queueData(const std::string& data)
{
if (d_asyncConnect) {
reconnect();
if(!d_writer) {
d_drops++;
return;
}

while(true) {
std::string data;
{
std::unique_lock<std::mutex> lock(d_writeMutex);
d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
if (d_exiting) {
return;
}
data = d_writeQueue.front();
d_writeQueue.pop();
}

std::unique_lock<std::mutex> lock(d_mutex);
if(d_writer) {
try {
uint16_t len = static_cast<uint16_t>(data.length());
sendSizeAndMsgWithTimeout(d_socket, len, data.c_str(), static_cast<int>(d_timeout), nullptr, nullptr, 0, 0, 0);
d_writer->write(data);
}
catch(const std::runtime_error& e) {
#ifdef WE_ARE_RECURSOR
L<<Logger::Info<<"Error sending data to remote logger "<<d_remote.toStringWithPort()<<": "<< e.what()<<endl;
#else
vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
#endif
while (!reconnect()) {
sleep(d_reconnectWaitTime);
}
catch(std::exception& e) {
// cout << "Got exception writing: "<<e.what()<<endl;
d_drops++;
d_writer.reset();
close(d_socket);
d_socket = -1;
}
}
}

void RemoteLogger::queueData(const std::string& data)
void RemoteLogger::maintenanceThread()
try
{
{
std::unique_lock<std::mutex> lock(d_writeMutex);
if (d_writeQueue.size() >= d_maxQueuedEntries) {
d_writeQueue.pop();
for(;;) {
if(d_exiting)
break;

if(d_writer) {
std::unique_lock<std::mutex> lock(d_mutex);
if(d_writer) { // check if it is still set
// cout<<"Flush"<<endl;
try {
d_writer->flush();
}
catch(std::exception& e) {
// cout<<"Flush failed!"<<endl;
d_writer.reset();
close(d_socket);
d_socket = -1;
}
}
}
d_writeQueue.push(data);
else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
std::unique_lock<std::mutex> lock(d_mutex);
d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
}
sleep(d_reconnectWaitTime);
}
d_queueCond.notify_one();
}

RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this)
catch(std::exception& e)
{
if (!d_asyncConnect) {
reconnect();
}
cerr<<"Thead died on: "<<e.what()<<endl;
}

RemoteLogger::~RemoteLogger()
Expand All @@ -91,6 +156,6 @@ RemoteLogger::~RemoteLogger()
close(d_socket);
d_socket = -1;
}
d_queueCond.notify_one();

d_thread.join();
}
64 changes: 55 additions & 9 deletions pdns/remote_logger.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,76 @@
#include <thread>

#include "iputils.hh"
#include <boost/circular_buffer.hpp>

class RemoteLogger
/* Writes can be submitted and they are atomically accepted. Either the whole write
ends up in the buffer or nothing ends up in the buffer.
In case nothing ends up in the buffer, an exception is thrown.
Similarly, EOF leads to this treatment
The filedescriptor can be in non-blocking mode.
This class is not threadsafe.
*/

class CircularWriteBuffer
{
public:
explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
{
}

void write(const std::string& str);
void flush();
private:
int d_fd;
boost::circular_buffer<char> d_buffer;
};

class RemoteLoggerInterface
{
public:
RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
virtual ~RemoteLoggerInterface() {};
virtual void queueData(const std::string& data) = 0;
virtual std::string toString() const = 0;
};

/* Thread safe. Will connect asynchronously on request.
Runs a reconnection thread that also periodicall flushes.
Note that the buffer only runs as long as there is a connection.
If there is no connection we don't buffer a thing
*/
class RemoteLogger : public RemoteLoggerInterface
{
public:
RemoteLogger(const ComboAddress& remote, uint16_t timeout=2,
uint64_t maxQueuedBytes=100000,
uint8_t reconnectWaitTime=1,
bool asyncConnect=false);
~RemoteLogger();
void queueData(const std::string& data);
std::string toString()
void queueData(const std::string& data) override;
std::string toString() const override
{
return d_remote.toStringWithPort();
}
void stop()
{
d_exiting = true;
}
std::atomic<uint32_t> d_drops{0};
private:
bool reconnect();
void worker();
void maintenanceThread();

std::queue<std::string> d_writeQueue;
std::mutex d_writeMutex;
std::condition_variable d_queueCond;
ComboAddress d_remote;
uint64_t d_maxQueuedEntries;
uint64_t d_maxQueuedBytes;
int d_socket{-1};
std::unique_ptr<CircularWriteBuffer> d_writer;
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
std::atomic<bool> d_exiting{false};

bool d_asyncConnect{false};
std::thread d_thread;
std::mutex d_mutex;
};
1 change: 1 addition & 0 deletions regression-tests.recursor-dnssec/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/.venv
/configs
/vars
/*_pb2.py
2 changes: 2 additions & 0 deletions regression-tests.recursor-dnssec/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dnspython>=1.11
nose>=1.3.7
protobuf>=2.5; sys_platform != 'darwin'
protobuf>=3.0; sys_platform == 'darwin'
pysnmp>=4.3.4
Twisted>0.15.0
2 changes: 2 additions & 0 deletions regression-tests.recursor-dnssec/runtests
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ fi
python -V
pip install -r requirements.txt

protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto

mkdir -p configs

[ -f ./vars ] && . ./vars
Expand Down
Loading

0 comments on commit c925ec3

Please sign in to comment.