Skip to content

Commit

Permalink
backup: merge SendDataToSd into SendPlainData
Browse files Browse the repository at this point in the history
This should make it easier to implement parallelisation.
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent 26aed57 commit 0c1b04f
Showing 1 changed file with 99 additions and 4 deletions.
103 changes: 99 additions & 4 deletions core/src/filed/backup.cc
Expand Up @@ -1078,30 +1078,125 @@ static inline bool SendPlainData(b_ctx& bctx)
{
std::size_t max_buf_size = bctx.rsize;

auto file_type = bctx.ff_pkt->type;
auto file_size = bctx.ff_pkt->statp.st_size;
auto* flags = bctx.ff_pkt->flags;

// Currently we do not support encryption while doing
// parallel sending/checksumming/compression/etc.
// This is mostly because EncryptData() is weird!
// FIXME(ssura): change this
if (BitIsSet(FO_ENCRYPT, flags)
if (BitIsSet(FO_ENCRYPT, flags) || BitIsSet(FO_COMPRESS, flags)
|| file_size < static_cast<ssize_t>(2 * max_buf_size)) {
return SendPlainDataSerially(bctx);
}

bool retval = false;
BareosSocket* sd = bctx.jcr->store_bsock;

auto file_addr = bctx.fileAddr;

auto& bfd = bctx.ff_pkt->bfd;

// Read the file data
while ((sd->message_length
= (uint32_t)bread(&bctx.ff_pkt->bfd, bctx.rbuf, bctx.rsize))
while ((sd->message_length = (uint32_t)bread(&bfd, bctx.rbuf, bctx.rsize))
> 0) {
if (!SendDataToSd(&bctx)) { goto bail_out; }
auto offset = bfd.offset;

size_t buf_size = static_cast<size_t>(sd->message_length);

// Check for sparse blocks
if (BitIsSet(FO_SPARSE, flags)) {
bool allZeros;
ser_declare;

allZeros = false;
if ((buf_size == max_buf_size
&& (file_addr + buf_size < (uint64_t)file_size))
|| ((file_type == FT_RAW || file_type == FT_FIFO)
&& (file_size == 0))) {
allZeros = IsBufZero(bctx.rbuf, bctx.rsize);
}

if (!allZeros) {
// Put file address as first data in buffer
SerBegin(bctx.wbuf, OFFSET_FADDR_SIZE);
ser_uint64(file_addr); /* store fileAddr in begin of buffer */
}

file_addr += buf_size; /* update file address */

// Skip block of all zeros
if (allZeros) { continue; }
} else if (BitIsSet(FO_OFFSETS, flags)) {
ser_declare;
SerBegin(bctx.wbuf, OFFSET_FADDR_SIZE);
ser_uint64(offset); /* store offset in begin of buffer */
}

bctx.jcr->ReadBytes += buf_size; /* count bytes read */

// Update checksum if requested
if (bctx.digest) {
CryptoDigestUpdate(bctx.digest, (uint8_t*)bctx.rbuf, buf_size);
}

// Update signing digest if requested
if (bctx.signing_digest) {
CryptoDigestUpdate(bctx.signing_digest, (uint8_t*)bctx.rbuf, buf_size);
}

// Compress the data.
// if (BitIsSet(FO_COMPRESS, flags)) {
// if (!CompressData(bctx->jcr, bctx->ff_pkt->Compress_algo, bctx->rbuf,
// bctx->jcr->store_bsock->message_length, bctx->cbuf,
// bctx->max_compress_len, &bctx->compress_len)) {
// goto bail_out;
// }

// // See if we need to generate a compression header.
// if (bctx->chead) {
// ser_declare;

// // Complete header
// SerBegin(bctx->chead, sizeof(comp_stream_header));
// ser_uint32(bctx->ch.magic);
// ser_uint32(bctx->compress_len);
// ser_uint16(bctx->ch.level);
// ser_uint16(bctx->ch.version);
// SerEnd(bctx->chead, sizeof(comp_stream_header));

// bctx->compress_len += sizeof(comp_stream_header); /* add size of header
// */
// }

// bctx->jcr->store_bsock->message_length
// = bctx->compress_len; /* set compressed length */
// bctx->cipher_input_len = bctx->compress_len;
// }

// Send the buffer to the Storage daemon
if (BitIsSet(FO_SPARSE, flags) || BitIsSet(FO_OFFSETS, flags)) {
sd->message_length += OFFSET_FADDR_SIZE; /* include fileAddr in size */
}
sd->msg = bctx.wbuf; /* set correct write buffer */

if (!sd->send()) {
if (!bctx.jcr->IsJobCanceled()) {
Jmsg1(bctx.jcr, M_FATAL, 0, _("Network send error to SD. ERR=%s\n"),
sd->bstrerror());
}
goto bail_out;
}

Dmsg1(130, "Send data to SD len=%d\n", sd->message_length);
bctx.jcr->JobBytes += sd->message_length; /* count bytes saved possibly
compressed/encrypted */
}
retval = true;

bail_out:
sd->msg = bctx.msgsave; /* restore read buffer */
return retval;
}

Expand Down

0 comments on commit 0c1b04f

Please sign in to comment.