Skip to content

Commit

Permalink
backup: some further refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent 76eb082 commit ea08159
Showing 1 changed file with 55 additions and 71 deletions.
126 changes: 55 additions & 71 deletions core/src/filed/backup.cc
Expand Up @@ -1073,6 +1073,23 @@ static inline bool SendPlainDataSerially(b_ctx& bctx)
return retval;
}

static result<std::size_t> SendData(BareosSocket* sd,
POOLMEM* data,
size_t size)
{
sd->message_length = size;
sd->msg = data; /* set correct write buffer */

if (!sd->send()) {
PoolMem error;
Mmsg(error, "Network send error to SD. ERR=%s", sd->bstrerror());
return error;
}

Dmsg1(130, "Send data to SD len=%d\n", sd->message_length);
return size;
}

// Send the content of a file on anything but an EFS filesystem.
static inline bool SendPlainData(b_ctx& bctx)
{
Expand All @@ -1086,16 +1103,13 @@ static inline bool SendPlainData(b_ctx& bctx)
// parallel sending/checksumming/compression/etc.
// This is mostly because EncryptData() is weird!
// FIXME(ssura): change this
if (BitIsSet(FO_ENCRYPT, flags) || BitIsSet(FO_COMPRESS, flags)
|| file_size < static_cast<ssize_t>(2 * max_buf_size)) {
return SendPlainDataSerially(bctx);
}
if (BitIsSet(FO_ENCRYPT, flags)) { return SendPlainDataSerially(bctx); }

if (BitIsSet(FO_COMPRESS, flags)) { return SendPlainDataSerially(bctx); }

bool retval = false;
BareosSocket* sd = bctx.jcr->store_bsock;

auto file_addr = bctx.fileAddr;

auto& bfd = bctx.ff_pkt->bfd;

bool support_sparse = BitIsSet(FO_SPARSE, flags);
Expand All @@ -1106,7 +1120,10 @@ static inline bool SendPlainData(b_ctx& bctx)
DIGEST* checksum = bctx.digest;
DIGEST* signing = bctx.signing_digest;

size_t bytes_read{0};
size_t bytes_send{0};
// Read the file data

for (;;) {
PoolMem Buffer;
Buffer.check_size(header_size + max_buf_size);
Expand All @@ -1120,101 +1137,68 @@ static inline bool SendPlainData(b_ctx& bctx)

size_t buf_size = static_cast<size_t>(read_bytes);

// Update checksum if requested
if (checksum) {
CryptoDigestUpdate(checksum, (uint8_t*)file_data, buf_size);
}

// Update signing digest if requested
if (signing) { CryptoDigestUpdate(signing, (uint8_t*)file_data, buf_size); }


// Check for sparse blocks
if (support_sparse) {
bool allZeros = false;

if ((buf_size == max_buf_size
&& (file_addr + buf_size < (uint64_t)file_size))
&& (bytes_read + buf_size < (uint64_t)file_size))
|| ((file_type == FT_RAW || file_type == FT_FIFO)
&& (file_size == 0))) {
allZeros = IsBufZero(DataStart, max_buf_size);
allZeros = IsBufZero(file_data, max_buf_size);
}

if (!allZeros) {
// Put file address as first data in buffer
ser_declare;
SerBegin(Header, OFFSET_FADDR_SIZE);
ser_uint64(file_addr); /* store fileAddr in begin of buffer */
SerEnd(Header, OFFSET_FADDR_SIZE);
}

file_addr += buf_size; /* update file address */

// Skip block of all zeros
if (allZeros) { continue; }

// Put file address as first data in buffer
ser_declare;
SerBegin(header, OFFSET_FADDR_SIZE);
ser_uint64(bytes_read); /* store fileAddr in begin of buffer */
SerEnd(header, OFFSET_FADDR_SIZE);
} else if (support_offsets) {
ser_declare;
SerBegin(header, OFFSET_FADDR_SIZE);
ser_uint64(offset); /* store offset in begin of buffer */
SerEnd(header, OFFSET_FADDR_SIZE);
}

bctx.jcr->ReadBytes += buf_size; /* count bytes read */

// Update checksum if requested
if (checksum) {
CryptoDigestUpdate(checksum, (uint8_t*)file_data, buf_size);
}

// Update signing digest if requested
if (signing) { CryptoDigestUpdate(signing, (uint8_t*)file_data, buf_size); }

// Compress the data.
// if (BitIsSet(FO_COMPRESS, flags)) {
// if (!CompressData(bctx->jcr, bctx->ff_pkt->Compress_algo, bctx->rbuf,
// bctx->jcr->store_bsock->message_length, bctx->cbuf,
// bctx->max_compress_len, &bctx->compress_len)) {
// goto bail_out;
// }

// // See if we need to generate a compression header.
// if (bctx->chead) {
// ser_declare;

// // Complete header
// SerBegin(bctx->chead, sizeof(comp_stream_header));
// ser_uint32(bctx->ch.magic);
// ser_uint32(bctx->compress_len);
// ser_uint16(bctx->ch.level);
// ser_uint16(bctx->ch.version);
// SerEnd(bctx->chead, sizeof(comp_stream_header));

// bctx->compress_len += sizeof(comp_stream_header); /* add size of header
// */
// }

// bctx->jcr->store_bsock->message_length
// = bctx->compress_len; /* set compressed length */
// bctx->cipher_input_len = bctx->compress_len;
// }

// Send the buffer to the Storage daemon
if (need_header) {
sd->message_length
= header_size + buf_size; /* include file_addr/offset in size */
} else {
sd->message_length = buf_size;
}
size_t total_length
= (need_header)
? header_size + buf_size /* include bytes_read/offset in size */
: buf_size;

sd->msg = Buffer.addr(); /* set correct write buffer */
result sendres = SendData(sd, Buffer.addr(), total_length);

if (!sd->send()) {
if (auto* error = sendres.error()) {
if (!bctx.jcr->IsJobCanceled()) {
Jmsg1(bctx.jcr, M_FATAL, 0, _("Network send error to SD. ERR=%s\n"),
sd->bstrerror());
Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str());
}
goto bail_out;
} else {
bytes_send
+= sendres.value_unchecked(); /* count bytes saved possibly
compressed/encrypted */
}

Dmsg1(130, "Send data to SD len=%d\n", sd->message_length);
bctx.jcr->JobBytes += sd->message_length; /* count bytes saved possibly
compressed/encrypted */
bytes_read += buf_size; /* count bytes read */
}
retval = true;

bail_out:
sd->msg = bctx.msgsave; /* restore read buffer */
bctx.jcr->ReadBytes += bytes_read; /* count bytes read */
bctx.jcr->JobBytes += bytes_send; /* count bytes read */
sd->msg = bctx.msgsave; /* restore read buffer */
return retval;
}

Expand Down

0 comments on commit ea08159

Please sign in to comment.