Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow explicitly set compression level in output format #58539

Merged
merged 7 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/en/operations/settings/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -4771,6 +4771,24 @@ Type: Int64

Default: 0


## output_format_compression_level

Default compression level if query output is compressed. The setting is applied when `SELECT` query has `INTO OUTFILE` or when writing to table functions `file`, `url`, `hdfs`, `s3`, or `azureBlobStorage`.

Possible values: from `1` to `22`

Default: `3`


## output_format_compression_zstd_window_log

To be used when output compression method is `zstd`. If greater than `0`, this setting explicitly sets compression window size (expressed as power of `2`) and enable long-range mode for zstd compression. This can help to achieve better compression ratio.
canhld94 marked this conversation as resolved.
Show resolved Hide resolved

Possible values: non-negative numbers. Note that if the value is too small or too big, `zstdlib` will throw exception. Typical values are from `20` (window size = `1MB`) to `30` (window size = `1GB`).
canhld94 marked this conversation as resolved.
Show resolved Hide resolved

Default: `0`

## rewrite_count_distinct_if_with_count_distinct_implementation

Allows you to rewrite `countDistcintIf` with [count_distinct_implementation](#count_distinct_implementation) setting.
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ class IColumn;
M(Bool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
M(UInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
M(Bool, output_format_parallel_formatting, true, "Enable parallel formatting for some data formats.", 0) \
M(UInt64, output_format_compression_level, 3, "Default compression level if query output is compressed. The setting is applied when `SELECT` query has `INTO OUTFILE` or when inserting to table function `file`, `url`, `hdfs`, `s3`, and `azureBlobStorage`.", 0) \
M(UInt64, output_format_compression_zstd_window_log, 0, "To be used when output compression method is `zstd`. If greater than `0`, this setting explicitly sets compression window size (expressed as power of `2`) and enable long-range mode for zstd compression.", 0) \
canhld94 marked this conversation as resolved.
Show resolved Hide resolved
\
M(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \
Expand Down
4 changes: 2 additions & 2 deletions src/IO/CompressionMethod.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
}

std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, int zstd_window_log, size_t buf_size, char * existing_memory, size_t alignment)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
Expand All @@ -183,7 +183,7 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);

if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, zstd_window_log, buf_size, existing_memory, alignment);

if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
Expand Down
1 change: 1 addition & 0 deletions src/IO/CompressionMethod.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
int zstd_window_log = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
Expand Down
43 changes: 32 additions & 11 deletions src/IO/ZstdDeflatingWriteBuffer.cpp
Original file line number Diff line number Diff line change
@@ -1,30 +1,51 @@
#include <IO/ZstdDeflatingWriteBuffer.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>

namespace DB
{
namespace ErrorCodes
{
extern const int ZSTD_ENCODER_FAILED;
extern const int ILLEGAL_CODEC_PARAMETER;
}

static void setZstdParameter(ZSTD_CCtx * cctx, ZSTD_cParameter param, int value)
{
auto ret = ZSTD_CCtx_setParameter(cctx, param, value);
if (ZSTD_isError(ret))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret,
ZSTD_VERSION_STRING);
}

ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
std::unique_ptr<WriteBuffer> out_, int compression_level, int window_log, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
size_t ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret, ZSTD_VERSION_STRING);
ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
ret, ZSTD_VERSION_STRING);
setZstdParameter(cctx, ZSTD_c_compressionLevel, compression_level);

if (window_log > 0)
{
ZSTD_bounds window_log_bounds = ZSTD_cParam_getBounds(ZSTD_c_windowLog);
if (ZSTD_isError(window_log_bounds.error))
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "ZSTD windowLog parameter is not supported {}",
std::string(ZSTD_getErrorName(window_log_bounds.error)));
if (window_log > window_log_bounds.upperBound || window_log < window_log_bounds.lowerBound)
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER,
"ZSTD codec can't have window log more than {} and lower than {}, given {}",
toString(window_log_bounds.upperBound),
toString(window_log_bounds.lowerBound), toString(window_log));
setZstdParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1);
setZstdParameter(cctx, ZSTD_c_windowLog, window_log);
}

setZstdParameter(cctx, ZSTD_c_checksumFlag, 1);

