Skip to content

Commit

Permalink
Merge pull request #50775 from rschu1ze/non-experimental-qpl-deflate
Browse files Browse the repository at this point in the history
Mark QPL_DEFLATE non-experimental but default: off-by-default
  • Loading branch information
rschu1ze committed Jun 21, 2023
2 parents 2abd580 + bc7df2b commit 06e8590
Show file tree
Hide file tree
Showing 21 changed files with 182 additions and 18 deletions.
6 changes: 5 additions & 1 deletion docs/en/sql-reference/statements/create/table.md
Expand Up @@ -380,11 +380,15 @@ High compression levels are useful for asymmetric scenarios, like compress once,

`DEFLATE_QPL` — [Deflate compression algorithm](https://github.com/intel/qpl) implemented by Intel® Query Processing Library. Some limitations apply:

- DEFLATE_QPL is experimental and can only be used after setting configuration parameter `allow_experimental_codecs=1`.
- DEFLATE_QPL is disabled by default and can only be used after setting configuration parameter `enable_deflate_qpl_codec = 1`.
- DEFLATE_QPL requires a ClickHouse build compiled with SSE 4.2 instructions (by default, this is the case). Refer to [Build Clickhouse with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Build-Clickhouse-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL works best if the system has an Intel® IAA (In-Memory Analytics Accelerator) offloading device. Refer to [Accelerator Configuration](https://intel.github.io/qpl/documentation/get_started_docs/installation.html#accelerator-configuration) and [Benchmark with DEFLATE_QPL](/docs/en/development/building_and_benchmarking_deflate_qpl.md/#Run-Benchmark-with-DEFLATE_QPL) for more details.
- DEFLATE_QPL-compressed data can only be transferred between ClickHouse nodes compiled with SSE 4.2 enabled.

:::note
DEFLATE_QPL is not available in ClickHouse Cloud.
:::

### Specialized Codecs

These codecs are designed to make compression more effective by using specific features of data. Some of these codecs do not compress data themselves. Instead, they prepare the data for a general-purpose codec, which compresses it better than without this preparation.
Expand Down
2 changes: 1 addition & 1 deletion src/Client/Connection.cpp
Expand Up @@ -588,7 +588,7 @@ void Connection::sendQuery(
if (method == "ZSTD")
level = settings->network_zstd_compression_level;

CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs, settings->enable_deflate_qpl_codec);
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else
Expand Down
11 changes: 11 additions & 0 deletions src/Compression/CompressionCodecDeflateQpl.cpp
Expand Up @@ -8,6 +8,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include "libaccel_config.h"
#include <Common/MemorySanitizer.h>

namespace DB
{
Expand Down Expand Up @@ -382,6 +383,11 @@ UInt32 CompressionCodecDeflateQpl::getMaxCompressedDataSize(UInt32 uncompressed_

UInt32 CompressionCodecDeflateQpl::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
/// QPL library is using AVX-512 with some shuffle operations.
/// Memory sanitizer doesn't understand when uninitialized memory was present in a SIMD register but was not used in the result of the shuffle.
#if defined(MEMORY_SANITIZER)
__msan_unpoison(dest, getMaxCompressedDataSize(source_size));
#endif
Int32 res = HardwareCodecDeflateQpl::RET_ERROR;
if (DeflateQplJobHWPool::instance().isJobPoolReady())
res = hw_codec->doCompressData(source, source_size, dest, getMaxCompressedDataSize(source_size));
Expand All @@ -392,6 +398,11 @@ UInt32 CompressionCodecDeflateQpl::doCompressData(const char * source, UInt32 so

void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
/// QPL library is using AVX-512 with some shuffle operations.
/// Memory sanitizer doesn't understand when uninitialized memory was present in a SIMD register but was not used in the result of the shuffle.
#if defined(MEMORY_SANITIZER)
__msan_unpoison(dest, uncompressed_size);
#endif
switch (getDecompressMode())
{
case CodecMode::Synchronous:
Expand Down
2 changes: 1 addition & 1 deletion src/Compression/CompressionCodecDeflateQpl.h
Expand Up @@ -98,7 +98,7 @@ class CompressionCodecDeflateQpl final : public ICompressionCodec
protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
bool isExperimental() const override { return true; }
bool isDeflateQpl() const override { return true; }

UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
Expand Down
4 changes: 2 additions & 2 deletions src/Compression/CompressionFactory.h
Expand Up @@ -40,10 +40,10 @@ class CompressionCodecFactory final : private boost::noncopyable
CompressionCodecPtr getDefaultCodec() const;

/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const;

/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const;

/// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should
Expand Down
14 changes: 10 additions & 4 deletions src/Compression/CompressionFactoryAdditions.cpp
Expand Up @@ -34,7 +34,7 @@ namespace ErrorCodes


void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const
{
if (family_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Compression codec name cannot be empty");
Expand All @@ -43,13 +43,13 @@ void CompressionCodecFactory::validateCodec(
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_deflate_qpl_codec);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
{}, sanity_check, allow_experimental_codecs, enable_deflate_qpl_codec);
}
}

Expand Down Expand Up @@ -77,7 +77,7 @@ bool innerDataTypeIsFloat(const DataTypePtr & type)
}

ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs, bool enable_deflate_qpl_codec) const
{
if (const auto * func = ast->as<ASTFunction>())
{
Expand Down Expand Up @@ -159,6 +159,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);

if (!enable_deflate_qpl_codec && result_codec->isDeflateQpl())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is disabled by default."
" You can enable it with the 'enable_deflate_qpl_codec' setting.",
codec_family_name);

codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}

Expand Down
3 changes: 3 additions & 0 deletions src/Compression/ICompressionCodec.h
Expand Up @@ -109,6 +109,9 @@ class ICompressionCodec : private boost::noncopyable
/// It will not be allowed to use unless the user will turn off the safety switch.
virtual bool isExperimental() const { return false; }

/// Is this the DEFLATE_QPL codec?
/// Codec validation uses this flag to reject the codec unless the 'enable_deflate_qpl_codec' setting is turned on.
/// The base implementation answers "no"; the DEFLATE_QPL codec class overrides it to return true.
virtual bool isDeflateQpl() const { return false; }

/// If it does nothing.
virtual bool isNone() const { return false; }

Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -327,6 +327,7 @@ class IColumn;
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(Bool, enable_deflate_qpl_codec, false, "Enable/disable the DEFLATE_QPL codec.", 0) \
M(UInt64, query_profiler_real_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/InterpreterCreateQuery.cpp
Expand Up @@ -571,6 +571,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(

bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
bool enable_deflate_qpl_codec = attach || context_->getSettingsRef().enable_deflate_qpl_codec;

ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
Expand Down Expand Up @@ -631,7 +632,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs, enable_deflate_qpl_codec);
}

if (col_decl.ttl)
Expand Down
2 changes: 1 addition & 1 deletion src/Server/TCPHandler.cpp
Expand Up @@ -1775,7 +1775,7 @@ void TCPHandler::initBlockOutput(const Block & block)

if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs, query_settings.enable_deflate_qpl_codec);

state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level));
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/AlterCommands.cpp
Expand Up @@ -388,7 +388,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment;

