diff --git a/core/src/stored/append.cc b/core/src/stored/append.cc index fd1757d70cb..894d1b505a7 100644 --- a/core/src/stored/append.cc +++ b/core/src/stored/append.cc @@ -42,6 +42,13 @@ #include "lib/berrno.h" #include "lib/crypto.h" +#include +#include +#include +#include +#include +// #include "lib/channel.h" + namespace storagedaemon { /* Responses sent to the daemon */ @@ -111,6 +118,158 @@ static bool SaveFullyProcessedFilesAttributes( return false; } +class MessageHandler { + public: + using signal = int; + + struct message { + std::size_t size; + PoolMem data; + }; + + enum class error + { + HARDEOF, + // both ERROR and SOCKET_ERROR are taken by windows.h + INTERNAL_ERROR, + }; + + using result_type = std::variant; + + MessageHandler(BareosSocket* fd) : fd{fd}, receive_thread{enlist, this} {} + + std::optional get_msg() + { + if (cache.size() == 0) { + std::unique_lock l(mut); + not_empty.wait(l, [this] { return results.size() > 0 || finished; }); + + if (results.size() == 0) { return std::nullopt; } + + std::swap(cache, results); + } + + ASSERT(cache.size() > 0); + + result_type t = std::move(cache.front()); + cache.pop_front(); + + bytes_out += sizeof(t); + if (auto* content = std::get_if(&t)) { + bytes_out += content->size; + } + return t; + } + + BareosSocket* close_and_get() + { + end.store(true); + receive_thread.join(); + return fd; + } + + private: + bool error_while_reading{false}; + std::atomic end{false}; + + BareosSocket* fd; + // channel::out chan; + + std::mutex mut{}; + bool finished{false}; + std::size_t bytes_out{0}; + std::condition_variable not_empty{}; + std::deque results{}; + + std::deque cache; + + // receive_thread has to be defined last! + // The thread created will try to access this class immediately after + // being created! As such everything else has to be initialized. + std::thread receive_thread; + void do_work() + { + POOLMEM* save = fd->msg; + bool cont = true; + std::size_t max_size{0}; + std::size_t max_bytes{0}; + std::size_t bytes_in{0}; + std::size_t message_count{0}; + std::size_t full_message_count{0}; + std::size_t signal_count{0}; + for (int res = 0; cont; res = fd->WaitData(0, 100'000)) { + if (res == 1) { + PoolMem msg(PM_MESSAGE); + fd->msg = msg.addr(); + result_type result; + int n = BgetMsg(fd); + // fd->msg might have been relocated + msg.addr() = fd->msg; + if (n < 0) { + if (n == BNET_SIGNAL) { + result = signal{fd->message_length}; + // break; /* end of data */ + } else if (n == BNET_HARDEOF) { + result = error::HARDEOF; + cont = false; + } else { + result = error::INTERNAL_ERROR; + cont = false; + } + + // Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. + // ERR=%s\n"), + // what, bs->bstrerror()); + // ok = false; + // break; + signal_count += 1; + } else { + std::size_t length = n; + result = message{length, std::move(msg)}; + if (n == 64 * 1024) full_message_count += 1; + message_count += 1; + } + fd->msg = nullptr; + + { + std::unique_lock l(mut); + results.emplace_back(std::move(result)); + if (results.size() > max_size) { max_size = results.size(); } + if (n > 0) { bytes_in += n; } + bytes_in += sizeof(result); + ASSERT(bytes_in >= full_message_count * 64ULL * 1024ULL); + if (bytes_in - bytes_out > max_bytes) { + ASSERT(bytes_in > bytes_out); + max_bytes = bytes_in - bytes_out; + } + if (message_count % 10000 == 0) { + Dmsg1(50, + "MsgQueue Message Count = %llu, Signal Count = %llu, Size = " + "%llu\n", + message_count, signal_count, bytes_in - bytes_out); + } + not_empty.notify_one(); + } + } else if (res == -1) { + cont = false; + error_while_reading = true; + } + if (end.load()) { cont = false; } + } + + std::unique_lock l(mut); + finished = true; + not_empty.notify_one(); + fd->msg = save; + Dmsg1(50, + "MsgQueue Max Msg = %llu, Max Size = %llu\n" + "Messages = %llu (Full: %llu), Signals = %llu\n", + max_size, max_bytes, message_count, full_message_count, signal_count); + } + + static void enlist(MessageHandler* handler) { handler->do_work(); } +}; + // Append Data sent from File daemon bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) { @@ -222,6 +381,8 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) ProcessedFile file_currently_processed; uint32_t current_block_number = jcr->sd_impl->dcr->block->BlockNumber; + MessageHandler handler(std::exchange(bs, nullptr)); + for (last_file_index = 0; ok && !jcr->IsJobCanceled();) { /* Read Stream header from the daemon. * @@ -230,17 +391,35 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) * - stream (Bareos number to distinguish parts of data) * - info (Info for Storage daemon -- compressed, encrypted, ...) * info is not currently used, so is read, but ignored! */ - if ((n = BgetMsg(bs)) <= 0) { - if (n == BNET_SIGNAL && bs->message_length == BNET_EOD) { - break; /* end of data */ - } + auto msg = handler.get_msg(); + if (!msg) { + Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"), + what, "bs->bstrerror()"); + ok = false; + break; + } + + if (auto* error = std::get_if(&msg.value())) { + (void)error; Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"), - what, bs->bstrerror()); + what, "bs->bstrerror()"); ok = false; break; } - if (sscanf(bs->msg, "%ld %ld", &file_index, &stream) != 2) { + if (auto* signal = std::get_if(&msg.value())) { + if (*signal != BNET_EOD) { + Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"), + what, "bs->bstrerror()"); + ok = false; + } + break; + } + + auto content = std::get(std::move(msg).value()); + n = content.size; + + if (sscanf(content.data.c_str(), "%ld %ld", &file_index, &stream) != 2) { Jmsg2(jcr, M_FATAL, 0, _("Malformed data header from %s: %s\n"), what, bs->msg); ok = false; @@ -279,15 +458,47 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) * We save the original data pointer from the record so we can restore * that after the loop ends. */ rec_data = jcr->sd_impl->dcr->rec->data; - while ((n = BgetMsg(bs)) > 0 && !jcr->IsJobCanceled()) { + while (!jcr->IsJobCanceled()) { + auto msg = handler.get_msg(); + + if (!msg) { + Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"), + what, "bs->bstrerror()"); + ok = false; + break; + } + + if (auto* error = std::get_if(&msg.value())) { + (void)error; + Jmsg2(jcr, M_FATAL, 0, _("Error reading data header from %s. ERR=%s\n"), + what, "bs->bstrerror()"); + ok = false; + break; + } + + if (auto* signal = std::get_if(&msg.value())) { + if (*signal != BNET_EOD) { + Jmsg2(jcr, M_FATAL, 0, + _("Error reading data header from %s. ERR=%s\n"), what, + "bs->bstrerror()"); + ok = false; + } + break; + } + + MessageHandler::message content + = std::get(std::move(msg).value()); + n = content.size; + jcr->sd_impl->dcr->rec->VolSessionId = jcr->VolSessionId; jcr->sd_impl->dcr->rec->VolSessionTime = jcr->VolSessionTime; jcr->sd_impl->dcr->rec->FileIndex = file_index; jcr->sd_impl->dcr->rec->Stream = stream; jcr->sd_impl->dcr->rec->maskedStream = stream & STREAMMASK_TYPE; /* strip high bits */ - jcr->sd_impl->dcr->rec->data_len = bs->message_length; - jcr->sd_impl->dcr->rec->data = bs->msg; /* use message buffer */ + jcr->sd_impl->dcr->rec->data_len = content.size; + jcr->sd_impl->dcr->rec->data + = content.data.addr(); /* use message buffer */ Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n", jcr->sd_impl->dcr->rec->FileIndex, @@ -342,6 +553,7 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) // Restore the original data pointer. jcr->sd_impl->dcr->rec->data = rec_data; +#if 0 if (bs->IsError()) { if (!jcr->IsJobCanceled()) { Dmsg2(350, "Network read error from %s. ERR=%s\n", what, @@ -352,8 +564,10 @@ bool DoAppendData(JobControlRecord* jcr, BareosSocket* bs, const char* what) ok = false; break; } +#endif } + bs = handler.close_and_get(); // Create Job status for end of session label jcr->setJobStatusWithPriorityCheck(ok ? JS_Terminated : JS_ErrorTerminated); diff --git a/systemtests/tests/restore/testrunner-create-backup b/systemtests/tests/restore/testrunner-create-backup index 7750ed041ea..f004bf3f231 100755 --- a/systemtests/tests/restore/testrunner-create-backup +++ b/systemtests/tests/restore/testrunner-create-backup @@ -24,7 +24,7 @@ cat <"$tmp/bconcmds" @$out /dev/null messages @$out $log_home/create-backup.out -setdebug level=100 storage=File +setdebug level=100 storage=File trace=1 label volume=TestVolume001 storage=File pool=Full run job=$JobName yes status director