Skip to content

Commit

Permalink
backup: factor out functions
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent e41ee62 commit f1551fe
Showing 1 changed file with 106 additions and 90 deletions.
196 changes: 106 additions & 90 deletions core/src/filed/backup.cc
Expand Up @@ -1139,6 +1139,98 @@ class data_message {
}
};

using shared_message = std::shared_ptr<data_message>;

static std::future<result<std::size_t>> MakeSendThread(
thread_pool& pool,
BareosSocket* sd,
channel::output<std::future<result<shared_message>>> out)
{
std::promise<result<std::size_t>> promise;
std::future fut = promise.get_future();

pool.borrow_thread(
[prom = std::move(promise), out = std::move(out), sd]() mutable {
std::size_t accumulated = 0;
for (;;) {
std::optional fut = out.get();
if (!fut) { break; }
result p = fut->get();
if (p.holds_error()) {
prom.set_value(std::move(p.error_unchecked()));
return;
}

auto& val = p.value_unchecked();
auto size = val->message_size();
POOLMEM* msg = val->as_socket_message();

// technically we are overwriting part of message here
// but its only the "size" field of the message, which is not
// read/written to otherwise after making it a shared_message.
result ret = SendData(sd, msg, size);
if (ret.holds_error()) {
prom.set_value(std::move(ret.error_unchecked()));
return;
}

accumulated += ret.value_unchecked();
}
prom.set_value(accumulated);
});
return fut;
}

struct compression_context {
comp_stream_header ch;
uint32_t algorithm;
int level;
};

static result<shared_message> DoCompressMessage(compression_context& compctx,
const data_message& input)
{
auto data_size = RequiredCompressionOutputBufferSize(compctx.algorithm,
input.data_size());

auto msg = input.derived();
msg.resize(data_size);
result comp_size = ThreadlocalCompress(
compctx.algorithm, compctx.level, input.data_ptr(), input.data_size(),
msg.data_ptr() + sizeof(comp_stream_header),
msg.data_size() - sizeof(comp_stream_header));

if (comp_size.holds_error()) {
return std::move(comp_size.error_unchecked());
}

auto csize = comp_size.value_unchecked();

if (csize > std::numeric_limits<std::uint32_t>::max()) {
PoolMem error;
Mmsg(error, "Compressed size to big (%llu > %llu)", csize,
std::numeric_limits<std::uint32_t>::max());
return error;
}

{
// Write compression header
ser_declare;
SerBegin(msg.data_ptr(), sizeof(comp_stream_header));
ser_uint32(compctx.ch.magic);
ser_uint32(csize);
ser_uint16(compctx.ch.level);
ser_uint16(compctx.ch.version);
SerEnd(msg.data_ptr(), sizeof(comp_stream_header));
}

auto total_size = csize + sizeof(comp_stream_header);
ASSERT(total_size <= msg.data_size());
msg.resize(total_size);

return shared_message{new data_message{std::move(msg)}};
}

// Send the content of a file on anything but an EFS filesystem.
static inline bool SendPlainData(b_ctx& bctx)
{
Expand Down Expand Up @@ -1171,15 +1263,9 @@ static inline bool SendPlainData(b_ctx& bctx)
bool support_sparse = BitIsSet(FO_SPARSE, flags);
bool support_offsets = BitIsSet(FO_OFFSETS, flags);

struct compression_ctx {
comp_stream_header ch;
uint32_t algorithm;
int level;
};

std::optional<compression_ctx> compctx;
std::optional<compression_context> compctx;
if (BitIsSet(FO_COMPRESS, flags)) {
compctx = compression_ctx{
compctx = compression_context{
.ch = bctx.ch,
.algorithm = bctx.ff_pkt->Compress_algo,
.level = bctx.ff_pkt->Compress_level,
Expand All @@ -1201,43 +1287,11 @@ static inline bool SendPlainData(b_ctx& bctx)
compute_fin.notify_one();
});

using shared_message = std::shared_ptr<const data_message>;

// FIXME(ssura): How big should the buffer be ?
auto [in, out]
= channel::CreateBufferedChannel<std::future<result<shared_message>>>(
num_workers);

std::promise<result<std::size_t>> promise{};
std::future bytes_send_fut = promise.get_future();
threadpool.borrow_thread(
[prom = std::move(promise), out = std::move(out), sd]() mutable {
std::size_t accumulated = 0;
for (std::optional fut = out.get(); fut.has_value(); fut = out.get()) {
result p = fut->get();
if (p.holds_error()) {
prom.set_value(std::move(p.error_unchecked()));
return;
}

auto& val = p.value_unchecked();
auto size = val->message_size();
POOLMEM* msg = val->as_socket_message();

// technically we are overwriting part of message here
// but its only the "size" field of the message, which is not
// read/written to otherwise after making it a shared_message.
result ret = SendData(sd, msg, size);
if (ret.holds_error()) {
prom.set_value(std::move(ret));
return;
}

accumulated += ret.value_unchecked();
}
prom.set_value(accumulated);
return;
});
std::future bytes_send_fut = MakeSendThread(threadpool, sd, std::move(out));

DIGEST* checksum = bctx.digest;
DIGEST* signing = bctx.signing_digest;
Expand Down Expand Up @@ -1318,55 +1372,17 @@ static inline bool SendPlainData(b_ctx& bctx)
}));
}

std::future copy_fut
= (compctx) ? compute_group.submit(
[input = shared_msg,
compctx = compctx.value()]() mutable -> result<shared_message> {
auto data_size = RequiredCompressionOutputBufferSize(
compctx.algorithm, input->data_size());
auto msg = input->derived();
msg.resize(data_size);
result comp_size = ThreadlocalCompress(
compctx.algorithm, compctx.level, input->data_ptr(),
input->data_size(),
msg.data_ptr() + sizeof(comp_stream_header),
msg.data_size() - sizeof(comp_stream_header));

if (comp_size.holds_error()) {
return std::move(comp_size.error_unchecked());
}

auto csize = comp_size.value_unchecked();

if (csize > std::numeric_limits<std::uint32_t>::max()) {
PoolMem error;
Mmsg(error, "Compressed size to big (%llu > %llu)", csize,
std::numeric_limits<std::uint32_t>::max());
return error;
}

{
// Write compression header
ser_declare;
SerBegin(msg.data_ptr(), sizeof(comp_stream_header));
ser_uint32(compctx.ch.magic);
ser_uint32(csize);
ser_uint16(compctx.ch.level);
ser_uint16(compctx.ch.version);
SerEnd(msg.data_ptr(), sizeof(comp_stream_header));
}

auto total_size = csize + sizeof(comp_stream_header);
ASSERT(total_size <= msg.data_size());
msg.resize(total_size);

return shared_message{new data_message{std::move(msg)}};
})
: [shared_msg]() {
std::promise<result<shared_message>> prom;
prom.set_value(std::move(shared_msg));
return prom.get_future();
}();
std::future<result<shared_message>> copy_fut;
if (compctx) {
copy_fut = compute_group.submit(
[cctx = compctx.value(), shared_msg]() mutable {
return DoCompressMessage(cctx, *shared_msg.get());
});
} else {
std::promise<result<shared_message>> prom;
prom.set_value(std::move(shared_msg));
copy_fut = prom.get_future();
}

// Send the buffer to the Storage daemon
if (!in.emplace(std::move(copy_fut))) { goto bail_out; }
Expand Down

0 comments on commit f1551fe

Please sign in to comment.