Skip to content

Commit

Permalink
compression: add error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura committed Nov 6, 2023
1 parent 7f2c32d commit c20b8b1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 66 deletions.
18 changes: 9 additions & 9 deletions core/src/filed/backup.cc
Expand Up @@ -1201,19 +1201,20 @@ static inline bool SendPlainData(b_ctx& bctx)
SerEnd(header_data, OFFSET_FADDR_SIZE);
}
if (compctx) {
std::optional comp_size = ThreadlocalCompress(
result comp_size = ThreadlocalCompress(
compctx->algorithm, compctx->level, buf->data(), buf->size(),
file_data + sizeof(comp_stream_header),
data_size - sizeof(comp_stream_header));

if (!comp_size) {
PoolMem error{"compression error"};
return error;
if (comp_size.holds_error()) {
return std::move(comp_size.error_unchecked());
}

if (*comp_size > std::numeric_limits<std::uint32_t>::max()) {
auto csize = comp_size.value_unchecked();

if (csize > std::numeric_limits<std::uint32_t>::max()) {
PoolMem error;
Mmsg(error, "Compressed size to big (%llu > %llu)", *comp_size,
Mmsg(error, "Compressed size to big (%llu > %llu)", csize,
std::numeric_limits<std::uint32_t>::max());
return error;
}
Expand All @@ -1223,14 +1224,13 @@ static inline bool SendPlainData(b_ctx& bctx)
ser_declare;
SerBegin(file_data, sizeof(comp_stream_header));
ser_uint32(compctx->ch.magic);
ser_uint32(*comp_size);
ser_uint32(csize);
ser_uint16(compctx->ch.level);
ser_uint16(compctx->ch.version);
SerEnd(file_data, sizeof(comp_stream_header));
}

total_size
= header_size + *comp_size + sizeof(comp_stream_header);
total_size = header_size + csize + sizeof(comp_stream_header);
} else {
std::memcpy(file_data, buf->data(), buf->size());
}
Expand Down
106 changes: 56 additions & 50 deletions core/src/lib/compression.cc
Expand Up @@ -35,6 +35,7 @@
#include "include/streams.h"
#include "lib/edit.h"
#include "lib/serial.h"
#include "lib/compression.h"

#ifdef HAVE_LIBZ
# include <zlib.h>
Expand Down Expand Up @@ -132,7 +133,7 @@ std::size_t RequiredCompressionOutputBufferSize(uint32_t algo,
class z4_compressor {
zfast_stream pZfastStream{};
bool init_error{false};
bool error{false};
std::optional<PoolMem> error{};

public:
z4_compressor(int type, zfast_stream_compressor compressor)
Expand All @@ -144,23 +145,22 @@ class z4_compressor {

if (auto zstat = fastlzlibCompressInit(&pZfastStream, type);
zstat != Z_OK) {
// Jmsg(jcr, M_FATAL, 0, _("Failed to initialize FASTLZ compression\n"));
init_error = true;
error = true;
error.emplace("Failed to initialize FASTLZ compression");
}

if (auto zstat = fastlzlibSetCompressor(&pZfastStream, compressor);
zstat != Z_OK) {
error = true;
error.emplace("Failed to set FASTLZ compressor");
}
}

std::optional<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
result<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
{
if (error) { return std::nullopt; }
if (error) { return PoolMem{error->c_str()}; }

pZfastStream.next_in = (Bytef*)input;
pZfastStream.avail_in = size;
Expand All @@ -169,14 +169,15 @@ class z4_compressor {

if (auto zstat = fastlzlibCompress(&pZfastStream, Z_FINISH);
zstat != Z_STREAM_END) {
// Jmsg(jcr, M_FATAL, 0, _("Compression fastlzlibCompress error: %d\n"),
// zstat);
// jcr->setJobStatusWithPriorityCheck(JS_ErrorTerminated);
return std::nullopt;
PoolMem errmsg;
Mmsg(errmsg, "Compression fastlzlibCompress error: %d\n", zstat);
return errmsg;
}

std::size_t out = pZfastStream.total_out;
if (fastlzlibCompressReset(&pZfastStream)) { error = true; }
if (fastlzlibCompressReset(&pZfastStream)) {
error.emplace("Failed to reset fastzlib");
}

ASSERT(out <= capacity);
return out;
Expand All @@ -194,7 +195,7 @@ class z4_compressor {
class gzip_compressor {
z_stream zlibStream{};
bool init_error{false};
bool error{false};
std::optional<PoolMem> error{};

public:
gzip_compressor()
Expand All @@ -206,7 +207,7 @@ class gzip_compressor {

if (deflateInit(&zlibStream, Z_DEFAULT_COMPRESSION) != Z_OK) {
init_error = true;
error = true;
error.emplace("Failed to initialize zlib.");
}
}

Expand All @@ -216,37 +217,33 @@ class gzip_compressor {

if (auto zstat = deflateParams(&zlibStream, level, Z_DEFAULT_STRATEGY);
zstat != Z_OK) {
error = true;
error.emplace("Failed to set zlib params.");
}

return !error;
}

std::optional<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
result<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
{
zlibStream.next_in = (Bytef*)input;
zlibStream.avail_in = size;
zlibStream.next_out = (Bytef*)output;
zlibStream.avail_out = capacity;

if (auto zstat = deflate(&zlibStream, Z_FINISH); zstat != Z_STREAM_END) {
// Jmsg(jcr, M_FATAL, 0, _("Compression deflate error: %d\n"), zstat);
// jcr->setJobStatusWithPriorityCheck(JS_ErrorTerminated);
error = true;
return std::nullopt;
Mmsg(error.emplace(), "Compression deflate error: %d\n", zstat);
return PoolMem{error->c_str()};
}

std::size_t compress_len = zlibStream.total_out;

// Reset zlib stream to be able to begin from scratch again
if (auto zstat = deflateReset(&zlibStream); zstat != Z_OK) {
// Jmsg(jcr, M_FATAL, 0, _("Compression deflateReset error: %d\n"),
// zstat); jcr->setJobStatusWithPriorityCheck(JS_ErrorTerminated);
error = true;
return std::nullopt;
Mmsg(error.emplace(), "Compression deflateReset error: %d\n", zstat);
return PoolMem{error->c_str()};
}

Dmsg2(400, "GZIP compressed len=%d uncompressed len=%d\n", compress_len,
Expand All @@ -264,53 +261,60 @@ class gzip_compressor {

#ifdef HAVE_LZO
class lzo_compressor {
lzo_voidp lzoMem;
int error{false};
lzo_voidp lzoMem{nullptr};
bool init_error{false};
std::optional<PoolMem> error{};

public:
lzo_compressor()
{
error = lzo_init() != LZO_E_OK;
lzoMem = static_cast<lzo_voidp>(malloc(LZO1X_1_MEM_COMPRESS));
if (lzo_init() != LZO_E_OK) {
init_error = true;
error.emplace("Failed to init lzo.");
} else {
lzoMem = static_cast<lzo_voidp>(malloc(LZO1X_1_MEM_COMPRESS));
}
}

std::optional<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
result<std::size_t> compress(char const* input,
std::size_t size,
char* output,
std::size_t capacity)
{
if (error) return std::nullopt;
if (error) return PoolMem{error->c_str()};
memset(lzoMem, 0, LZO1X_1_MEM_COMPRESS);
lzo_uint len = 0;

// Dmsg3(400, "cbuf=0x%x rbuf=0x%x len=%u\n", cbuf, rbuf, rsize);
Dmsg3(400, "cbuf=0x%x rbuf=0x%x len=%u\n", output, input, size);

int lzores = lzo1x_1_compress(
reinterpret_cast<const unsigned char*>(input), size,
reinterpret_cast<unsigned char*>(output), &len, lzoMem);

if (lzores != LZO_E_OK || len > capacity) {
// This should NEVER happen
// Jmsg(jcr, M_FATAL, 0, _("Compression LZO error: %d\n"), lzores);
// jcr->setJobStatusWithPriorityCheck(JS_ErrorTerminated);
return std::nullopt;
Mmsg(error.emplace(), "Compression LZO error: %d\n", lzores);
return PoolMem{error->c_str()};
}

Dmsg2(400, "LZO compressed len=%d uncompressed len=%d\n", len, size);

return len;
}

~lzo_compressor() { free(lzoMem); }
~lzo_compressor()
{
if (!init_error) { free(lzoMem); }
}
};
#endif

std::optional<std::size_t> ThreadlocalCompress(uint32_t algo,
uint32_t level,
char const* input,
std::size_t size,
char* output,
std::size_t capacity)
result<std::size_t> ThreadlocalCompress(uint32_t algo,
uint32_t level,
char const* input,
std::size_t size,
char* output,
std::size_t capacity)
{
switch (algo) {
#ifdef HAVE_LIBZ
Expand Down Expand Up @@ -340,7 +344,9 @@ std::optional<std::size_t> ThreadlocalCompress(uint32_t algo,
} break;
}

return std::nullopt;
PoolMem errmsg;
Mmsg(errmsg, "Unknown compression algorithm: %d", algo);
return errmsg;
}

bool SetupCompressionBuffers(JobControlRecord* jcr,
Expand Down
14 changes: 7 additions & 7 deletions core/src/lib/compression.h
Expand Up @@ -21,7 +21,7 @@
#ifndef BAREOS_LIB_COMPRESSION_H_
#define BAREOS_LIB_COMPRESSION_H_

#include <optional>
#include "lib/util.h"

const char* cmprs_algo_to_text(uint32_t compression_algorithm);

Expand All @@ -34,12 +34,12 @@ bool SetupDecompressionBuffers(JobControlRecord* jcr,

// return the number of bytes written to the output on success
// or std::nullopt on error
std::optional<std::size_t> ThreadlocalCompress(uint32_t algo,
uint32_t level,
char const* input,
std::size_t size,
char* output,
std::size_t capacity);
result<std::size_t> ThreadlocalCompress(uint32_t algo,
uint32_t level,
char const* input,
std::size_t size,
char* output,
std::size_t capacity);

std::size_t RequiredCompressionOutputBufferSize(uint32_t algo,
std::size_t max_input_size);
Expand Down

0 comments on commit c20b8b1

Please sign in to comment.