Skip to content

Commit

Permalink
backup: do not use preallocated buffers from bctx
Browse files Browse the repository at this point in the history
When we want to to this in parallel we clearly cannot use the buffers
in bctx but have to allocate our own.  As preperation for that we do
that even now in serial mode.
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent 0c1b04f commit 76eb082
Showing 1 changed file with 37 additions and 19 deletions.
56 changes: 37 additions & 19 deletions core/src/filed/backup.cc
Expand Up @@ -1098,53 +1098,67 @@ static inline bool SendPlainData(b_ctx& bctx)

auto& bfd = bctx.ff_pkt->bfd;

bool support_sparse = BitIsSet(FO_SPARSE, flags);
bool support_offsets = BitIsSet(FO_OFFSETS, flags);
bool need_header = support_sparse || support_offsets;
auto header_size = need_header ? OFFSET_FADDR_SIZE : 0;

DIGEST* checksum = bctx.digest;
DIGEST* signing = bctx.signing_digest;

// Read the file data
while ((sd->message_length = (uint32_t)bread(&bfd, bctx.rbuf, bctx.rsize))
> 0) {
for (;;) {
PoolMem Buffer;
Buffer.check_size(header_size + max_buf_size);
char* header = need_header ? Buffer.c_str() : nullptr;
char* file_data = Buffer.c_str() + header_size;
ssize_t read_bytes = bread(&bfd, file_data, max_buf_size);

if (read_bytes <= 0) break;

auto offset = bfd.offset;

size_t buf_size = static_cast<size_t>(sd->message_length);
size_t buf_size = static_cast<size_t>(read_bytes);

// Check for sparse blocks
if (BitIsSet(FO_SPARSE, flags)) {
bool allZeros;
ser_declare;
if (support_sparse) {
bool allZeros = false;

allZeros = false;
if ((buf_size == max_buf_size
&& (file_addr + buf_size < (uint64_t)file_size))
|| ((file_type == FT_RAW || file_type == FT_FIFO)
&& (file_size == 0))) {
allZeros = IsBufZero(bctx.rbuf, bctx.rsize);
allZeros = IsBufZero(DataStart, max_buf_size);
}

if (!allZeros) {
// Put file address as first data in buffer
SerBegin(bctx.wbuf, OFFSET_FADDR_SIZE);
ser_declare;
SerBegin(Header, OFFSET_FADDR_SIZE);
ser_uint64(file_addr); /* store fileAddr in begin of buffer */
SerEnd(Header, OFFSET_FADDR_SIZE);
}

file_addr += buf_size; /* update file address */

// Skip block of all zeros
if (allZeros) { continue; }
} else if (BitIsSet(FO_OFFSETS, flags)) {
} else if (support_offsets) {
ser_declare;
SerBegin(bctx.wbuf, OFFSET_FADDR_SIZE);
SerBegin(header, OFFSET_FADDR_SIZE);
ser_uint64(offset); /* store offset in begin of buffer */
SerEnd(header, OFFSET_FADDR_SIZE);
}

bctx.jcr->ReadBytes += buf_size; /* count bytes read */

// Update checksum if requested
if (bctx.digest) {
CryptoDigestUpdate(bctx.digest, (uint8_t*)bctx.rbuf, buf_size);
if (checksum) {
CryptoDigestUpdate(checksum, (uint8_t*)file_data, buf_size);
}

// Update signing digest if requested
if (bctx.signing_digest) {
CryptoDigestUpdate(bctx.signing_digest, (uint8_t*)bctx.rbuf, buf_size);
}
if (signing) { CryptoDigestUpdate(signing, (uint8_t*)file_data, buf_size); }

// Compress the data.
// if (BitIsSet(FO_COMPRESS, flags)) {
Expand Down Expand Up @@ -1176,10 +1190,14 @@ static inline bool SendPlainData(b_ctx& bctx)
// }

// Send the buffer to the Storage daemon
if (BitIsSet(FO_SPARSE, flags) || BitIsSet(FO_OFFSETS, flags)) {
sd->message_length += OFFSET_FADDR_SIZE; /* include fileAddr in size */
if (need_header) {
sd->message_length
= header_size + buf_size; /* include file_addr/offset in size */
} else {
sd->message_length = buf_size;
}
sd->msg = bctx.wbuf; /* set correct write buffer */

sd->msg = Buffer.addr(); /* set correct write buffer */

if (!sd->send()) {
if (!bctx.jcr->IsJobCanceled()) {
Expand Down

0 comments on commit 76eb082

Please sign in to comment.