From 49100020018a71b484af46de6d960f8df7e66e83 Mon Sep 17 00:00:00 2001 From: Sebastian Sura Date: Mon, 21 Aug 2023 14:38:09 +0200 Subject: [PATCH] backup: send on second thread --- core/src/filed/backup.cc | 57 +++++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/core/src/filed/backup.cc b/core/src/filed/backup.cc index a45a1ec7f87..bc11033b770 100644 --- a/core/src/filed/backup.cc +++ b/core/src/filed/backup.cc @@ -51,6 +51,8 @@ #include "lib/serial.h" #include "lib/compression.h" +#include "lib/channel.h" + #include namespace filedaemon { @@ -1121,12 +1123,37 @@ static inline bool SendPlainData(b_ctx& bctx) DIGEST* signing = bctx.signing_digest; size_t bytes_read{0}; - size_t bytes_send{0}; thread_pool& pool = bctx.jcr->fd_impl->pool; std::optional> update_digest; + // FIXME(ssura): How big should the buffer be ? + auto [in, out] = channel::CreateBufferedChannel< + std::pair, std::size_t>>(5); + + // FIXME(ssura): We have to wrap out in a shared_ptr, because borrow_thread + // needs a copyable function. Once we move to + // move_only_function we can remove this. + std::shared_ptr chunks = std::make_shared(std::move(out)); + std::future> bytes_send_fut = borrow_thread( + pool, [chunks = std::move(chunks), sd]() mutable -> result { + std::size_t accumulated = 0; + for (std::optional pair = chunks->get(); pair.has_value(); + pair = chunks->get()) { + auto [ptr, size] = pair.value(); + + result ret = SendData(sd, ptr.get(), size); + if (ret.holds_error()) { + return ret; + } else { + accumulated += ret.value_unchecked(); + } + } + return accumulated; + }); + + // Read the file data for (;;) { std::shared_ptr Buffer = std::make_shared(); @@ -1194,16 +1221,9 @@ static inline bool SendPlainData(b_ctx& bctx) ? header_size + buf_size /* include bytes_read/offset in size */ : buf_size; - result sendres = SendData(sd, Buffer->addr(), total_length); - - if (auto* error = sendres.error()) { - if (!bctx.jcr->IsJobCanceled()) { - Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str()); - } + if (!in.emplace(std::shared_ptr(Buffer, Buffer->addr()), + total_length)) { goto bail_out; - } else { - bytes_send += sendres.value_unchecked(); /* count bytes saved possibly - compressed/encrypted */ } } @@ -1212,10 +1232,21 @@ static inline bool SendPlainData(b_ctx& bctx) retval = true; bail_out: + in.close(); if (update_digest) { update_digest->get(); } - bctx.jcr->ReadBytes += bytes_read; /* count bytes read */ - bctx.jcr->JobBytes += bytes_send; /* count bytes read */ - sd->msg = bctx.msgsave; /* restore read buffer */ + result sendres = bytes_send_fut.get(); + if (auto* error = sendres.error()) { + if (!bctx.jcr->IsJobCanceled()) { + Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str()); + } + retval = false; + } else { + bctx.jcr->JobBytes + += sendres.value_unchecked(); /* count bytes saved possibly + compressed/encrypted */ + bctx.jcr->ReadBytes += bytes_read; /* count bytes read */ + } + sd->msg = bctx.msgsave; /* restore read buffer */ return retval; }