Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,16 @@ public static DataField convertToPaimonField(Type parquetType) {
instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
int precision =
timestampType
.getUnit()
.equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
? 3
: 6;
int precision;
if (timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
precision = 3;
} else if (timestampType
.getUnit()
.equals(LogicalTypeAnnotation.TimeUnit.MICROS)) {
precision = 6;
} else {
precision = 9;
}
paimonDataType =
timestampType.isAdjustedToUTC()
? new LocalZonedTimestampType(precision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ public UpdaterFactory visit(TimestampType timestampType) {
return c -> {
if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
return new LongTimestampUpdater(timestampType.getPrecision());
return new LongTimestampUpdater(
timestampType.getPrecision(),
timestampUnit(c, timestampType.getPrecision()));
} else if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT96) {
return new TimestampUpdater(timestampType.getPrecision());
Expand All @@ -206,12 +208,32 @@ public UpdaterFactory visit(LocalZonedTimestampType localZonedTimestampType) {
return c -> {
if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
return new LongUpdater();
return new LongTimestampUpdater(
localZonedTimestampType.getPrecision(),
timestampUnit(c, localZonedTimestampType.getPrecision()));
}
return new TimestampUpdater(localZonedTimestampType.getPrecision());
};
}

private static LogicalTypeAnnotation.TimeUnit timestampUnit(
ColumnDescriptor descriptor, int precision) {
LogicalTypeAnnotation typeAnnotation =
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
if (typeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
return ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) typeAnnotation)
.getUnit();
}

if (precision <= 3) {
return LogicalTypeAnnotation.TimeUnit.MILLIS;
} else if (precision <= 6) {
return LogicalTypeAnnotation.TimeUnit.MICROS;
} else {
return LogicalTypeAnnotation.TimeUnit.MILLIS;
}
}

