From 331b418b440b1ca9a17bca0853fc7a4ffc405ad8 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 16 Jan 2025 14:15:37 -0300 Subject: [PATCH 1/3] impl --- src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 1 + .../Formats/Impl/Parquet/PrepareForWrite.cpp | 1 + src/Processors/Formats/Impl/Parquet/Write.cpp | 7 ++++--- src/Processors/Formats/Impl/Parquet/Write.h | 2 ++ .../Formats/Impl/ParquetBlockOutputFormat.cpp | 9 ++++++++- ...1_parquet_arrow_orc_compressions.reference | 1 + .../02581_parquet_arrow_orc_compressions.sh | 1 + ...parquet_output_compression_level.reference | 2 ++ ...03276_parquet_output_compression_level.sql | 19 +++++++++++++++++++ 10 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/03276_parquet_output_compression_level.reference create mode 100644 tests/queries/0_stateless/03276_parquet_output_compression_level.sql diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 752741cd9e19..c2c105d988d7 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -171,6 +171,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size; format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes; format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method; + format_settings.parquet.output_compression_level = settings.output_format_compression_level; format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types; format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder; format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 276d83328dba..ed68e36e41bd 100644 --- 
a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -282,6 +282,7 @@ struct FormatSettings size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256; ParquetVersion output_version; ParquetCompression output_compression_method = ParquetCompression::SNAPPY; + uint64_t output_compression_level; bool output_compliant_nested_types = true; size_t data_page_size = 1024 * 1024; size_t write_batch_size = 1024; diff --git a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp index ce859b38b3c4..2e2ff216a72d 100644 --- a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp +++ b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp @@ -232,6 +232,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin auto & state = states.emplace_back(); state.primitive_column = column; state.compression = options.compression; + state.compression_level = options.compression_level; state.column_chunk.__isset.meta_data = true; state.column_chunk.meta_data.__set_path_in_schema({name}); diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index b1e231d7749a..bcbc46df21e4 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -390,7 +390,7 @@ struct ConverterDecimal }; /// Returns either `source` or `scratch`. -PODArray & compress(PODArray & source, PODArray & scratch, CompressionMethod method) +PODArray & compress(PODArray & source, PODArray & scratch, CompressionMethod method, int level) { /// We could use wrapWriteBufferWithCompressionMethod() for everything, but I worry about the /// overhead of creating a bunch of WriteBuffers on each page (thousands of values). 
@@ -447,6 +447,7 @@ PODArray & compress(PODArray & source, PODArray & scratch, Com auto compressed_buf = wrapWriteBufferWithCompressionMethod( std::move(dest_buf), method, + level, /*level*/ 3, /*zstd_window_log*/ 0, source.size(), @@ -575,7 +576,7 @@ void writeColumnImpl( throw Exception(ErrorCodes::CANNOT_COMPRESS, "Uncompressed page is too big: {}", encoded.size()); size_t uncompressed_size = encoded.size(); - auto & compressed = compress(encoded, compressed_maybe, s.compression); + auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level); if (compressed.size() > INT32_MAX) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed page is too big: {}", compressed.size()); @@ -626,7 +627,7 @@ void writeColumnImpl( encoded.resize(static_cast(dict_size)); dict_encoder->WriteDict(reinterpret_cast(encoded.data())); - auto & compressed = compress(encoded, compressed_maybe, s.compression); + auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level); if (compressed.size() > INT32_MAX) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed dictionary page is too big: {}", compressed.size()); diff --git a/src/Processors/Formats/Impl/Parquet/Write.h b/src/Processors/Formats/Impl/Parquet/Write.h index f162984fd5ed..1a0a60a5b5b6 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.h +++ b/src/Processors/Formats/Impl/Parquet/Write.h @@ -19,6 +19,7 @@ struct WriteOptions bool output_fixed_string_as_fixed_byte_array = true; CompressionMethod compression = CompressionMethod::Lz4; + int compression_level = 3; size_t data_page_size = 1024 * 1024; size_t write_batch_size = 1024; @@ -43,6 +44,7 @@ struct ColumnChunkWriteState ColumnPtr primitive_column; CompressionMethod compression; // must match what's inside column_chunk + int compression_level = 3; Int64 datetime64_multiplier = 1; // for converting e.g. 
seconds to milliseconds bool is_bool = false; // bool vs UInt8 have the same column type but are encoded differently diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index 645f16c6abe5..96d47b2ffb96 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -95,6 +95,7 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo case C::GZIP: options.compression = CompressionMethod::Gzip; break; case C::BROTLI: options.compression = CompressionMethod::Brotli; break; } + options.compression_level = static_cast(format_settings.parquet.output_compression_level); options.output_string_as_string = format_settings.parquet.output_string_as_string; options.output_fixed_string_as_fixed_byte_array = format_settings.parquet.output_fixed_string_as_fixed_byte_array; options.data_page_size = format_settings.parquet.data_page_size; @@ -322,7 +323,13 @@ void ParquetBlockOutputFormat::writeUsingArrow(std::vector chunks) parquet::WriterProperties::Builder builder; builder.version(getParquetVersion(format_settings)); - builder.compression(getParquetCompression(format_settings.parquet.output_compression_method)); + auto compression_codec = getParquetCompression(format_settings.parquet.output_compression_method); + builder.compression(compression_codec); + + if (arrow::util::Codec::SupportsCompressionLevel(compression_codec)) + { + builder.compression_level(static_cast(format_settings.parquet.output_compression_level)); + } // write page index is disable at default. 
if (format_settings.parquet.write_page_index) builder.enable_write_page_index(); diff --git a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference index 492b12dba563..3d385dcfb465 100644 --- a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference +++ b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference @@ -12,3 +12,4 @@ 10 10 10 +10 diff --git a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh index d00026d516a3..26ff30495abc 100755 --- a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh +++ b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh @@ -10,6 +10,7 @@ set -o pipefail $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='lz4'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" +$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy', output_format_parquet_use_custom_encoder=0" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='brotli'" | 
$CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='gzip'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table" diff --git a/tests/queries/0_stateless/03276_parquet_output_compression_level.reference b/tests/queries/0_stateless/03276_parquet_output_compression_level.reference new file mode 100644 index 000000000000..6ed281c757a9 --- /dev/null +++ b/tests/queries/0_stateless/03276_parquet_output_compression_level.reference @@ -0,0 +1,2 @@ +1 +1 diff --git a/tests/queries/0_stateless/03276_parquet_output_compression_level.sql b/tests/queries/0_stateless/03276_parquet_output_compression_level.sql new file mode 100644 index 000000000000..27e096a632a2 --- /dev/null +++ b/tests/queries/0_stateless/03276_parquet_output_compression_level.sql @@ -0,0 +1,19 @@ +-- Tags: no-parallel, no-fasttest + +insert into function file(03276_parquet_custom_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000); +insert into function file(03276_parquet_custom_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000); + +WITH + (SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1, + (SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22 +SELECT + size_level_22 < size_level_1 AS 
compression_higher_level_produces_smaller_file; + +insert into function file(03276_parquet_arrow_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000); +insert into function file(03276_parquet_arrow_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000); + +WITH + (SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1, + (SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22 +SELECT + size_level_22 < size_level_1 AS compression_higher_level_produces_smaller_file; From 361c305336db2ad9bb141bf0c7bd8e2e4578a431 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 17 Jan 2025 10:40:14 -0300 Subject: [PATCH 2/3] attempt to fix format settings --- src/Formats/FormatFactory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index c2c105d988d7..1d101d9b97ca 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -171,7 +171,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size; format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes; format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method; - 
format_settings.parquet.output_compression_level = settings.output_format_compression_level; + format_settings.parquet.output_compression_level = context->getSettingsRef().output_format_compression_level; format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types; format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder; format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding; From 27000346824681ca0da9a84da8eea73daf42f18f Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 21 Jan 2025 17:37:17 +0100 Subject: [PATCH 3/3] Attempt to fix build --- src/Processors/Formats/Impl/Parquet/Write.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index bcbc46df21e4..d0f68148ff19 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -448,7 +448,6 @@ PODArray & compress(PODArray & source, PODArray & scratch, Com std::move(dest_buf), method, level, - /*level*/ 3, /*zstd_window_log*/ 0, source.size(), /*existing_memory*/ source.data());