diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index e1a342b63261..0f9878d2019d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.function.Function;
@@ -112,6 +113,10 @@ static Function<Object, Object> converterFromParquet(PrimitiveType type) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
         return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
+      case INT96:
+        return binary ->
+            ParquetUtil.extractTimestampInt96(
+                ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN));
       default:
     }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 33ec2f6817e0..1d24b7ccd71f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -453,6 +453,9 @@ private <T> Set<T> dict(int id, Comparator<T> comparator) {
         case DOUBLE:
           dictSet.add((T) conversion.apply(dict.decodeToDouble(i)));
           break;
+        case INT96:
+          dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
+          break;
         default:
           throw new IllegalArgumentException(
               "Cannot decode dictionary of type: "
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 4f585eee51f4..d37d6a861690 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -2181,20 +2181,30 @@ public void testTableWithInt96Timestamp() throws IOException {
             stagingLocation);
 
         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }
 
+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
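
Why the new converter case works: an INT96 timestamp is a 12-byte value holding the nanoseconds within the day in the first 8 bytes and the Julian day number in the last 4, both little-endian, which is the convention used by parquet-mr and Spark INT96 writers. ParquetUtil.extractTimestampInt96 normalizes this to microseconds since the Unix epoch, Iceberg's internal timestamp representation. Below is a minimal standalone sketch of that decoding for reference; the class and method names are illustrative, not the Iceberg API.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

public class Int96Timestamps {
  // Julian day number corresponding to 1970-01-01 (the Unix epoch).
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;

  // Decodes a 12-byte INT96 value into microseconds since the Unix epoch:
  // bytes 0-7 hold nanoseconds within the day, bytes 8-11 the Julian day,
  // both little-endian.
  public static long toEpochMicros(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();
    long julianDay = buf.getInt();
    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN_DAY)
        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
  }
}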
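Why the one-line dictionary change is sufficient: dict() materializes every distinct dictionary entry into a comparable set, routing each raw value through the same converter produced by converterFromParquet, so INT96 entries arrive as epoch-micros Longs and every comparison predicate (<, <=, ==, >, >=) evaluates against the set unchanged. A hedged sketch of that loop under assumed types follows; DictStub stands in for Parquet's Dictionary, and the real logic lives in ParquetDictionaryRowGroupFilter.

import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

class DictionaryPruningSketch {
  // Stand-in for org.apache.parquet.column.Dictionary, reduced to the two
  // calls this sketch needs.
  interface DictStub {
    int getMaxId();
    byte[] decodeToBinary(int i);
  }

  // Mirrors the dict() loop: decode each distinct entry, normalize it with
  // the shared converter (for INT96 this yields a Long in epoch micros), and
  // collect into an ordered set that predicates are evaluated against. If no
  // value in the set can satisfy the predicate, the row group is skipped.
  @SuppressWarnings("unchecked")
  static <T> Set<T> decodeDictionary(
      DictStub dict, Function<Object, Object> conversion, Comparator<T> comparator) {
    Set<T> dictSet = new TreeSet<>(comparator);
    for (int i = 0; i <= dict.getMaxId(); i++) {
      dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
    }
    return dictSet;
  }
}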