Skip to content

Commit

Permalink
backup: enable parallel compression
Browse files Browse the repository at this point in the history
Instead of reading the file directly into the send buffer, we now read it
into a separate buffer that is shared with the worker threads.  A worker
thread then fills the send buffer: it either compresses the file data into
it or -- if compression is not enabled -- simply memcpys the data into it.
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent 4910002 commit df2d2da
Showing 1 changed file with 106 additions and 54 deletions.
160 changes: 106 additions & 54 deletions core/src/filed/backup.cc
Expand Up @@ -54,6 +54,7 @@
#include "lib/channel.h"

#include <variant>
#include <cstring>

namespace filedaemon {

Expand Down Expand Up @@ -1107,17 +1108,28 @@ static inline bool SendPlainData(b_ctx& bctx)
// FIXME(ssura): change this
if (BitIsSet(FO_ENCRYPT, flags)) { return SendPlainDataSerially(bctx); }

if (BitIsSet(FO_COMPRESS, flags)) { return SendPlainDataSerially(bctx); }

bool retval = false;
BareosSocket* sd = bctx.jcr->store_bsock;

auto& bfd = bctx.ff_pkt->bfd;

bool support_sparse = BitIsSet(FO_SPARSE, flags);
bool support_offsets = BitIsSet(FO_OFFSETS, flags);
bool need_header = support_sparse || support_offsets;
auto header_size = need_header ? OFFSET_FADDR_SIZE : 0;

// Compression settings copied out of bctx so that they can be captured by
// value in the worker lambda that fills the send buffer.
struct compression_ctx {
// compression stream header taken from bctx.ch; its magic, level and version
// fields are serialized in front of the compressed data
comp_stream_header ch;
// algorithm id (from ff_pkt->Compress_algo); passed to
// RequiredCompressionOutputBufferSize() and ThreadlocalCompress()
uint32_t algorithm;
// compression level (from ff_pkt->Compress_level); passed to
// ThreadlocalCompress()
int level;
};

std::optional<compression_ctx> compctx;
if (BitIsSet(FO_COMPRESS, flags)) {
compctx = compression_ctx{
.ch = bctx.ch,
.algorithm = bctx.ff_pkt->Compress_algo,
.level = bctx.ff_pkt->Compress_level,
};
}

DIGEST* checksum = bctx.digest;
DIGEST* signing = bctx.signing_digest;
Expand All @@ -1130,7 +1142,7 @@ static inline bool SendPlainData(b_ctx& bctx)

// FIXME(ssura): How big should the buffer be ?
auto [in, out] = channel::CreateBufferedChannel<
std::pair<std::shared_ptr<POOLMEM>, std::size_t>>(5);
std::future<result<std::pair<PoolMem, std::size_t>>>>(5);

// FIXME(ssura): We have to wrap out in a shared_ptr, because borrow_thread
// needs a copyable function. Once we move to
Expand All @@ -1139,34 +1151,33 @@ static inline bool SendPlainData(b_ctx& bctx)
std::future<result<std::size_t>> bytes_send_fut = borrow_thread(
pool, [chunks = std::move(chunks), sd]() mutable -> result<size_t> {
std::size_t accumulated = 0;
for (std::optional pair = chunks->get(); pair.has_value();
pair = chunks->get()) {
auto [ptr, size] = pair.value();

result ret = SendData(sd, ptr.get(), size);
if (ret.holds_error()) {
return ret;
} else {
accumulated += ret.value_unchecked();
}
for (std::optional fut = chunks->get(); fut.has_value();
fut = chunks->get()) {
result p = fut->get();
if (p.holds_error()) { return std::move(p.error_unchecked()); }

auto [msg, size] = std::move(p.value_unchecked());
result ret = SendData(sd, msg.addr(), size);
if (ret.holds_error()) { return ret; }

accumulated += ret.value_unchecked();
}
return accumulated;
});


// Read the file data
for (;;) {
std::shared_ptr Buffer = std::make_shared<PoolMem>();
Buffer->check_size(header_size + max_buf_size);
char* header = need_header ? Buffer->c_str() : nullptr;
char* file_data = Buffer->c_str() + header_size;
ssize_t read_bytes = bread(&bfd, file_data, max_buf_size);
std::shared_ptr buf = std::make_shared<std::vector<char>>();
buf->resize(max_buf_size);
ssize_t read_bytes = bread(&bfd, buf->data(), buf->size());

if (read_bytes <= 0) break;

auto offset = bfd.offset;

size_t buf_size = static_cast<size_t>(read_bytes);
ASSERT(static_cast<std::size_t>(read_bytes) <= max_buf_size);
buf->resize(read_bytes);

if (update_digest) {
// updating the digest has to be done serially
Expand All @@ -1178,56 +1189,97 @@ static inline bool SendPlainData(b_ctx& bctx)
bool skip_block = false;

if (support_sparse
&& ((buf_size == max_buf_size
&& (bytes_read + buf_size < (uint64_t)file_size))
&& ((buf->size() == max_buf_size
&& (bytes_read + buf->size() < (uint64_t)file_size))
|| ((file_type == FT_RAW || file_type == FT_FIFO)
&& (file_size == 0)))
&& IsBufZero(file_data, max_buf_size)) {
&& IsBufZero(buf->data(), max_buf_size)) {
skip_block = true;
}

if (!skip_block) {
if (checksum || signing) {
std::shared_ptr<char> aliased(Buffer, file_data);
update_digest.emplace(
enqueue(pool, [checksum, signing, aliased, buf_size]() {
// Update checksum if requested
if (checksum) {
CryptoDigestUpdate(checksum, (uint8_t*)aliased.get(), buf_size);
}
update_digest.emplace(enqueue(pool, [checksum, signing, buf]() {
// Update checksum if requested
if (checksum) {
CryptoDigestUpdate(checksum, (uint8_t*)buf->data(), buf->size());
}

// Update signing digest if requested
if (signing) {
CryptoDigestUpdate(signing, (uint8_t*)aliased.get(), buf_size);
}
}));
// Update signing digest if requested
if (signing) {
CryptoDigestUpdate(signing, (uint8_t*)buf->data(), buf->size());
}
}));
}

std::optional<uint64_t> header;
if (support_sparse) {
ser_declare;
SerBegin(header, OFFSET_FADDR_SIZE);
ser_uint64(bytes_read); /* store fileAddr in begin of buffer */
SerEnd(header, OFFSET_FADDR_SIZE);
header = bytes_read;
} else if (support_offsets) {
ser_declare;
SerBegin(header, OFFSET_FADDR_SIZE);
ser_uint64(offset); /* store offset in begin of buffer */
SerEnd(header, OFFSET_FADDR_SIZE);
header = offset;
}

// Send the buffer to the Storage daemon
size_t total_length
= need_header
? header_size + buf_size /* include bytes_read/offset in size */
: buf_size;
std::future copy_fut = enqueue(
pool,
[header, buf,
compctx]() mutable -> result<std::pair<PoolMem, std::size_t>> {
auto header_size = header ? OFFSET_FADDR_SIZE : 0;
auto data_size = compctx ? RequiredCompressionOutputBufferSize(
compctx->algorithm, buf->size())
: buf->size();
auto total_size = header_size + data_size;
PoolMem msg;
msg.check_size(total_size);
char* header_data = header ? msg.c_str() : nullptr;
char* file_data = msg.c_str() + header_size;
if (header) {
ser_declare;
SerBegin(header_data, OFFSET_FADDR_SIZE);
ser_uint64(header.value()); /* store offset in begin of buffer */
SerEnd(header_data, OFFSET_FADDR_SIZE);
}
if (compctx) {
std::optional comp_size = ThreadlocalCompress(
compctx->algorithm, compctx->level, buf->data(), buf->size(),
file_data + sizeof(comp_stream_header),
data_size - sizeof(comp_stream_header));

if (!comp_size) {
PoolMem error{"compression error"};
return error;
}

if (!in.emplace(std::shared_ptr<POOLMEM>(Buffer, Buffer->addr()),
total_length)) {
goto bail_out;
}
if (*comp_size > std::numeric_limits<std::uint32_t>::max()) {
PoolMem error;
Mmsg(error, "Compressed size to big (%llu > %llu)", *comp_size,
std::numeric_limits<std::uint32_t>::max());
return error;
}

{
// Write compression header
ser_declare;
SerBegin(file_data, sizeof(comp_stream_header));
ser_uint32(compctx->ch.magic);
ser_uint32(*comp_size);
ser_uint16(compctx->ch.level);
ser_uint16(compctx->ch.version);
SerEnd(file_data, sizeof(comp_stream_header));
}

total_size
= header_size + *comp_size + sizeof(comp_stream_header);
} else {
std::memcpy(file_data, buf->data(), buf->size());
}
return std::make_pair(std::move(msg), total_size);
});

// Send the buffer to the Storage daemon
if (!in.emplace(std::move(copy_fut))) { goto bail_out; }
}

bytes_read += buf_size; /* count bytes read */
bytes_read += buf->size(); /* count bytes read */
}
retval = true;

Expand Down

0 comments on commit df2d2da

Please sign in to comment.