if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true, true);

column.ttl = ttl;

Expand Down Expand Up @@ -429,7 +429,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true, true);

if (comment)
column.comment = *comment;
Expand Down Expand Up @@ -1067,7 +1067,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"this column name is reserved for lightweight delete feature", backQuote(column_name));

if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);

all_columns.add(ColumnDescription(column_name, command.data_type));
}
Expand All @@ -1093,7 +1093,7 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
{
if (all_columns.hasAlias(column_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
}
auto column_default = all_columns.getDefault(column_name);
if (column_default)
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ColumnsDescription.cpp
Expand Up @@ -130,7 +130,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();

if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true, true);

if (col_ast->ttl)
ttl = col_ast->ttl;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/Distributed/DistributedSink.cpp
Expand Up @@ -733,7 +733,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;

CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs, settings.enable_deflate_qpl_codec);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);

/// tmp directory is used to ensure atomicity of transactions
Expand Down
1 change: 1 addition & 0 deletions src/Storages/System/StorageSystemBuildOptions.cpp.in
Expand Up @@ -64,6 +64,7 @@ const char * auto_config_build[]
"USE_ARROW", "@USE_ARROW@",
"USE_ORC", "@USE_ORC@",
"USE_MSGPACK", "@USE_MSGPACK@",
"USE_QPL", "@ENABLE_QPL@",
"GIT_HASH", "@GIT_HASH@",
"GIT_BRANCH", R"IRjaNsZIL9Yh7FQ4(@GIT_BRANCH@)IRjaNsZIL9Yh7FQ4",
"GIT_DATE", "@GIT_DATE@",
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/TTLDescription.cpp
Expand Up @@ -285,7 +285,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/ci/stress.py
Expand Up @@ -20,6 +20,7 @@ def get_options(i, upgrade_check):
'''--db-engine="Replicated('/test/db/test_{}', 's1', 'r1')"'''.format(i)
)
client_options.append("allow_experimental_database_replicated=1")
client_options.append("enable_deflate_qpl_codec=1")

