Skip to content

Commit

Permalink
append: introduce MessageReceiver
Browse files Browse the repository at this point in the history
This class reads data from the given socket asynchroniously and makes
them available in a preparsed manner on the main thread.  It is
important that the given socket is only used by one thread at a time,
as such, if you create a MessageHandler for a certain socket, you need
to take care to not use that socket at all while the message handler
is alive.
  • Loading branch information
sebsura committed Nov 7, 2023
1 parent 4a0061b commit 80c5f43
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 10 deletions.
232 changes: 223 additions & 9 deletions core/src/stored/append.cc
Expand Up @@ -42,6 +42,13 @@
#include "lib/berrno.h"
#include "lib/crypto.h"

#include <thread>
#include <variant>
#include <deque>
#include <utility>
#include <condition_variable>
// #include "lib/channel.h"

namespace storagedaemon {

/* Responses sent to the daemon */
Expand Down Expand Up @@ -111,6 +118,158 @@ static bool SaveFullyProcessedFilesAttributes(
return false;
}

class MessageHandler {
public:
using signal = int;

struct message {
std::size_t size;
PoolMem data;
};

enum class error
{
HARDEOF,
// both ERROR and SOCKET_ERROR are taken by windows.h
INTERNAL_ERROR,
};

using result_type = std::variant<signal, message, error>;

MessageHandler(BareosSocket* fd) : fd{fd}, receive_thread{enlist, this} {}

std::optional<result_type> get_msg()
{
if (cache.size() == 0) {
std::unique_lock l(mut);
not_empty.wait(l, [this] { return results.size() > 0 || finished; });

if (results.size() == 0) { return std::nullopt; }

std::swap(cache, results);
}

ASSERT(cache.size() > 0);

result_type t = std::move(cache.front());
cache.pop_front();

bytes_out += sizeof(t);
if (auto* content = std::get_if<message>(&t)) {
bytes_out += content->size;
}
return t;
}

BareosSocket* close_and_get()
{
end.store(true);
receive_thread.join();
return fd;
}

private:
bool error_while_reading{false};
std::atomic<bool> end{false};

BareosSocket* fd;
// channel::out<result_type> chan;

std::mutex mut{};
bool finished{false};
std::size_t bytes_out{0};
std::condition_variable not_empty{};
std::deque<result_type> results{};

std::deque<result_type> cache;

// receive_thread has to be defined last!
// The thread created will try to access this class immediately after
// being created! As such everything else has to be initialized.
std::thread receive_thread;
void do_work()
{
POOLMEM* save = fd->msg;
bool cont = true;
std::size_t max_size{0};
std::size_t max_bytes{0};
std::size_t bytes_in{0};
std::size_t message_count{0};
std::size_t full_message_count{0};
std::size_t signal_count{0};
for (int res = 0; cont; res = fd->WaitData(0, 100'000)) {
if (res == 1) {
PoolMem msg(PM_MESSAGE);
fd->msg = msg.addr();
result_type result;
int n = BgetMsg(fd);
// fd->msg might have been relocated
msg.addr() = fd->msg;
if (n < 0) {
if (n == BNET_SIGNAL) {
result = signal{fd->message_length};
// break; /* end of data */
} else if (n == BNET_HARDEOF) {
result = error::HARDEOF;
cont = false;
} else {
result = error::INTERNAL_ERROR;
cont = false;
}

// Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s.
// ERR=%s\n"),
// what, bs->bstrerror());
// ok = false;
// break;
signal_count += 1;
} else {
std::size_t length = n;
result = message{length, std::move(msg)};
if (n == 64 * 1024) full_message_count += 1;
message_count += 1;
}
fd->msg = nullptr;

{
std::unique_lock l(mut);
results.emplace_back(std::move(result));
if (results.size() > max_size) { max_size = results.size(); }
if (n > 0) { bytes_in += n; }
bytes_in += sizeof(result);
ASSERT(bytes_in >= full_message_count * 64ULL * 1024ULL);
if (bytes_in - bytes_out > max_bytes) {
ASSERT(bytes_in > bytes_out);
max_bytes = bytes_in - bytes_out;
}
if (message_count % 10000 == 0) {
Dmsg1(50,
"MsgQueue Message Count = %llu, Signal Count = %llu, Size = "
"%llu\n",
message_count, signal_count, bytes_in - bytes_out);
}
not_empty.notify_one();
}
} else if (res == -1) {
cont = false;
error_while_reading = true;
}
if (end.load()) { cont = false; }
}

std::unique_lock l(mut);
finished = true;
not_empty.notify_one();
fd->msg = save;
Dmsg1(50,
"MsgQueue Max Msg = %llu, Max Size = %llu\n"
"Messages = %llu (Full: %llu), Signals = %llu\n",
max_size, max_bytes, message_count, full_message_count, signal_count);
}

static void enlist(MessageHandler* handler) { handler->do_work(); }
};

// Append Data sent from File daemon
bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
{
Expand Down Expand Up @@ -222,6 +381,8 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
ProcessedFile file_currently_processed;
uint32_t current_block_number = jcr->sd_impl->dcr->block->BlockNumber;

MessageHandler handler(std::exchange(bs, nullptr));

for (last_file_index = 0; ok && !jcr->IsJobCanceled();) {
/* Read Stream header from the daemon.
*
Expand All @@ -230,17 +391,35 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
* - stream (Bareos number to distinguish parts of data)
* - info (Info for Storage daemon -- compressed, encrypted, ...)
* info is not currently used, so is read, but ignored! */
if ((n = BgetMsg(bs)) <= 0) {
if (n == BNET_SIGNAL && bs->message_length == BNET_EOD) {
break; /* end of data */
}
auto msg = handler.get_msg();
if (!msg) {
Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"),
what, "bs->bstrerror()");
ok = false;
break;
}

if (auto* error = std::get_if<MessageHandler::error>(&msg.value())) {
(void)error;
Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"),
what, bs->bstrerror());
what, "bs->bstrerror()");
ok = false;
break;
}

if (sscanf(bs->msg, "%ld %ld", &file_index, &stream) != 2) {
if (auto* signal = std::get_if<MessageHandler::signal>(&msg.value())) {
if (*signal != BNET_EOD) {
Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"),
what, "bs->bstrerror()");
ok = false;
}
break;
}

auto content = std::get<MessageHandler::message>(std::move(msg).value());
n = content.size;

if (sscanf(content.data.c_str(), "%ld %ld", &file_index, &stream) != 2) {
Jmsg2(jcr, M_FATAL, 0, _("Malformed data header from %s: %s\n"), what,
bs->msg);
ok = false;
Expand Down Expand Up @@ -279,15 +458,47 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
* We save the original data pointer from the record so we can restore
* that after the loop ends. */
rec_data = jcr->sd_impl->dcr->rec->data;
while ((n = BgetMsg(bs)) > 0 && !jcr->IsJobCanceled()) {
while (!jcr->IsJobCanceled()) {
auto msg = handler.get_msg();

if (!msg) {
Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"),
what, "bs->bstrerror()");
ok = false;
break;
}

if (auto* error = std::get_if<MessageHandler::error>(&msg.value())) {
(void)error;
Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"),
what, "bs->bstrerror()");
ok = false;
break;
}

if (auto* signal = std::get_if<MessageHandler::signal>(&msg.value())) {
if (*signal != BNET_EOD) {
Jmsg2(jcr, M_FATAL, 0,
_("Error reading data header from %s. ERR=%s\n"), what,
"bs->bstrerror()");
ok = false;
}
break;
}

MessageHandler::message content
= std::get<MessageHandler::message>(std::move(msg).value());
n = content.size;

jcr->sd_impl->dcr->rec->VolSessionId = jcr->VolSessionId;
jcr->sd_impl->dcr->rec->VolSessionTime = jcr->VolSessionTime;
jcr->sd_impl->dcr->rec->FileIndex = file_index;
jcr->sd_impl->dcr->rec->Stream = stream;
jcr->sd_impl->dcr->rec->maskedStream
= stream & STREAMMASK_TYPE; /* strip high bits */
jcr->sd_impl->dcr->rec->data_len = bs->message_length;
jcr->sd_impl->dcr->rec->data = bs->msg; /* use message buffer */
jcr->sd_impl->dcr->rec->data_len = content.size;
jcr->sd_impl->dcr->rec->data
= content.data.addr(); /* use message buffer */

Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n",
jcr->sd_impl->dcr->rec->FileIndex,
Expand Down Expand Up @@ -342,6 +553,7 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
// Restore the original data pointer.
jcr->sd_impl->dcr->rec->data = rec_data;

#if 0
if (bs->IsError()) {
if (!jcr->IsJobCanceled()) {
Dmsg2(350, "Network read error from %s. ERR=%s\n", what,
Expand All @@ -352,8 +564,10 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what)
ok = false;
break;
}
#endif
}

bs = handler.close_and_get();
// Create Job status for end of session label
jcr->setJobStatusWithPriorityCheck(ok ? JS_Terminated : JS_ErrorTerminated);

Expand Down
2 changes: 1 addition & 1 deletion systemtests/tests/restore/testrunner-create-backup
Expand Up @@ -24,7 +24,7 @@ cat <<END_OF_DATA >"$tmp/bconcmds"
@$out /dev/null
messages
@$out $log_home/create-backup.out
setdebug level=100 storage=File
setdebug level=100 storage=File trace=1
label volume=TestVolume001 storage=File pool=Full
run job=$JobName yes
status director
Expand Down

0 comments on commit 80c5f43

Please sign in to comment.