@Override
public UpdaterFactory visit(VariantType variantType) {
throw new RuntimeException("Variant type is not supported");
Expand Down Expand Up @@ -436,8 +458,11 @@ public void readValues(

private static class LongTimestampUpdater extends AbstractTimestampUpdater {

public LongTimestampUpdater(int precision) {
private final LogicalTypeAnnotation.TimeUnit timeUnit;

public LongTimestampUpdater(int precision, LogicalTypeAnnotation.TimeUnit timeUnit) {
super(precision);
this.timeUnit = timeUnit;
}

@Override
Expand Down Expand Up @@ -465,9 +490,67 @@ public void decodeSingleDictionaryId(
private void putTimestamp(WritableColumnVector vector, int offset, long timestamp) {
if (vector instanceof WritableTimestampVector) {
((WritableTimestampVector) vector)
.setTimestamp(offset, Timestamp.fromEpochMillis(timestamp));
.setTimestamp(offset, timestampFromInt64(timestamp, timeUnit));
} else {
((WritableLongVector) vector).setLong(offset, longTimestamp(timestamp));
}
}

private long longTimestamp(long timestamp) {
if (precision <= 3) {
return millisFromInt64(timestamp, timeUnit);
} else if (precision <= 6) {
return microsFromInt64(timestamp, timeUnit);
} else {
((WritableLongVector) vector).setLong(offset, timestamp);
throw new UnsupportedOperationException(
"Unsupported timestamp precision: " + precision);
}
}

private static Timestamp timestampFromInt64(
long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
switch (timeUnit) {
case MILLIS:
return Timestamp.fromEpochMillis(timestamp);
case MICROS:
return Timestamp.fromMicros(timestamp);
case NANOS:
return Timestamp.fromEpochMillis(
Math.floorDiv(timestamp, 1_000_000L),
(int) Math.floorMod(timestamp, 1_000_000L));
default:
throw new UnsupportedOperationException(
"Unsupported timestamp unit: " + timeUnit);
}
}

private static long millisFromInt64(
long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
switch (timeUnit) {
case MILLIS:
return timestamp;
case MICROS:
return Math.floorDiv(timestamp, 1_000L);
case NANOS:
return Math.floorDiv(timestamp, 1_000_000L);
default:
throw new UnsupportedOperationException(
"Unsupported timestamp unit: " + timeUnit);
}
}

private static long microsFromInt64(
long timestamp, LogicalTypeAnnotation.TimeUnit timeUnit) {
switch (timeUnit) {
case MILLIS:
return Math.multiplyExact(timestamp, 1_000L);
case MICROS:
return timestamp;
case NANOS:
return Math.floorDiv(timestamp, 1_000L);
default:
throw new UnsupportedOperationException(
"Unsupported timestamp unit: " + timeUnit);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,77 @@ public void testReadBinaryWrittenByParquet() throws Exception {
});
}

@Test
public void testReadTimestampNanosWrittenByParquet() throws Exception {
Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
Configuration conf = new Configuration();
Type timestampNanosType =
Types.primitive(INT64, Type.Repetition.REQUIRED)
.as(
LogicalTypeAnnotation.timestampType(
false, LogicalTypeAnnotation.TimeUnit.NANOS))
.named("f0")
.withId(0);
Type arrayTimestampNanosType =
ConversionPatterns.listOfElements(
Type.Repetition.OPTIONAL,
"f1",
Types.primitive(INT64, Type.Repetition.OPTIONAL)
.as(
LogicalTypeAnnotation.timestampType(
false,
LogicalTypeAnnotation.TimeUnit.NANOS))
.named("element")
.withId(2))
.withId(1);
MessageType schema =
new MessageType("origin-parquet", timestampNanosType, arrayTimestampNanosType);
long[] nanosValues = new long[] {1704067200123456789L, -123456789L};

try (ParquetWriter<Group> writer =
ExampleParquetWriter.builder(
HadoopOutputFile.fromPath(
new org.apache.hadoop.fs.Path(path.toString()), conf))
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withConf(new Configuration())
.withType(schema)
.build()) {
SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
for (long nanos : nanosValues) {
Group row = simpleGroupFactory.newGroup();
row.append("f0", nanos);
Group array = row.addGroup("f1");
array.addGroup(0).add(0, nanos);
array.addGroup(0).add(0, nanos + 1);
writer.write(row);
}
}

RowType paimonRowType =
RowType.builder()
.fields(new TimestampType(9), new ArrayType(new TimestampType(9)))
.build();
ParquetReaderFactory format =
new ParquetReaderFactory(new Options(), paimonRowType, 500, FilterCompat.NOOP);
AtomicInteger count = new AtomicInteger(0);
try (RecordReader<InternalRow> reader =
format.createReader(
new FormatReaderContext(
new LocalFileIO(), path, new LocalFileIO().getFileSize(path)))) {
reader.forEachRemaining(
row -> {
long nanos = nanosValues[count.get()];
assertThat(row.getTimestamp(0, 9)).isEqualTo(timestampFromNanos(nanos));
assertThat(row.getArray(1).getTimestamp(0, 9))
.isEqualTo(timestampFromNanos(nanos));
assertThat(row.getArray(1).getTimestamp(1, 9))
.isEqualTo(timestampFromNanos(nanos + 1));
count.incrementAndGet();
});
}
assertThat(count.get()).isEqualTo(nanosValues.length);
}

private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
throws IOException {
List<InternalRow> rows = records.stream().map(this::newRow).collect(Collectors.toList());
Expand Down Expand Up @@ -865,6 +936,11 @@ private InternalRow newRow(Integer v) {
new GenericMap(f34));
}

private Timestamp timestampFromNanos(long nanos) {
return Timestamp.fromEpochMillis(
Math.floorDiv(nanos, 1_000_000L), (int) Math.floorMod(nanos, 1_000_000L));
}

private Timestamp toMills(Integer v) {
return Timestamp.fromEpochMillis(
Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
import static org.apache.paimon.types.DataTypesTest.assertThat;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/** Test for {@link ParquetSchemaConverter}. */
public class ParquetSchemaConverterTest {
Expand Down Expand Up @@ -100,6 +104,37 @@ public class ParquetSchemaConverterTest {
.STRING())))
.notNull()))))));

@Test
public void testParquetTimestampNanosSchemaConvert() {
MessageType messageType =
new MessageType(
"origin-parquet",
Types.primitive(INT64, Type.Repetition.OPTIONAL)
.as(
LogicalTypeAnnotation.timestampType(
false, LogicalTypeAnnotation.TimeUnit.NANOS))
.named("timestamp_nanos")
.withId(0),
Types.primitive(INT64, Type.Repetition.OPTIONAL)
.as(
LogicalTypeAnnotation.timestampType(
true, LogicalTypeAnnotation.TimeUnit.NANOS))
.named("timestamp_ltz_nanos")
.withId(1));

RowType rowType = convertToPaimonRowType(messageType);

assertThat(
new RowType(
Arrays.asList(
new DataField(0, "timestamp_nanos", DataTypes.TIMESTAMP(9)),
new DataField(
1,
"timestamp_ltz_nanos",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)))))
.isEqualTo(rowType);
}

@Test
public void testPaimonParquetSchemaConvert() {
MessageType messageType = convertToParquetMessageType(ALL_TYPES);
Expand Down
Loading