# If database name is not specified, new database is created for each functional test.
# Run some threads with one database for all tests.
Expand Down
@@ -0,0 +1,11 @@
<clickhouse>
<!-- Test config: selects deflate_qpl as the compression method for parts matching the single case below. -->
<!-- Both size conditions are set to 0 (presumably so that every part matches; confirm against the server's compression-case semantics). -->
<compression>
<case>
<!-- Conditions. All must be satisfied simultaneously. Some conditions may not be specified. -->
<min_part_size>0</min_part_size> <!-- The minimum size of a part in bytes. -->
<min_part_size_ratio>0</min_part_size_ratio> <!-- The minimum size of the part relative to all the data in the table. -->
<!-- Which compression method to choose. -->
<method>deflate_qpl</method>
</case>
</compression>
</clickhouse>
@@ -0,0 +1,7 @@
<clickhouse>
<!-- Test user config: applies to the "default" settings profile. -->
<profiles>
<default>
<!-- Turn on the setting that gates the DEFLATE_QPL codec; the setting's built-in default is off. -->
<enable_deflate_qpl_codec>1</enable_deflate_qpl_codec>
</default>
</profiles>
</clickhouse>
63 changes: 63 additions & 0 deletions tests/integration/test_non_default_compression/test.py
Expand Up @@ -38,6 +38,14 @@
)
# node6: instance used by test_preconfigured_deflateqpl_codec.
# Main config: deflateqpl_compression_by_default.xml.
# User configs: allow_suspicious_codecs.xml plus enable_deflateqpl_codec.xml
# (presumably the latter is what permits DEFLATE_QPL in CREATE TABLE; confirm against the config file).
node6 = cluster.add_instance(
"node6",
main_configs=["configs/deflateqpl_compression_by_default.xml"],
user_configs=[
"configs/allow_suspicious_codecs.xml",
"configs/enable_deflateqpl_codec.xml",
],
)
# node7: instance loading allow_experimental_codecs.xml as main config and
# allow_suspicious_codecs.xml as user config.
# NOTE(review): tests that target the experimental-codecs instance should reference node7; verify callers.
node7 = cluster.add_instance(
"node7",
main_configs=["configs/allow_experimental_codecs.xml"],
user_configs=["configs/allow_suspicious_codecs.xml"],
)
Expand Down Expand Up @@ -244,3 +252,58 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
)
== "10000\n"
)


def test_preconfigured_deflateqpl_codec(start_cluster):
    # Scenario: a MergeTree table whose columns end their codec chains with
    # DEFLATE_QPL is created on node6, filled in two batches, and queried
    # after each batch.

    def expect_all(checks):
        # Run every (query, expected output) pair in order against node6.
        for sql, expected in checks:
            assert node6.query(sql) == expected

    # The SQL text is kept flush-left so the string sent to the server is unchanged.
    create_table_sql = """
CREATE TABLE compression_codec_multiple_with_key (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12), DEFLATE_QPL),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, DEFLATE_QPL),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4, DEFLATE_QPL),
somecolumn Float64
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
"""
    node6.query(create_table_sql)

    # First batch: three hand-written rows.
    node6.query(
        "INSERT INTO compression_codec_multiple_with_key VALUES(toDate('2018-10-12'), 100000, 'hello', 88.88), (toDate('2018-10-12'), 100002, 'world', 99.99), (toDate('2018-10-12'), 1111, '!', 777.777)"
    )
    expect_all(
        [
            (
                "SELECT COUNT(*) FROM compression_codec_multiple_with_key WHERE id % 2 == 0",
                "2\n",
            ),
            (
                "SELECT DISTINCT somecolumn FROM compression_codec_multiple_with_key ORDER BY id",
                "777.777\n88.88\n99.99\n",
            ),
            (
                "SELECT data FROM compression_codec_multiple_with_key WHERE id >= 1112 AND somedate = toDate('2018-10-12') AND somecolumn <= 100",
                "hello\nworld\n",
            ),
        ]
    )

    # Second batch: 10000 generated rows on top of the first three.
    node6.query(
        "INSERT INTO compression_codec_multiple_with_key SELECT toDate('2018-10-12'), number, toString(number), 1.0 FROM system.numbers LIMIT 10000"
    )
    expected_sum = str(777.777 + 88.88 + 99.99 + 1.0 * 10000) + "\n"
    expect_all(
        [
            (
                "SELECT COUNT(id) FROM compression_codec_multiple_with_key WHERE id % 10 == 0",
                "1001\n",
            ),
            (
                "SELECT SUM(somecolumn) FROM compression_codec_multiple_with_key",
                expected_sum,
            ),
            (
                "SELECT count(*) FROM compression_codec_multiple_with_key GROUP BY somedate",
                "10003\n",
            ),
        ]
    )

0 comments on commit 06e8590

Please sign in to comment.