diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index bda98ca1540e..79585b097d27 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -29,6 +29,7 @@ import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -43,6 +44,7 @@ import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -108,6 +110,113 @@ public ParquetValueReader struct( } } + private class LogicalTypeAnnotationParquetValueReaderVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + private final PrimitiveType primitive; + + LogicalTypeAnnotationParquetValueReaderVisitor( + ColumnDescriptor desc, + org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + this.desc = desc; + this.expected = expected; + this.primitive = primitive; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimalLogicalType) { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT64: + return Optional.of( + new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); + case INT32: + return Optional.of( + new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(new DateReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + return Optional.of(new TimeReader(desc)); + } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return Optional.of(new TimeMillisReader(desc)); + } + + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { + Types.TimestampType tsMicrosType = (Types.TimestampType) expected; + return tsMicrosType.shouldAdjustToUTC() + ? Optional.of(new TimestamptzReader(desc)) + : Optional.of(new TimestampReader(desc)); + } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + Types.TimestampType tsMillisType = (Types.TimestampType) expected; + return tsMillisType.shouldAdjustToUTC() + ? Optional.of(new TimestamptzMillisReader(desc)) + : Optional.of(new TimestampMillisReader(desc)); + } + + return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) + ? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) + : Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(new ParquetValueReaders.StringReader(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(new ParquetValueReaders.BytesReader(desc)); + } + } + private class ReadBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private final Map idToConstant; @@ -240,62 +349,13 @@ public ParquetValueReader primitive( ColumnDescriptor desc = type.getColumnDescription(currentPath()); if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return new ParquetValueReaders.StringReader(desc); - case INT_8: - case INT_16: - case INT_32: - if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) { - return new ParquetValueReaders.IntAsLongReader(desc); - } else { - return new ParquetValueReaders.UnboxedReader<>(desc); - } - case INT_64: - return new ParquetValueReaders.UnboxedReader<>(desc); - case DATE: - return new DateReader(desc); - case TIMESTAMP_MICROS: - Types.TimestampType tsMicrosType = (Types.TimestampType) expected; - if (tsMicrosType.shouldAdjustToUTC()) { - return new TimestamptzReader(desc); - } else { - return new TimestampReader(desc); - } - case TIMESTAMP_MILLIS: - Types.TimestampType tsMillisType = (Types.TimestampType) expected; - if (tsMillisType.shouldAdjustToUTC()) { - return new TimestamptzMillisReader(desc); - } else { - return new TimestampMillisReader(desc); - } - case TIME_MICROS: - return new TimeReader(desc); - case TIME_MILLIS: - return new TimeMillisReader(desc); - case DECIMAL: - DecimalLogicalTypeAnnotation decimal = - (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return new ParquetValueReaders.BinaryAsDecimalReader(desc, decimal.getScale()); - case INT64: - return new ParquetValueReaders.LongAsDecimalReader(desc, decimal.getScale()); - case INT32: - return new ParquetValueReaders.IntegerAsDecimalReader(desc, decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - case BSON: - return new ParquetValueReaders.BytesReader(desc); - default: - throw new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getOriginalType()); - } + return primitive + .getLogicalTypeAnnotation() + .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive)) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); } switch (primitive.getPrimitiveTypeName()) {