diff --git a/core/src/filed/backup.cc b/core/src/filed/backup.cc index 3381484fbe2..1023e09bdae 100644 --- a/core/src/filed/backup.cc +++ b/core/src/filed/backup.cc @@ -1073,6 +1073,23 @@ static inline bool SendPlainDataSerially(b_ctx& bctx) return retval; } +static result SendData(BareosSocket* sd, + POOLMEM* data, + size_t size) +{ + sd->message_length = size; + sd->msg = data; /* set correct write buffer */ + + if (!sd->send()) { + PoolMem error; + Mmsg(error, "Network send error to SD. ERR=%s", sd->bstrerror()); + return error; + } + + Dmsg1(130, "Send data to SD len=%d\n", sd->message_length); + return size; +} + // Send the content of a file on anything but an EFS filesystem. static inline bool SendPlainData(b_ctx& bctx) { @@ -1086,16 +1103,13 @@ static inline bool SendPlainData(b_ctx& bctx) // parallel sending/checksumming/compression/etc. // This is mostly because EncryptData() is weird! // FIXME(ssura): change this - if (BitIsSet(FO_ENCRYPT, flags) || BitIsSet(FO_COMPRESS, flags) - || file_size < static_cast(2 * max_buf_size)) { - return SendPlainDataSerially(bctx); - } + if (BitIsSet(FO_ENCRYPT, flags)) { return SendPlainDataSerially(bctx); } + + if (BitIsSet(FO_COMPRESS, flags)) { return SendPlainDataSerially(bctx); } bool retval = false; BareosSocket* sd = bctx.jcr->store_bsock; - auto file_addr = bctx.fileAddr; - auto& bfd = bctx.ff_pkt->bfd; bool support_sparse = BitIsSet(FO_SPARSE, flags); @@ -1106,7 +1120,10 @@ static inline bool SendPlainData(b_ctx& bctx) DIGEST* checksum = bctx.digest; DIGEST* signing = bctx.signing_digest; + size_t bytes_read{0}; + size_t bytes_send{0}; // Read the file data + for (;;) { PoolMem Buffer; Buffer.check_size(header_size + max_buf_size); @@ -1120,29 +1137,34 @@ static inline bool SendPlainData(b_ctx& bctx) size_t buf_size = static_cast(read_bytes); + // Update checksum if requested + if (checksum) { + CryptoDigestUpdate(checksum, (uint8_t*)file_data, buf_size); + } + + // Update signing digest if requested + if (signing) { CryptoDigestUpdate(signing, (uint8_t*)file_data, buf_size); } + + // Check for sparse blocks if (support_sparse) { bool allZeros = false; if ((buf_size == max_buf_size - && (file_addr + buf_size < (uint64_t)file_size)) + && (bytes_read + buf_size < (uint64_t)file_size)) || ((file_type == FT_RAW || file_type == FT_FIFO) && (file_size == 0))) { - allZeros = IsBufZero(DataStart, max_buf_size); + allZeros = IsBufZero(file_data, max_buf_size); } - if (!allZeros) { - // Put file address as first data in buffer - ser_declare; - SerBegin(Header, OFFSET_FADDR_SIZE); - ser_uint64(file_addr); /* store fileAddr in begin of buffer */ - SerEnd(Header, OFFSET_FADDR_SIZE); - } - - file_addr += buf_size; /* update file address */ - // Skip block of all zeros if (allZeros) { continue; } + + // Put file address as first data in buffer + ser_declare; + SerBegin(header, OFFSET_FADDR_SIZE); + ser_uint64(bytes_read); /* store fileAddr in begin of buffer */ + SerEnd(header, OFFSET_FADDR_SIZE); } else if (support_offsets) { ser_declare; SerBegin(header, OFFSET_FADDR_SIZE); @@ -1150,71 +1172,33 @@ static inline bool SendPlainData(b_ctx& bctx) SerEnd(header, OFFSET_FADDR_SIZE); } - bctx.jcr->ReadBytes += buf_size; /* count bytes read */ - - // Update checksum if requested - if (checksum) { - CryptoDigestUpdate(checksum, (uint8_t*)file_data, buf_size); - } - - // Update signing digest if requested - if (signing) { CryptoDigestUpdate(signing, (uint8_t*)file_data, buf_size); } - - // Compress the data. - // if (BitIsSet(FO_COMPRESS, flags)) { - // if (!CompressData(bctx->jcr, bctx->ff_pkt->Compress_algo, bctx->rbuf, - // bctx->jcr->store_bsock->message_length, bctx->cbuf, - // bctx->max_compress_len, &bctx->compress_len)) { - // goto bail_out; - // } - - // // See if we need to generate a compression header. - // if (bctx->chead) { - // ser_declare; - - // // Complete header - // SerBegin(bctx->chead, sizeof(comp_stream_header)); - // ser_uint32(bctx->ch.magic); - // ser_uint32(bctx->compress_len); - // ser_uint16(bctx->ch.level); - // ser_uint16(bctx->ch.version); - // SerEnd(bctx->chead, sizeof(comp_stream_header)); - - // bctx->compress_len += sizeof(comp_stream_header); /* add size of header - // */ - // } - - // bctx->jcr->store_bsock->message_length - // = bctx->compress_len; /* set compressed length */ - // bctx->cipher_input_len = bctx->compress_len; - // } - // Send the buffer to the Storage daemon - if (need_header) { - sd->message_length - = header_size + buf_size; /* include file_addr/offset in size */ - } else { - sd->message_length = buf_size; - } + size_t total_length + = (need_header) + ? header_size + buf_size /* include bytes_read/offset in size */ + : buf_size; - sd->msg = Buffer.addr(); /* set correct write buffer */ + result sendres = SendData(sd, Buffer.addr(), total_length); - if (!sd->send()) { + if (auto* error = sendres.error()) { if (!bctx.jcr->IsJobCanceled()) { - Jmsg1(bctx.jcr, M_FATAL, 0, _("Network send error to SD. ERR=%s\n"), - sd->bstrerror()); + Jmsg1(bctx.jcr, M_FATAL, 0, "%s\n", error->c_str()); } goto bail_out; + } else { + bytes_send + += sendres.value_unchecked(); /* count bytes saved possibly + compressed/encrypted */ } - Dmsg1(130, "Send data to SD len=%d\n", sd->message_length); - bctx.jcr->JobBytes += sd->message_length; /* count bytes saved possibly - compressed/encrypted */ + bytes_read += buf_size; /* count bytes read */ } retval = true; bail_out: - sd->msg = bctx.msgsave; /* restore read buffer */ + bctx.jcr->ReadBytes += bytes_read; /* count bytes read */ + bctx.jcr->JobBytes += bytes_send; /* count bytes read */ + sd->msg = bctx.msgsave; /* restore read buffer */ return retval; }