
Flink: LogicalTypeWriterBuilder does not support UUID #14330

@rlokugamage


Apache Iceberg version

1.10.0 (latest release)

Query engine

Flink

Please describe the bug 🐞

flink: 2.0
iceberg-flink: 1.10.0 (2.0 runtime jar)
parquet: 1.16.0

I'm trying to write to an Iceberg table with a UUID column. However, when Flink writes to the table, I get this error:

Caused by: java.lang.UnsupportedOperationException: Unsupported logical type: null
	at org.apache.iceberg.flink.data.FlinkParquetWriters$WriteBuilder.primitive(FlinkParquetWriters.java:163)
	at org.apache.iceberg.flink.data.FlinkParquetWriters$WriteBuilder.primitive(FlinkParquetWriters.java:75)
	at org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor.visit(ParquetWithFlinkSchemaVisitor.java:51)
	at org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor.visitField(ParquetWithFlinkSchemaVisitor.java:162)
	at org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor.visitFields(ParquetWithFlinkSchemaVisitor.java:186)
	at org.apache.iceberg.flink.data.ParquetWithFlinkSchemaVisitor.visit(ParquetWithFlinkSchemaVisitor.java:49)
	at org.apache.iceberg.flink.data.FlinkParquetWriters.buildWriter(FlinkParquetWriters.java:72)
	at org.apache.iceberg.flink.sink.FlinkAppenderFactory.lambda$newAppender$2(FlinkAppenderFactory.java:126)

After some debugging, I found that it fails on the UUID column. In FlinkParquetWriters.java, LogicalTypeWriterBuilder has no visit method for UUIDLogicalTypeAnnotation, so UUID columns are rejected. Both Flink and Parquet support UUIDs, so I don't understand why this case was left out.
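The failure path can be modeled in a few lines. This is a simplified, self-contained sketch: every type and method in it (UuidDispatchSketch, Annotation, writerFor, primitive) is a hypothetical stand-in, not the real Parquet or Iceberg class. It assumes, consistent with the stack trace, that an annotation the visitor does not handle yields Optional.empty(), and that the error message prints the legacy OriginalType, which Parquet's UUID annotation does not have. That would explain why the message ends in "null".

```java
import java.util.Optional;

// Hypothetical, simplified model of the writer dispatch; NOT the real
// Parquet/Iceberg classes, just stand-ins to show the failure path.
public class UuidDispatchSketch {

  // Stand-in for a Parquet logical type annotation. legacyOriginalType mimics
  // the old OriginalType; UUID has no legacy equivalent, so it is null.
  enum Annotation {
    STRING("UTF8"),
    DECIMAL("DECIMAL"),
    UUID(null);

    final String legacyOriginalType;

    Annotation(String legacyOriginalType) {
      this.legacyOriginalType = legacyOriginalType;
    }
  }

  // Stand-in for LogicalTypeWriterBuilder: annotations without a matching
  // case fall through to Optional.empty(), like an unimplemented visit method.
  static Optional<String> writerFor(Annotation annotation) {
    switch (annotation) {
      case STRING:
        return Optional.of("string writer");
      case DECIMAL:
        return Optional.of("decimal writer");
      default:
        return Optional.empty(); // UUID ends up here
    }
  }

  // Stand-in for WriteBuilder.primitive: an empty result becomes an exception
  // whose message uses the legacy type name, which is null for UUID.
  static String primitive(Annotation annotation) {
    return writerFor(annotation)
        .orElseThrow(
            () ->
                new UnsupportedOperationException(
                    "Unsupported logical type: " + annotation.legacyOriginalType));
  }

  public static void main(String[] args) {
    System.out.println(primitive(Annotation.STRING)); // string writer
    try {
      primitive(Annotation.UUID);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // Unsupported logical type: null
    }
  }
}
```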

Since Parquet stores UUIDs as fixed-length byte arrays (FIXED_LEN_BYTE_ARRAY of length 16), I think it could be implemented like this:

import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
...
private static class LogicalTypeWriterBuilder
      implements LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
    private final LogicalType flinkType;
    private final ColumnDescriptor desc;

    private LogicalTypeWriterBuilder(LogicalType flinkType, ColumnDescriptor desc) {
      this.flinkType = flinkType;
      this.desc = desc;
    }
...
    @Override
    public Optional<ParquetValueWriter<?>> visit(UUIDLogicalTypeAnnotation uuid) {
      // UUID is a 16-byte FIXED_LEN_BYTE_ARRAY, so reuse the existing byte[] writer
      return Optional.of(byteArrays(desc));
    }
}
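Because the proposed visit method reuses byteArrays(desc), it assumes the Flink row already holds each UUID as its raw bytes. The Parquet spec requires those to be exactly 16 bytes in big-endian order. The standalone sketch below uses only the JDK; the class and method names (UuidBytesSketch, toBytes, fromBytes) are hypothetical, not part of Iceberg. It shows that layout for a java.util.UUID and could serve as a starting point for a round-trip test of the fix.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper: converts between java.util.UUID and the 16-byte,
// big-endian layout used by Parquet's UUID logical type.
public class UuidBytesSketch {

  static byte[] toBytes(UUID uuid) {
    ByteBuffer buffer = ByteBuffer.allocate(16); // ByteBuffer is big-endian by default
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    return buffer.array();
  }

  static UUID fromBytes(byte[] bytes) {
    if (bytes.length != 16) {
      throw new IllegalArgumentException("UUID must be 16 bytes, got " + bytes.length);
    }
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    long mostSignificant = buffer.getLong();
    long leastSignificant = buffer.getLong();
    return new UUID(mostSignificant, leastSignificant);
  }

  public static void main(String[] args) {
    UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
    byte[] bytes = toBytes(id);
    System.out.println(bytes.length); // 16
    System.out.println(fromBytes(bytes).equals(id)); // true
  }
}
```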

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Metadata

Assignees

No one assigned

Labels

bug (Something isn't working)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests