diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 752741cd9e19..1d101d9b97ca 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -171,6 +171,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
     format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
     format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
     format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
+    format_settings.parquet.output_compression_level = context->getSettingsRef().output_format_compression_level;
     format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
     format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;
     format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding;
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index 276d83328dba..ed68e36e41bd 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -282,6 +282,7 @@ struct FormatSettings
         size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
         ParquetVersion output_version;
         ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
+        uint64_t output_compression_level;
         bool output_compliant_nested_types = true;
         size_t data_page_size = 1024 * 1024;
         size_t write_batch_size = 1024;
diff --git a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
index ce859b38b3c4..2e2ff216a72d 100644
--- a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
+++ b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
@@ -232,6 +232,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
     auto & state = states.emplace_back();
     state.primitive_column = column;
     state.compression = options.compression;
+    state.compression_level = options.compression_level;
 
     state.column_chunk.__isset.meta_data = true;
     state.column_chunk.meta_data.__set_path_in_schema({name});
diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp
index b1e231d7749a..d0f68148ff19 100644
--- a/src/Processors/Formats/Impl/Parquet/Write.cpp
+++ b/src/Processors/Formats/Impl/Parquet/Write.cpp
@@ -390,7 +390,7 @@ struct ConverterDecimal
 };
 
 /// Returns either `source` or `scratch`.
-PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, CompressionMethod method)
+PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, CompressionMethod method, int level)
 {
     /// We could use wrapWriteBufferWithCompressionMethod() for everything, but I worry about the
     /// overhead of creating a bunch of WriteBuffers on each page (thousands of values).
@@ -447,7 +447,7 @@ PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, Com
             auto compressed_buf = wrapWriteBufferWithCompressionMethod(
                 std::move(dest_buf),
                 method,
-                /*level*/ 3,
+                level,
                 /*zstd_window_log*/ 0,
                 source.size(),
                 /*existing_memory*/ source.data());
@@ -575,7 +575,7 @@ void writeColumnImpl(
                 throw Exception(ErrorCodes::CANNOT_COMPRESS, "Uncompressed page is too big: {}", encoded.size());
 
             size_t uncompressed_size = encoded.size();
-            auto & compressed = compress(encoded, compressed_maybe, s.compression);
+            auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level);
 
             if (compressed.size() > INT32_MAX)
                 throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed page is too big: {}", compressed.size());
@@ -626,7 +626,7 @@ void writeColumnImpl(
         encoded.resize(static_cast<size_t>(dict_size));
         dict_encoder->WriteDict(reinterpret_cast<uint8_t *>(encoded.data()));
 
-        auto & compressed = compress(encoded, compressed_maybe, s.compression);
+        auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level);
 
         if (compressed.size() > INT32_MAX)
             throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed dictionary page is too big: {}", compressed.size());
diff --git a/src/Processors/Formats/Impl/Parquet/Write.h b/src/Processors/Formats/Impl/Parquet/Write.h
index f162984fd5ed..1a0a60a5b5b6 100644
--- a/src/Processors/Formats/Impl/Parquet/Write.h
+++ b/src/Processors/Formats/Impl/Parquet/Write.h
@@ -19,6 +19,7 @@ struct WriteOptions
     bool output_fixed_string_as_fixed_byte_array = true;
 
     CompressionMethod compression = CompressionMethod::Lz4;
+    int compression_level = 3;
 
     size_t data_page_size = 1024 * 1024;
     size_t write_batch_size = 1024;
@@ -43,6 +44,7 @@ struct ColumnChunkWriteState
 
     ColumnPtr primitive_column;
     CompressionMethod compression; // must match what's inside column_chunk
+    int compression_level = 3;
     Int64 datetime64_multiplier = 1; // for converting e.g. seconds to milliseconds
     bool is_bool = false; // bool vs UInt8 have the same column type but are encoded differently
 
diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
index 645f16c6abe5..96d47b2ffb96 100644
--- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
@@ -95,6 +95,7 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
         case C::GZIP: options.compression = CompressionMethod::Gzip; break;
         case C::BROTLI: options.compression = CompressionMethod::Brotli; break;
     }
+    options.compression_level = static_cast<int>(format_settings.parquet.output_compression_level);
    options.output_string_as_string = format_settings.parquet.output_string_as_string;
     options.output_fixed_string_as_fixed_byte_array = format_settings.parquet.output_fixed_string_as_fixed_byte_array;
     options.data_page_size = format_settings.parquet.data_page_size;
@@ -322,7 +323,13 @@ void ParquetBlockOutputFormat::writeUsingArrow(std::vector<Chunk> chunks)
     parquet::WriterProperties::Builder builder;
     builder.version(getParquetVersion(format_settings));
-    builder.compression(getParquetCompression(format_settings.parquet.output_compression_method));
+    auto compression_codec = getParquetCompression(format_settings.parquet.output_compression_method);
+    builder.compression(compression_codec);
+
+    if (arrow::util::Codec::SupportsCompressionLevel(compression_codec))
+    {
+        builder.compression_level(static_cast<int>(format_settings.parquet.output_compression_level));
+    }
 
     // write page index is disable at default.
     if (format_settings.parquet.write_page_index)
         builder.enable_write_page_index();
diff --git a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference
index 492b12dba563..3d385dcfb465 100644
--- a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference
+++ b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.reference
@@ -12,3 +12,4 @@
 10
 10
 10
+10
diff --git a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh
index d00026d516a3..26ff30495abc 100755
--- a/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh
+++ b/tests/queries/0_stateless/02581_parquet_arrow_orc_compressions.sh
@@ -10,6 +10,7 @@ set -o pipefail
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='lz4'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
+$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy', output_format_parquet_use_custom_encoder=0" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='brotli'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='gzip'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
diff --git a/tests/queries/0_stateless/03276_parquet_output_compression_level.reference b/tests/queries/0_stateless/03276_parquet_output_compression_level.reference
new file mode 100644
index 000000000000..6ed281c757a9
--- /dev/null
+++ b/tests/queries/0_stateless/03276_parquet_output_compression_level.reference
@@ -0,0 +1,2 @@
+1
+1
diff --git a/tests/queries/0_stateless/03276_parquet_output_compression_level.sql b/tests/queries/0_stateless/03276_parquet_output_compression_level.sql
new file mode 100644
index 000000000000..27e096a632a2
--- /dev/null
+++ b/tests/queries/0_stateless/03276_parquet_output_compression_level.sql
@@ -0,0 +1,19 @@
+-- Tags: no-parallel, no-fasttest
+
+insert into function file(03276_parquet_custom_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+insert into function file(03276_parquet_custom_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+
+WITH
+    (SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1,
+    (SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22
+SELECT
+    size_level_22 < size_level_1 AS compression_higher_level_produces_smaller_file;
+
+insert into function file(03276_parquet_arrow_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+insert into function file(03276_parquet_arrow_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+
+WITH
+    (SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1,
+    (SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22
+SELECT
+    size_level_22 < size_level_1 AS compression_higher_level_produces_smaller_file;
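
Usage sketch (not part of the patch; mirrors the new test above). After this change, the generic output_format_compression_level setting is honored by both Parquet writer paths: the custom encoder plumbs it through WriteOptions into compress(), and the Arrow-based writer applies it via WriterProperties when the codec supports levels. The file name and level value below are illustrative; a higher zstd level should yield a smaller file, which can be checked with the ParquetMetadata input format:

INSERT INTO FUNCTION file('level_test.parquet')
SETTINGS output_format_parquet_compression_method = 'zstd',
         output_format_compression_level = 10,
         engine_file_truncate_on_insert = 1
SELECT number FROM numbers(1000000);

SELECT total_compressed_size FROM file('level_test.parquet', ParquetMetadata);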