
Core: Parquet per column compression #16094

Open
mengna-lin wants to merge 4 commits into apache:main from mengna-lin:parquet_per_column_compression

Conversation


@mengna-lin mengna-lin commented Apr 24, 2026

Closes #16090

Previously, all columns in a Parquet file were forced to use the same codec. This PR enables Parquet per-column compression, building on apache/parquet-java#3526 and apache/parquet-java#3396.

Changes

  1. Two new table property prefixes (write.parquet.compression-codec.column. and
     write.parquet.compression-level.column.); columns without an override fall back to
     the global codec (see the sketch after this list).
  2. ParquetWriter now holds a CompressionCodecFactory plus a default codec instead of a
     pre-resolved single BytesInputCompressor, and passes them to the new
     ColumnChunkPageWriteStore.
  3. setColumnCompressionConfig in WriteBuilder maps Iceberg column names to Parquet
     column paths and calls withCompressionCodec / withCompressionLevel on
     ParquetProperties.Builder, wiring the table properties into the Parquet write
     configuration.
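
For illustration, here is a minimal, self-contained sketch of the fallback rule from item 1: per-column overrides are collected from the write.parquet.compression-codec.column. prefix, and any column without an override falls back to the global codec. The class and helper names below are hypothetical (they are not the classes touched by this PR), the zstd default is only a placeholder, and the real wiring into ParquetProperties.Builder happens in setColumnCompressionConfig as described above.

// Hypothetical sketch of the per-column fallback rule only; not the code added in this PR.
import java.util.HashMap;
import java.util.Map;

public class PerColumnCodecResolutionSketch {

  private static final String GLOBAL_CODEC = "write.parquet.compression-codec";
  private static final String COLUMN_CODEC_PREFIX = "write.parquet.compression-codec.column.";

  // Collects per-column codec overrides keyed by Iceberg column name.
  static Map<String, String> columnOverrides(Map<String, String> tableProperties) {
    Map<String, String> overrides = new HashMap<>();
    for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
      if (entry.getKey().startsWith(COLUMN_CODEC_PREFIX)) {
        overrides.put(entry.getKey().substring(COLUMN_CODEC_PREFIX.length()), entry.getValue());
      }
    }
    return overrides;
  }

  // Resolves the codec for one column: per-column override if present, global codec otherwise.
  static String codecFor(String columnName, Map<String, String> tableProperties) {
    String globalCodec = tableProperties.getOrDefault(GLOBAL_CODEC, "zstd"); // placeholder default
    return columnOverrides(tableProperties).getOrDefault(columnName, globalCodec);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(GLOBAL_CODEC, "zstd");
    props.put(COLUMN_CODEC_PREFIX + "int_col", "snappy");

    System.out.println(codecFor("int_col", props));    // snappy (per-column override)
    System.out.println(codecFor("string_col", props)); // zstd   (global fallback)
  }
}

The same prefix rule applies to write.parquet.compression-level.column. for per-column compression levels.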

Test with a Spark job

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.LocalInputFile;
import org.apache.spark.sql.SparkSession;

/**
 * Runnable Spark job to manually verify per-column Parquet compression.
 *
 * <p>Creates a temporary Iceberg table with:
 * <ul>
 *   <li>Global codec: zstd
 *   <li>Per-column override for {@code int_col}: snappy
 * </ul>
 * Writes a few rows, then reads the Parquet footer and prints each column's actual codec.
 */
public class PerColumnCompressionMain {

  public static void main(String[] args) throws Exception {
    Path warehouse = Files.createTempDirectory("iceberg-warehouse");

    SparkSession spark =
        SparkSession.builder()
            .master("local[2]")
            .appName("PerColumnCompressionMain")
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.local.type", "hadoop")
            .config("spark.sql.catalog.local.warehouse", warehouse.toAbsolutePath().toString())
            .getOrCreate();

    try {
      spark.sql(
          "CREATE TABLE local.default.test_per_col ("
              + "  int_col int,"
              + "  string_col string"
              + ") USING iceberg"
              + " TBLPROPERTIES ("
              + "  'write.parquet.compression-codec' = 'zstd',"
              + "  'write.parquet.compression-codec.column.int_col' = 'snappy'"
              + ")");

      spark.sql(
          "INSERT INTO local.default.test_per_col VALUES (1, 'a'), (2, 'b'), (3, 'c')");

      // Load the table and find the written data file
      Catalog catalog = new HadoopCatalog(spark.sessionState().newHadoopConf(), warehouse.toAbsolutePath().toString());
      Table table = catalog.loadTable(TableIdentifier.of("default", "test_per_col"));
      List<ManifestFile> manifests = table.currentSnapshot().dataManifests(table.io());

      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
        DataFile file = reader.iterator().next();
        System.out.println("Data file: " + file.path());

        try (ParquetFileReader parquetReader =
            ParquetFileReader.open(
                new LocalInputFile(Paths.get(file.path().toString())))) {
          System.out.println("\nColumn codecs:");
          for (BlockMetaData block : parquetReader.getFooter().getBlocks()) {
            for (ColumnChunkMetaData col : block.getColumns()) {
              System.out.printf("  %-30s %s%n", col.getPath().toDotString(), col.getCodec());
            }
          }
        }
      }

      System.out.println("\nExpected:");
      System.out.println("  int_col                        SNAPPY  (per-column override)");
      System.out.println("  string_col                     ZSTD    (global fallback)");

    } finally {
      spark.sql("DROP TABLE IF EXISTS local.default.test_per_col");
      spark.stop();
    }
  }
}

Result

Column codecs:
  int_col                        SNAPPY
  string_col                     ZSTD

Expected:
  int_col                        SNAPPY  (per-column override)
  string_col                     ZSTD    (global fallback)

@singhpk234 (Contributor) left a comment


Let's wait for the Parquet code changes to be completed and released; then, once we get there, we may resume the discussion?

cc @emkornfield
