From 275aa34b7af4139e81737c67f012ce4363141a9f Mon Sep 17 00:00:00 2001 From: manuzhang Date: Sun, 5 Nov 2023 23:56:50 +0800 Subject: [PATCH] Parquet: Support reading INT96 column in row group filter --- .../iceberg/parquet/ParquetConversions.java | 5 +++ .../ParquetDictionaryRowGroupFilter.java | 3 ++ .../source/TestIcebergSourceTablesBase.java | 33 ++++++++++++------- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index e1a342b63261..0f9878d2019d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.function.Function; @@ -112,6 +113,10 @@ static Function converterFromParquet(PrimitiveType type) { case FIXED_LEN_BYTE_ARRAY: case BINARY: return binary -> ByteBuffer.wrap(((Binary) binary).getBytes()); + case INT96: + return binary -> + ParquetUtil.extractTimestampInt96( + ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN)); default: } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java index 33ec2f6817e0..1d24b7ccd71f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java @@ -453,6 +453,9 @@ private Set dict(int id, Comparator comparator) { case DOUBLE: dictSet.add((T) conversion.apply(dict.decodeToDouble(i))); break; + case INT96: + dictSet.add((T) conversion.apply(dict.decodeToBinary(i))); + break; default: throw new IllegalArgumentException( "Cannot decode dictionary of type: " diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 486713e52e30..540d94228bca 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2146,22 +2146,33 @@ public void testTableWithInt96Timestamp() throws IOException { stagingLocation); // validate we get the expected results back - List expected = spark.table("parquet_table").select("tmp_col").collectAsList(); - List actual = - spark - .read() - .format("iceberg") - .load(loadLocation(tableIdentifier)) - .select("tmp_col") - .collectAsList(); - Assertions.assertThat(actual) - .as("Rows must match") - .containsExactlyInAnyOrderElementsOf(expected); + testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier); + dropTable(tableIdentifier); } } } + private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) { + List expected = + spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList(); + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier)) + .select("tmp_col") + .filter(filterExpr) + .collectAsList(); + Assertions.assertThat(actual) + .as("Rows must match") + .containsExactlyInAnyOrderElementsOf(expected); + } + private GenericData.Record manifestRecord( Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) { GenericRecordBuilder builder =