Skip to content

Commit

Permalink
backup: send on second thread
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent f724c84 commit 4910002
Showing 1 changed file with 44 additions and 13 deletions.
57 changes: 44 additions & 13 deletions core/src/filed/backup.cc
Expand Up @@ -51,6 +51,8 @@
#include "lib/serial.h"
#include "lib/compression.h"

#include "lib/channel.h"

#include <variant>

namespace filedaemon {
Expand Down Expand Up @@ -1121,12 +1123,37 @@ static inline bool SendPlainData(b_ctx& bctx)
DIGEST* signing = bctx.signing_digest;

size_t bytes_read{0};
size_t bytes_send{0};

thread_pool& pool = bctx.jcr->fd_impl->pool;

std::optional<std::future<void>> update_digest;

// FIXME(ssura): How big should the buffer be ?
auto [in, out] = channel::CreateBufferedChannel<
std::pair<std::shared_ptr<POOLMEM>, std::size_t>>(5);

// FIXME(ssura): We have to wrap out in a shared_ptr, because borrow_thread
// needs a copyable function. Once we move to
// move_only_function we can remove this.
std::shared_ptr chunks = std::make_shared<decltype(out)>(std::move(out));
std::future<result<std::size_t>> bytes_send_fut = borrow_thread(
pool, [chunks = std::move(chunks), sd]() mutable -> result<size_t> {
std::size_t accumulated = 0;
for (std::optional pair = chunks->get(); pair.has_value();
pair = chunks->get()) {
auto [ptr, size] = pair.value();

result ret = SendData(sd, ptr.get(), size);
if (ret.holds_error()) {
return ret;
} else {
accumulated += ret.value_unchecked();
}
}
return accumulated;
});


// Read the file data
for (;;) {
std::shared_ptr Buffer = std::make_shared<PoolMem>();
Expand Down Expand Up @@ -1194,16 +1221,9 @@ static inline bool SendPlainData(b_ctx& bctx)
? header_size + buf_size /* include bytes_read/offset in size */
: buf_size;

result sendres = SendData(sd, Buffer->addr(), total_length);

if (auto* error = sendres.error()) {
if (!bctx.jcr->IsJobCanceled()) {
Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str());
}
if (!in.emplace(std::shared_ptr<POOLMEM>(Buffer, Buffer->addr()),
total_length)) {
goto bail_out;
} else {
bytes_send += sendres.value_unchecked(); /* count bytes saved possibly
compressed/encrypted */
}
}

Expand All @@ -1212,10 +1232,21 @@ static inline bool SendPlainData(b_ctx& bctx)
retval = true;

bail_out:
in.close();
if (update_digest) { update_digest->get(); }
bctx.jcr->ReadBytes += bytes_read; /* count bytes read */
bctx.jcr->JobBytes += bytes_send; /* count bytes read */
sd->msg = bctx.msgsave; /* restore read buffer */
result sendres = bytes_send_fut.get();
if (auto* error = sendres.error()) {
if (!bctx.jcr->IsJobCanceled()) {
Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str());
}
retval = false;
} else {
bctx.jcr->JobBytes
+= sendres.value_unchecked(); /* count bytes saved possibly
compressed/encrypted */
bctx.jcr->ReadBytes += bytes_read; /* count bytes read */
}
sd->msg = bctx.msgsave; /* restore read buffer */
return retval;
}

Expand Down

0 comments on commit 4910002

Please sign in to comment.