diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 37a69fe9aebd..640081cd5002 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -353,12 +353,16 @@ public static DataField convertToPaimonField(Type parquetType) { instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; - int precision = - timestampType - .getUnit() - .equals(LogicalTypeAnnotation.TimeUnit.MILLIS) - ? 3 - : 6; + int precision; + if (timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)) { + precision = 3; + } else if (timestampType + .getUnit() + .equals(LogicalTypeAnnotation.TimeUnit.MICROS)) { + precision = 6; + } else { + precision = 9; + } paimonDataType = timestampType.isAdjustedToUTC() ? new LocalZonedTimestampType(precision) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java index 0abf78fd2747..fdf3ecdc3066 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java @@ -190,7 +190,9 @@ public UpdaterFactory visit(TimestampType timestampType) { return c -> { if (c.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) { - return new LongTimestampUpdater(timestampType.getPrecision()); + return new LongTimestampUpdater( + timestampType.getPrecision(), + timestampUnit(c, timestampType.getPrecision())); } else if (c.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { return new TimestampUpdater(timestampType.getPrecision()); @@ -206,12 +208,32 @@ public UpdaterFactory visit(LocalZonedTimestampType localZonedTimestampType) { return c -> { if (c.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) { - return new LongUpdater(); + return new LongTimestampUpdater( + localZonedTimestampType.getPrecision(), + timestampUnit(c, localZonedTimestampType.getPrecision())); } return new TimestampUpdater(localZonedTimestampType.getPrecision()); }; } + private static LogicalTypeAnnotation.TimeUnit timestampUnit( + ColumnDescriptor descriptor, int precision) { + LogicalTypeAnnotation typeAnnotation = + descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + if (typeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { + return ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) typeAnnotation) + .getUnit(); + } + + if (precision <= 3) { + return LogicalTypeAnnotation.TimeUnit.MILLIS; + } else if (precision <= 6) { + return LogicalTypeAnnotation.TimeUnit.MICROS; + } else { + return LogicalTypeAnnotation.TimeUnit.MILLIS; + } + } + @Override public UpdaterFactory visit(VariantType variantType) { throw new RuntimeException("Variant type is not supported"); @@ -436,8 +458,11 @@ public void readValues( private static class LongTimestampUpdater extends AbstractTimestampUpdater { - public LongTimestampUpdater(int precision) { + private final LogicalTypeAnnotation.TimeUnit timeUnit; + + public LongTimestampUpdater(int precision, LogicalTypeAnnotation.TimeUnit timeUnit) { super(precision); + this.timeUnit = timeUnit; } @Override @@ -465,9 +490,67 @@ public void decodeSingleDictionaryId( private void putTimestamp(WritableColumnVector vector, int offset, long timestamp) { if (vector instanceof WritableTimestampVector) { ((WritableTimestampVector) vector) - .setTimestamp(offset, Timestamp.fromEpochMillis(timestamp)); + .setTimestamp(offset, timestampFromInt64(timestamp, timeUnit)); + } else { + ((WritableLongVector) vector).setLong(offset, longTimestamp(timestamp)); + } + } + + private long longTimestamp(long timestamp) { + if (precision <= 3) { + return millisFromInt64(timestamp, timeUnit); + } else if (precision <= 6) { + return microsFromInt64(timestamp, timeUnit); } else { - ((WritableLongVector) vector).setLong(offset, timestamp); + throw new UnsupportedOperationException( + "Unsupported timestamp precision: " + precision); + } + } + + private static Timestamp timestampFromInt64( + long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) { + switch (timeUnit) { + case MILLIS: + return Timestamp.fromEpochMillis(timestamp); + case MICROS: + return Timestamp.fromMicros(timestamp); + case NANOS: + return Timestamp.fromEpochMillis( + Math.floorDiv(timestamp, 1_000_000L), + (int) Math.floorMod(timestamp, 1_000_000L)); + default: + throw new UnsupportedOperationException( + "Unsupported timestamp unit: " + timeUnit); + } + } + + private static long millisFromInt64( + long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) { + switch (timeUnit) { + case MILLIS: + return timestamp; + case MICROS: + return Math.floorDiv(timestamp, 1_000L); + case NANOS: + return Math.floorDiv(timestamp, 1_000_000L); + default: + throw new UnsupportedOperationException( + "Unsupported timestamp unit: " + timeUnit); + } + } + + private static long microsFromInt64( + long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) { + switch (timeUnit) { + case MILLIS: + return Math.multiplyExact(timestamp, 1_000L); + case MICROS: + return timestamp; + case NANOS: + return Math.floorDiv(timestamp, 1_000L); + default: + throw new UnsupportedOperationException( + "Unsupported timestamp unit: " + timeUnit); } } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index 58fc10cc5174..5851ef7db5f4 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -670,6 +670,77 @@ public void testReadBinaryWrittenByParquet() throws Exception { }); } + @Test + public void testReadTimestampNanosWrittenByParquet() throws Exception { + Path path = new Path(folder.getPath(), UUID.randomUUID().toString()); + Configuration conf = new Configuration(); + Type timestampNanosType = + Types.primitive(INT64, Type.Repetition.REQUIRED) + .as( + LogicalTypeAnnotation.timestampType( + false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("f0") + .withId(0); + Type arrayTimestampNanosType = + ConversionPatterns.listOfElements( + Type.Repetition.OPTIONAL, + "f1", + Types.primitive(INT64, Type.Repetition.OPTIONAL) + .as( + LogicalTypeAnnotation.timestampType( + false, + LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("element") + .withId(2)) + .withId(1); + MessageType schema = + new MessageType("origin-parquet", timestampNanosType, arrayTimestampNanosType); + long[] nanosValues = new long[] {1704067200123456789L, -123456789L}; + + try (ParquetWriter writer = + ExampleParquetWriter.builder( + HadoopOutputFile.fromPath( + new org.apache.hadoop.fs.Path(path.toString()), conf)) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withConf(new Configuration()) + .withType(schema) + .build()) { + SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema); + for (long nanos : nanosValues) { + Group row = simpleGroupFactory.newGroup(); + row.append("f0", nanos); + Group array = row.addGroup("f1"); + array.addGroup(0).add(0, nanos); + array.addGroup(0).add(0, nanos + 1); + writer.write(row); + } + } + + RowType paimonRowType = + RowType.builder() + .fields(new TimestampType(9), new ArrayType(new TimestampType(9))) + .build(); + ParquetReaderFactory format = + new ParquetReaderFactory(new Options(), paimonRowType, 500, FilterCompat.NOOP); + AtomicInteger count = new AtomicInteger(0); + try (RecordReader reader = + format.createReader( + new FormatReaderContext( + new LocalFileIO(), path, new LocalFileIO().getFileSize(path)))) { + reader.forEachRemaining( + row -> { + long nanos = nanosValues[count.get()]; + assertThat(row.getTimestamp(0, 9)).isEqualTo(timestampFromNanos(nanos)); + assertThat(row.getArray(1).getTimestamp(0, 9)) + .isEqualTo(timestampFromNanos(nanos)); + assertThat(row.getArray(1).getTimestamp(1, 9)) + .isEqualTo(timestampFromNanos(nanos + 1)); + count.incrementAndGet(); + }); + } + assertThat(count.get()).isEqualTo(nanosValues.length); + } + private void innerTestTypes(File folder, List records, int rowGroupSize) throws IOException { List rows = records.stream().map(this::newRow).collect(Collectors.toList()); @@ -865,6 +936,11 @@ private InternalRow newRow(Integer v) { new GenericMap(f34)); } + private Timestamp timestampFromNanos(long nanos) { + return Timestamp.fromEpochMillis( + Math.floorDiv(nanos, 1_000_000L), (int) Math.floorMod(nanos, 1_000_000L)); + } + private Timestamp toMills(Integer v) { return Timestamp.fromEpochMillis( Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond()); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java index bfbdaed7c4a3..2808cc535abb 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java @@ -24,7 +24,10 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -32,6 +35,7 @@ import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType; import static org.apache.paimon.types.DataTypesTest.assertThat; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; /** Test for {@link ParquetSchemaConverter}. */ public class ParquetSchemaConverterTest { @@ -100,6 +104,37 @@ public class ParquetSchemaConverterTest { .STRING()))) .notNull())))))); + @Test + public void testParquetTimestampNanosSchemaConvert() { + MessageType messageType = + new MessageType( + "origin-parquet", + Types.primitive(INT64, Type.Repetition.OPTIONAL) + .as( + LogicalTypeAnnotation.timestampType( + false, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_nanos") + .withId(0), + Types.primitive(INT64, Type.Repetition.OPTIONAL) + .as( + LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.NANOS)) + .named("timestamp_ltz_nanos") + .withId(1)); + + RowType rowType = convertToPaimonRowType(messageType); + + assertThat( + new RowType( + Arrays.asList( + new DataField(0, "timestamp_nanos", DataTypes.TIMESTAMP(9)), + new DataField( + 1, + "timestamp_ltz_nanos", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))))) + .isEqualTo(rowType); + } + @Test public void testPaimonParquetSchemaConvert() { MessageType messageType = convertToParquetMessageType(ALL_TYPES);