input = {nullptr, 0, 0};
output = {nullptr, 0, 0};
Expand Down
1 change: 1 addition & 0 deletions src/IO/ZstdDeflatingWriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class ZstdDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
int compression_level,
int window_log = 0,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
Expand Down
5 changes: 3 additions & 2 deletions src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,11 +1434,12 @@ void executeQuery(
const auto & compression_method_node = ast_query_with_output->compression->as<ASTLiteral &>();
compression_method = compression_method_node.value.safeGet<std::string>();
}

const auto & settings = context->getSettingsRef();
compressed_buffer = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromFile>(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT),
chooseCompressionMethod(out_file, compression_method),
/* compression level = */ 3
/* compression level = */ static_cast<int>(settings.output_format_compression_level),
/* zstd_window_log = */ static_cast<int>(settings.output_format_compression_zstd_window_log)
);
}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/Parquet/Write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, Com
std::move(dest_buf),
method,
/*level*/ 3,
/*zstd_window_log*/ 0,
source.size(),
/*existing_memory*/ source.data());
chassert(compressed_buf->position() == source.data());
Expand Down
1 change: 1 addition & 0 deletions src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
compress ? compression_method : CompressionMethod::None,
compression_level,
0,
working_buffer.size(),
working_buffer.begin());
else
Expand Down
10 changes: 5 additions & 5 deletions src/Storages/HDFS/StorageHDFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,13 +723,13 @@ class HDFSSink : public SinkToStorage
const CompressionMethod compression_method)
: SinkToStorage(sample_block)
{
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHDFS>(
uri,
context->getGlobalContext()->getConfigRef(),
context->getSettingsRef().hdfs_replication,
context->getWriteSettings()),
compression_method, 3);
uri, context->getGlobalContext()->getConfigRef(), context->getSettingsRef().hdfs_replication, context->getWriteSettings()),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context);
}

Expand Down
7 changes: 6 additions & 1 deletion src/Storages/StorageAzureBlob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,12 @@ class StorageAzureBlobSink : public SinkToStorage
, format_settings(format_settings_)
{
StoredObject object(blob_path);
write_buf = wrapWriteBufferWithCompressionMethod(object_storage->writeObject(object, WriteMode::Rewrite), compression_method, 3);
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
object_storage->writeObject(object, WriteMode::Rewrite),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
}

Expand Down
8 changes: 6 additions & 2 deletions src/Storages/StorageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1485,8 +1485,12 @@ class StorageFileSink final : public SinkToStorage

/// In case of formats with prefixes if file is not empty we have already written prefix.
bool do_not_write_prefix = naked_buffer->size();

write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::move(naked_buffer),
compression_method,
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));

writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context, format_settings);
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ class StorageS3Sink : public SinkToStorage
blob_log->query_id = context->getCurrentQueryId();
}

const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(
configuration_.client,
Expand All @@ -944,7 +945,8 @@ class StorageS3Sink : public SinkToStorage
threadPoolCallbackRunner<void>(getIOThreadPool().get(), "S3ParallelWrite"),
context->getWriteSettings()),
compression_method,
3);
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer
= FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
}
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/StorageURL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,12 @@ StorageURLSink::StorageURLSink(
Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
);

const auto & settings = context->getSettingsRef();
write_buf = wrapWriteBufferWithCompressionMethod(
std::move(write_buffer),
compression_method,
3
);
static_cast<int>(settings.output_format_compression_level),
static_cast<int>(settings.output_format_compression_zstd_window_log));
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, format_settings);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1000000
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INSERT INTO FUNCTION file('test_02961.csv', 'CSV', 'x UInt64', 'zstd') SELECT number FROM numbers(1000000) SETTINGS output_format_compression_level = 10, output_format_compression_zstd_window_log = 30, engine_file_truncate_on_insert = 1;
canhld94 marked this conversation as resolved.
Show resolved Hide resolved
-- Simple check that output_format_compression_zstd_window_log = 30 works
SELECT count() FROM file('test_02961.csv', 'CSV', 'x UInt64', 'zstd'); -- { serverError ZSTD_DECODER_FAILED }
SELECT count() FROM file('test_02961.csv', 'CSV', 'x UInt64', 'zstd') SETTINGS zstd_window_log_max = 30;