1 change: 1 addition & 0 deletions src/Formats/FormatFactory.cpp
@@ -171,6 +171,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
 format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
 format_settings.parquet.prefer_block_bytes = settings.input_format_parquet_prefer_block_bytes;
 format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
+format_settings.parquet.output_compression_level = context->getSettingsRef().output_format_compression_level;
 format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
 format_settings.parquet.use_custom_encoder = settings.output_format_parquet_use_custom_encoder;
 format_settings.parquet.parallel_encoding = settings.output_format_parquet_parallel_encoding;
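Worth noting: the level comes from the format-agnostic `output_format_compression_level` setting read off the query context, not from a new Parquet-specific setting, so the same knob that controls compressed file output elsewhere now drives Parquet as well. A minimal sketch of the equivalent lookup (the helper function and its name are hypothetical):

```cpp
#include <Interpreters/Context.h>

/// Hypothetical helper mirroring the single line added to getFormatSettings():
/// fetch the compression level the Parquet writer will use.
uint64_t parquetOutputCompressionLevel(const DB::ContextPtr & context)
{
    // Reuses the generic output_format_compression_level setting instead of
    // introducing a separate output_format_parquet_compression_level.
    return context->getSettingsRef().output_format_compression_level;
}
```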
1 change: 1 addition & 0 deletions src/Formats/FormatSettings.h
@@ -282,6 +282,7 @@ struct FormatSettings
 size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
 ParquetVersion output_version;
 ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
+uint64_t output_compression_level;
 bool output_compliant_nested_types = true;
 size_t data_page_size = 1024 * 1024;
 size_t write_batch_size = 1024;
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
@@ -232,6 +232,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
 auto & state = states.emplace_back();
 state.primitive_column = column;
 state.compression = options.compression;
+state.compression_level = options.compression_level;

 state.column_chunk.__isset.meta_data = true;
 state.column_chunk.meta_data.__set_path_in_schema({name});
8 changes: 4 additions & 4 deletions src/Processors/Formats/Impl/Parquet/Write.cpp
@@ -390,7 +390,7 @@ struct ConverterDecimal
 };

 /// Returns either `source` or `scratch`.
-PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, CompressionMethod method)
+PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, CompressionMethod method, int level)
 {
 /// We could use wrapWriteBufferWithCompressionMethod() for everything, but I worry about the
 /// overhead of creating a bunch of WriteBuffers on each page (thousands of values).
@@ -447,7 +447,7 @@ PODArray<char> & compress(PODArray<char> & source, PODArray<char> & scratch, Com
 auto compressed_buf = wrapWriteBufferWithCompressionMethod(
 std::move(dest_buf),
 method,
-/*level*/ 3,
+level,
 /*zstd_window_log*/ 0,
 source.size(),
 /*existing_memory*/ source.data());
@@ -575,7 +575,7 @@ void writeColumnImpl(
 throw Exception(ErrorCodes::CANNOT_COMPRESS, "Uncompressed page is too big: {}", encoded.size());

 size_t uncompressed_size = encoded.size();
-auto & compressed = compress(encoded, compressed_maybe, s.compression);
+auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level);

 if (compressed.size() > INT32_MAX)
 throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed page is too big: {}", compressed.size());
@@ -626,7 +626,7 @@ void writeColumnImpl(
 encoded.resize(static_cast<size_t>(dict_size));
 dict_encoder->WriteDict(reinterpret_cast<uint8_t *>(encoded.data()));

-auto & compressed = compress(encoded, compressed_maybe, s.compression);
+auto & compressed = compress(encoded, compressed_maybe, s.compression, s.compression_level);

 if (compressed.size() > INT32_MAX)
 throw Exception(ErrorCodes::CANNOT_COMPRESS, "Compressed dictionary page is too big: {}", compressed.size());
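A hedged sketch of what the wrapper-based branch of the custom encoder's compression now does for codecs that go through `wrapWriteBufferWithCompressionMethod` (e.g. zstd, gzip, brotli); the standalone function name is hypothetical and the buffer handling is simplified — the real `compress()` also has dedicated fast paths for some codecs that bypass the wrapper entirely:

```cpp
#include <Common/PODArray.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBufferFromVector.h>

using namespace DB;

/// Hypothetical standalone version of the wrapper-based branch of compress():
/// push `source` through a compressing WriteBuffer into `dest` at `level`.
void compressWithLevel(PODArray<char> & source, PODArray<char> & dest,
                       CompressionMethod method, int level)
{
    auto dest_buf = std::make_unique<WriteBufferFromVector<PODArray<char>>>(dest);
    auto compressed_buf = wrapWriteBufferWithCompressionMethod(
        std::move(dest_buf),
        method,
        level,                    // was hard-coded to 3 before this change
        /*zstd_window_log*/ 0,
        source.size(),
        /*existing_memory*/ source.data());
    compressed_buf->write(source.data(), source.size());
    compressed_buf->finalize();
}
```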
2 changes: 2 additions & 0 deletions src/Processors/Formats/Impl/Parquet/Write.h
@@ -19,6 +19,7 @@ struct WriteOptions
 bool output_fixed_string_as_fixed_byte_array = true;

 CompressionMethod compression = CompressionMethod::Lz4;
+int compression_level = 3;

 size_t data_page_size = 1024 * 1024;
 size_t write_batch_size = 1024;
@@ -43,6 +44,7 @@ struct ColumnChunkWriteState

 ColumnPtr primitive_column;
 CompressionMethod compression; // must match what's inside column_chunk
+int compression_level = 3;
 Int64 datetime64_multiplier = 1; // for converting e.g. seconds to milliseconds
 bool is_bool = false; // bool vs UInt8 have the same column type but are encoded differently

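For illustration, how a caller of the low-level writer might now request a specific level; the setup is hypothetical (including the assumption that these types live in the `DB::Parquet` namespace), but the fields and the default of 3 are the ones declared above:

```cpp
#include <Processors/Formats/Impl/Parquet/Write.h>

DB::Parquet::WriteOptions makeWriteOptions()
{
    DB::Parquet::WriteOptions options;  // compression_level defaults to 3
    options.compression = DB::CompressionMethod::Zstd;
    options.compression_level = 10;     // now honored instead of being ignored
    return options;
}
```

PrepareForWrite.cpp (earlier in this diff) copies `options.compression_level` into each column chunk's `ColumnChunkWriteState`, which is what Write.cpp then reads as `s.compression_level`.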
9 changes: 8 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp
@@ -95,6 +95,7 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo
 case C::GZIP: options.compression = CompressionMethod::Gzip; break;
 case C::BROTLI: options.compression = CompressionMethod::Brotli; break;
 }
+options.compression_level = static_cast<int>(format_settings.parquet.output_compression_level);
 options.output_string_as_string = format_settings.parquet.output_string_as_string;
 options.output_fixed_string_as_fixed_byte_array = format_settings.parquet.output_fixed_string_as_fixed_byte_array;
 options.data_page_size = format_settings.parquet.data_page_size;
@@ -322,7 +323,13 @@ void ParquetBlockOutputFormat::writeUsingArrow(std::vector<Chunk> chunks)

 parquet::WriterProperties::Builder builder;
 builder.version(getParquetVersion(format_settings));
-builder.compression(getParquetCompression(format_settings.parquet.output_compression_method));
+auto compression_codec = getParquetCompression(format_settings.parquet.output_compression_method);
+builder.compression(compression_codec);
+
+if (arrow::util::Codec::SupportsCompressionLevel(compression_codec))
+{
+builder.compression_level(static_cast<int>(format_settings.parquet.output_compression_level));
+}
 // write page index is disable at default.
 if (format_settings.parquet.write_page_index)
 builder.enable_write_page_index();
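The Arrow path needs a guard because not every codec accepts a level (Snappy, for instance, has none, and setting one would fail). A self-contained sketch of the same pattern against the Arrow/Parquet C++ API, with a hardcoded codec and level standing in for the format settings:

```cpp
#include <memory>

#include <arrow/util/compression.h>
#include <parquet/properties.h>

std::shared_ptr<parquet::WriterProperties> makeWriterProperties()
{
    parquet::WriterProperties::Builder builder;

    auto codec = arrow::Compression::ZSTD;  // stand-in for the configured method
    builder.compression(codec);

    // Codec::SupportsCompressionLevel() is false for e.g. SNAPPY and
    // UNCOMPRESSED, so the level is only applied where it is meaningful.
    if (arrow::util::Codec::SupportsCompressionLevel(codec))
        builder.compression_level(22);      // stand-in for output_format_compression_level

    return builder.build();
}
```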
@@ -12,3 +12,4 @@
 10
 10
 10
+10
@@ -10,6 +10,7 @@ set -o pipefail
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='none'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='lz4'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
+$CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='snappy', output_format_parquet_use_custom_encoder=0" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='zstd'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='brotli'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
 $CLICKHOUSE_LOCAL -q "select * from numbers(10) format Parquet settings output_format_parquet_compression_method='gzip'" | $CLICKHOUSE_LOCAL --input-format=Parquet -q "select count() from table"
@@ -0,0 +1,2 @@
+1
+1
@@ -0,0 +1,19 @@
+-- Tags: no-parallel, no-fasttest
+
+insert into function file(03276_parquet_custom_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+insert into function file(03276_parquet_custom_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=1, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+
+WITH
+(SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1,
+(SELECT total_compressed_size FROM file(03276_parquet_custom_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22
+SELECT
+size_level_22 < size_level_1 AS compression_higher_level_produces_smaller_file;
+
+insert into function file(03276_parquet_arrow_encoder_compression_level_1.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=1, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+insert into function file(03276_parquet_arrow_encoder_compression_level_22.parquet) SETTINGS output_format_parquet_compression_method = 'zstd', output_format_compression_level=22, output_format_parquet_use_custom_encoder=0, engine_file_truncate_on_insert=1 SELECT number AS id, toString(number) AS name, now() + number AS timestamp FROM numbers(100000);
+
+WITH
+(SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_1.parquet, ParquetMetadata)) AS size_level_1,
+(SELECT total_compressed_size FROM file(03276_parquet_arrow_encoder_compression_level_22.parquet, ParquetMetadata)) AS size_level_22
+SELECT
+size_level_22 < size_level_1 AS compression_higher_level_produces_smaller_file;