diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index b14354def6ac..770865a5693c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -135,6 +135,9 @@ private TableProperties() {}
   public static final String DELETE_PARQUET_PAGE_ROW_LIMIT = "write.delete.parquet.page-row-limit";
   public static final int PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20_000;
 
+  public static final String PARQUET_DICT_ENABLED = "write.parquet.dictionary.enabled";
+  public static final boolean PARQUET_DICT_ENABLED_DEFAULT = true;
+
   public static final String PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes";
   public static final String DELETE_PARQUET_DICT_SIZE_BYTES = "write.delete.parquet.dict-size-bytes";
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 15e36b34e4b0..99a25fec128c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -47,51 +47,52 @@ Iceberg tables support table properties to configure table behavior, like the default split size for readers:
 
 ### Write properties
 
-| Property | Default | Description |
-|-----------------------------------------------------|----------------------------|-----------------------------------------------------------------------------------------------------|
-| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
-| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
-| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
-| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
-| write.parquet.page-row-limit | 20000 | Parquet page row limit |
-| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
-| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
-| write.parquet.compression-level | null | Parquet compression level |
-| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Enables writing a bloom filter for the column: col1 |
-| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
-| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
-| write.avro.compression-level | null | Avro compression level |
-| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
-| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
-| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
-| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
-| write.orc.bloom.filter.columns | (not set) | Comma separated list of column names for which a Bloom filter must be created |
-| write.orc.bloom.filter.fpp | 0.05 | False positive probability for Bloom filter (must > 0.0 and < 1.0) |
-| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
-| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
-| write.metadata.metrics.max-inferred-column-defaults | 100 | Defines the maximum number of columns for which metrics are collected |
-| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
-| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
-| write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes |
-| write.delete.target-file-size-bytes | 67108864 (64 MB) | Controls the size of delete files generated to target about this many bytes |
-| write.distribution-mode | none | Defines distribution of write data: __none__: don't shuffle rows; __hash__: hash distribute by partition key ; __range__: range distribute by partition key or sort key if table has an SortOrder |
-| write.delete.distribution-mode | hash | Defines distribution of write delete data |
-| write.update.distribution-mode | hash | Defines distribution of write update data |
-| write.merge.distribution-mode | none | Defines distribution of write merge data |
-| write.wap.enabled | false | Enables write-audit-publish writes |
-| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
-| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest **tracked** version metadata files after commit |
-| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
-| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
-| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
-| write.data.path | table location + /data | Base location for data files |
-| write.metadata.path | table location + /metadata | Base location for metadata files |
-| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
-| write.delete.isolation-level | serializable | Isolation level for delete commands: serializable or snapshot |
-| write.update.mode | copy-on-write | Mode used for update commands: copy-on-write or merge-on-read (v2 only) |
-| write.update.isolation-level | serializable | Isolation level for update commands: serializable or snapshot |
-| write.merge.mode | copy-on-write | Mode used for merge commands: copy-on-write or merge-on-read (v2 only) |
-| write.merge.isolation-level | serializable | Isolation level for merge commands: serializable or snapshot |
+| Property | Default | Description |
+|------------------------------------------------------|-----------------------------|-----------------------------------------------------------------------------------------------------|
+| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
+| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
+| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
+| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
+| write.parquet.page-row-limit | 20000 | Parquet page row limit |
+| write.parquet.dictionary.enabled | true | Enables dictionary encoding |
+| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
+| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
+| write.parquet.compression-level | null | Parquet compression level |
+| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to parquet to write a bloom filter for the column: col1 |
+| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
+| write.avro.compression-codec | gzip | Avro compression codec: gzip (deflate with level 9), zstd, snappy, uncompressed |
+| write.avro.compression-level | null | Avro compression level |
+| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
+| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
+| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
+| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
+| write.orc.bloom.filter.columns | (not set) | Comma-separated list of column names for which a Bloom filter must be created |
+| write.orc.bloom.filter.fpp | 0.05 | False positive probability for Bloom filter (must be > 0.0 and < 1.0) |
+| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
+| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
+| write.metadata.metrics.max-inferred-column-defaults | 100 | Defines the maximum number of columns for which metrics are collected |
+| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
+| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes |
+| write.delete.target-file-size-bytes | 67108864 (64 MB) | Controls the size of delete files generated to target about this many bytes |
+| write.distribution-mode | none | Defines distribution of write data: __none__: don't shuffle rows; __hash__: hash distribute by partition key; __range__: range distribute by partition key or sort key if the table has a SortOrder |
+| write.delete.distribution-mode | hash | Defines distribution of write delete data |
+| write.update.distribution-mode | hash | Defines distribution of write update data |
+| write.merge.distribution-mode | none | Defines distribution of write merge data |
+| write.wap.enabled | false | Enables write-audit-publish writes |
+| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
+| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest **tracked** version metadata files after commit |
+| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
+| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
+| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
+| write.data.path | table location + /data | Base location for data files |
+| write.metadata.path | table location + /metadata | Base location for metadata files |
+| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
+| write.delete.isolation-level | serializable | Isolation level for delete commands: serializable or snapshot |
+| write.update.mode | copy-on-write | Mode used for update commands: copy-on-write or merge-on-read (v2 only) |
+| write.update.isolation-level | serializable | Isolation level for update commands: serializable or snapshot |
+| write.merge.mode | copy-on-write | Mode used for merge commands: copy-on-write or merge-on-read (v2 only) |
+| write.merge.isolation-level | serializable | Isolation level for merge commands: serializable or snapshot |
 
 ### Table behavior properties
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 8b1e6c056403..7caeb7c7cd71 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -33,6 +33,8 @@
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
@@ -244,6 +246,7 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupSize = context.rowGroupSize();
       int pageSize = context.pageSize();
       int pageRowLimit = context.pageRowLimit();
+      boolean dictionaryEnabled = context.dictionaryEnabled();
       int dictionaryPageSize = context.dictionaryPageSize();
       String compressionLevel = context.compressionLevel();
       CompressionCodecName codec = context.codec();
@@ -286,6 +289,7 @@ public <D> FileAppender<D> build() throws IOException {
             .withWriterVersion(writerVersion)
             .withPageSize(pageSize)
             .withPageRowCountLimit(pageRowLimit)
+            .withDictionaryEncoding(dictionaryEnabled)
             .withDictionaryPageSize(dictionaryPageSize)
             .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
             .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
@@ -323,6 +327,7 @@ public <D> FileAppender<D> build() throws IOException {
              .withRowGroupSize(rowGroupSize)
              .withPageSize(pageSize)
              .withPageRowCountLimit(pageRowLimit)
+             .withDictionaryEncoding(dictionaryEnabled)
              .withDictionaryPageSize(dictionaryPageSize);
 
       for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
@@ -339,6 +344,7 @@ private static class Context {
     private final int rowGroupSize;
     private final int pageSize;
     private final int pageRowLimit;
+    private final boolean dictionaryEnabled;
     private final int dictionaryPageSize;
     private final CompressionCodecName codec;
     private final String compressionLevel;
@@ -351,6 +357,7 @@ private Context(
         int rowGroupSize,
         int pageSize,
         int pageRowLimit,
+        boolean dictionaryEnabled,
         int dictionaryPageSize,
         CompressionCodecName codec,
         String compressionLevel,
@@ -361,6 +368,7 @@
       this.rowGroupSize = rowGroupSize;
       this.pageSize = pageSize;
       this.pageRowLimit = pageRowLimit;
+      this.dictionaryEnabled = dictionaryEnabled;
       this.dictionaryPageSize = dictionaryPageSize;
       this.codec = codec;
       this.compressionLevel = compressionLevel;
@@ -386,6 +394,10 @@ static Context dataContext(Map<String, String> config) {
               config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
       Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
 
+      boolean dictionaryEnabled =
+          PropertyUtil.propertyAsBoolean(
+              config, PARQUET_DICT_ENABLED, PARQUET_DICT_ENABLED_DEFAULT);
+
       int dictionaryPageSize =
           PropertyUtil.propertyAsInt(
               config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
@@ -429,6 +441,7 @@ static Context dataContext(Map<String, String> config) {
           rowGroupSize,
           pageSize,
           pageRowLimit,
+          dictionaryEnabled,
           dictionaryPageSize,
           codec,
           compressionLevel,
@@ -500,6 +513,7 @@ static Context deleteContext(Map<String, String> config) {
           rowGroupSize,
           pageSize,
           pageRowLimit,
+          dataContext.dictionaryEnabled(),
           dictionaryPageSize,
           codec,
           compressionLevel,
@@ -529,6 +543,10 @@ int pageRowLimit() {
       return pageRowLimit;
     }
 
+    boolean dictionaryEnabled() {
+      return dictionaryEnabled;
+    }
+
     int dictionaryPageSize() {
       return dictionaryPageSize;
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 34a92a9b4483..acb318a23286 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.parquet;
 
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED;
 import static org.apache.iceberg.avro.AvroSchemaUtil.convert;
 import static org.apache.iceberg.expressions.Expressions.and;
 import static org.apache.iceberg.expressions.Expressions.equal;
@@ -197,6 +198,7 @@ public void createInputFile() throws IOException {
     try (FileAppender<Record> appender =
         Parquet.write(outFile)
             .schema(FILE_SCHEMA)
+            .set(PARQUET_DICT_ENABLED, "false")
            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_id", "true")
            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long", "true")
            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_double", "true")
diff --git a/versions.props b/versions.props
index 7af5f0eaa9d4..c9028be3cfcf 100644
--- a/versions.props
+++ b/versions.props
@@ -5,7 +5,7 @@ org.apache.hadoop:* = 2.7.3
 org.apache.hive:* = 2.3.9
 org.apache.httpcomponents.client5:* = 5.2.1
 org.apache.orc:* = 1.8.3
-org.apache.parquet:* = 1.12.3
+org.apache.parquet:* = 1.13.1
 org.apache.pig:pig = 0.14.0
 com.fasterxml.jackson.*:* = 2.14.1
 com.google.code.findbugs:jsr305 = 3.0.2
@@ -48,4 +48,4 @@ com.esotericsoftware:kryo = 4.0.2
 org.eclipse.jetty:* = 9.4.43.v20210629
 org.testcontainers:* = 1.17.6
 io.delta:delta-core_* = 2.2.0
-org.awaitility:awaitility = 4.2.0
+org.awaitility:awaitility = 4.2.0
\ No newline at end of file
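
Usage note: with this change, dictionary encoding becomes a per-table knob read through `Parquet.WriteBuilder`'s data context; since `deleteContext` reuses `dataContext.dictionaryEnabled()`, data and delete files share the setting. A minimal sketch of flipping it on an existing table, where `table` is a placeholder for any loaded `org.apache.iceberg.Table`:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    // Disable dictionary encoding for future Parquet writes to this table.
    // Writers pick the value up via dataContext()/deleteContext() above.
    table.updateProperties()
        .set(TableProperties.PARQUET_DICT_ENABLED, "false")
        .commit();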
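On the TestBloomRowGroupFilter change: the test input file is now written with dictionary encoding disabled. This appears to be needed after the parquet-mr bump to 1.13.1, where a column chunk whose pages are all dictionary-encoded may get no bloom filter at all, since the dictionary already answers membership checks exactly. The same override works per file through the appender builder; in this sketch, `outFile` and `SCHEMA` are placeholders:

    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.parquet.Parquet;

    // Force plain encoding for this one file so a bloom filter is
    // materialized for the "id" column regardless of its cardinality.
    FileAppender<Record> appender =
        Parquet.write(outFile)  // outFile: an org.apache.iceberg.io.OutputFile
            .schema(SCHEMA)     // SCHEMA: an org.apache.iceberg.Schema
            .set(TableProperties.PARQUET_DICT_ENABLED, "false")
            .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build();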