Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Move to ValueReader generation to a visitor #9063

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand All @@ -43,6 +44,7 @@
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -108,6 +110,113 @@ public ParquetValueReader<?> struct(
}
}

private class LogicalTypeAnnotationParquetValueReaderVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {

private final ColumnDescriptor desc;
private final org.apache.iceberg.types.Type.PrimitiveType expected;
private final PrimitiveType primitive;

LogicalTypeAnnotationParquetValueReaderVisitor(
ColumnDescriptor desc,
org.apache.iceberg.types.Type.PrimitiveType expected,
PrimitiveType primitive) {
this.desc = desc;
this.expected = expected;
this.primitive = primitive;
}

@Override
public Optional<ParquetValueReader<?>> visit(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this case UTF8: return new ParquetValueReaders.StringReader(desc); being covered here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return Optional.of(
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT64:
return Optional.of(
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
case INT32:
return Optional.of(
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be == LogicalTypeAnnotation.TimeUnit.MILLIS? Not Micros

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof, that's a good one! I would this part to be thoroughly tested as well.

Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
if (intLogicalType.getBitWidth() == 64) {
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(new ParquetValueReaders.StringReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, ?> idToConstant;
Expand Down Expand Up @@ -240,62 +349,13 @@ public ParquetValueReader<?> primitive(
ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
return new ParquetValueReaders.StringReader(desc);
case INT_8:
case INT_16:
case INT_32:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
return new ParquetValueReaders.IntAsLongReader(desc);
} else {
return new ParquetValueReaders.UnboxedReader<>(desc);
}
case INT_64:
return new ParquetValueReaders.UnboxedReader<>(desc);
case DATE:
return new DateReader(desc);
case TIMESTAMP_MICROS:
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
if (tsMicrosType.shouldAdjustToUTC()) {
return new TimestamptzReader(desc);
} else {
return new TimestampReader(desc);
}
case TIMESTAMP_MILLIS:
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
if (tsMillisType.shouldAdjustToUTC()) {
return new TimestamptzMillisReader(desc);
} else {
return new TimestampMillisReader(desc);
}
case TIME_MICROS:
return new TimeReader(desc);
case TIME_MILLIS:
return new TimeMillisReader(desc);
case DECIMAL:
DecimalLogicalTypeAnnotation decimal =
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return new ParquetValueReaders.BinaryAsDecimalReader(desc, decimal.getScale());
case INT64:
return new ParquetValueReaders.LongAsDecimalReader(desc, decimal.getScale());
case INT32:
return new ParquetValueReaders.IntegerAsDecimalReader(desc, decimal.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
case BSON:
return new ParquetValueReaders.BytesReader(desc);
default:
throw new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getOriginalType());
}
return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
.orElseThrow(
() ->
new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getLogicalTypeAnnotation()));
}

switch (primitive.getPrimitiveTypeName()) {
Expand Down