Skip to content

[Bug] Parquet format with timestamp with local time processing wrong #5066

@liuyehcf

Description

@liuyehcf

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

For the TIMESTAMP WITH LOCAL TIME ZONE type, the local time should be normalized to UTC before being written to the Parquet/ORC file.

The ORC format processes it as expected, but Parquet does not.

Compute Engine

Just Paimon SDK itself.

Minimal reproduce step

The reproduction code (Paimon 0.8.2):

package org.byconity.paimon.demo;

import org.apache.commons.io.FileUtils;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;

import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class ParquetWriteTest {

    /**
     * Reproduces the TIMESTAMP WITH LOCAL TIME ZONE normalization bug: writes one
     * local-time row to a fresh table using the given file format, then reads it
     * back and prints the stored value. ORC normalizes the local time to UTC on
     * write; Parquet does not, which is the defect under report.
     *
     * @param localPath  warehouse root directory on the local filesystem
     * @param fileFormat Paimon file format to test, e.g. "orc" or "parquet"
     * @throws Exception if catalog, write, commit, or read operations fail
     */
    private static void testTimestampWithFormat(String localPath, String fileFormat)
            throws Exception {
        // Catalog extends AutoCloseable; close it so underlying resources
        // (filesystem handles, metastore state) are released per run.
        try (Catalog catalog = createFilesystemCatalog(localPath)) {
            Table table = createTimestampTable(catalog, fileFormat);
            writeSingleTimestampRow(table);
            printAllRows(table);
        }
    }

    /** Creates a filesystem-metastore catalog whose warehouse is {@code localPath}. */
    private static Catalog createFilesystemCatalog(String localPath) {
        Options options = new Options();
        options.set(CatalogOptions.METASTORE, "filesystem");
        options.set(CatalogOptions.WAREHOUSE, new Path(localPath).toUri().toString());
        CatalogContext context = CatalogContext.create(options);
        return CatalogFactory.createCatalog(context);
    }

    /**
     * Creates database {@code testDb_<fileFormat>} and table {@code testTbl} with a
     * single TIMESTAMP WITH LOCAL TIME ZONE column, storing data in
     * {@code fileFormat}, and returns the table handle.
     */
    private static Table createTimestampTable(Catalog catalog, String fileFormat)
            throws Exception {
        String dbName = "testDb" + "_" + fileFormat;
        String tblName = "testTbl";
        catalog.createDatabase(dbName, false);
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("col_local_zoned_timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
        schemaBuilder.option("file.format", fileFormat);
        Schema schema = schemaBuilder.build();
        Identifier tableId = Identifier.create(dbName, tblName);
        catalog.createTable(tableId, schema, false);
        return catalog.getTable(tableId);
    }

    /** Writes one row containing the fixed local timestamp and commits the batch. */
    private static void writeSingleTimestampRow(Table table) throws Exception {
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
        try (BatchTableWrite write = writeBuilder.newWrite()) {
            // Microsecond-precision local time; what the file stores for this value
            // is what distinguishes the ORC and Parquet code paths.
            LocalDateTime localDateTime = LocalDateTime.parse("2024-12-12 10:10:10.000000",
                    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"));
            GenericRow record = GenericRow.of(Timestamp.fromLocalDateTime(localDateTime));
            write.write(record);
            List<CommitMessage> messages = write.prepareCommit();
            try (BatchTableCommit commit = writeBuilder.newCommit()) {
                commit.commit(messages);
            }
        }
    }

    /** Scans every split of the table and prints each timestamp value read back. */
    private static void printAllRows(Table table) throws Exception {
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Split> splits = readBuilder.newScan().plan().splits();
        TableRead read = readBuilder.newRead();

        try (RecordReader<InternalRow> reader = read.createReader(splits)) {
            RecordReader.RecordIterator<InternalRow> batch;
            while ((batch = reader.readBatch()) != null) {
                InternalRow row;
                while ((row = batch.next()) != null) {
                    // Precision 6 matches the microsecond value written above.
                    Timestamp timestamp = row.getTimestamp(0, 6);
                    System.out.println(timestamp);
                }
                batch.releaseBatch();
            }
        }
    }

    /** Runs the reproduction against a clean warehouse, once per file format. */
    public static void main(String[] args) throws Exception {
        String localPath = "/tmp/paimon_warehouse";
        // Start from an empty warehouse so repeated runs do not collide.
        FileUtils.deleteDirectory(new File(localPath));

        testTimestampWithFormat(localPath, "orc");
        testTimestampWithFormat(localPath, "parquet");
    }
}

and, you can check the orc data file and parquet data file, it stores different UTC timestamp.

Here are two Python scripts for displaying the contents of ORC and Parquet files:

import pyarrow.orc as orc
import sys


def read_orc_file(file_path):
    """Print the schema and rows of the ORC file at ``file_path``."""
    table = orc.ORCFile(file_path).read()

    print("Schema:")
    print(table.schema)
    print("\nData:")
    print(table.to_pandas())


if __name__ == "__main__":
    # Expect exactly one argument: the path to the ORC file.
    if len(sys.argv) != 2:
        print("Usage: python read_orc.py <path_to_orc_file>")
        sys.exit(1)

    read_orc_file(sys.argv[1])
import pyarrow.parquet as pq
import sys


def read_parquet_file(file_path):
    """Print the schema and rows of the Parquet file at ``file_path``."""
    table = pq.ParquetFile(file_path).read()

    print("Schema:")
    print(table.schema)
    print("\nData:")
    print(table.to_pandas())


if __name__ == "__main__":
    # Expect exactly one argument: the path to the Parquet file.
    if len(sys.argv) != 2:
        print("Usage: python read_parquet.py <path_to_parquet_file>")
        sys.exit(1)

    read_parquet_file(sys.argv[1])

Results:

python3 ~/Code/py/read_orc.py /tmp/paimon_warehouse/testDb_orc.db/testTbl/bucket-0/data-945a146d-9ee2-4f41-987f-0231c63b97e8-0.orc
Schema:
col_local_zoned_timestamp: timestamp[ns, tz=UTC]

Data:
  col_local_zoned_timestamp
0 2024-12-12 02:10:10+00:00
python3 ~/Code/py/read_parquet.py /tmp/paimon_warehouse/testDb_parquet.db/testTbl/bucket-0/data-82e3abb6-98e9-4eb5-9a6f-126a96fc056c-0.parquet
Schema:
col_local_zoned_timestamp: timestamp[us, tz=UTC]

Data:
  col_local_zoned_timestamp
0 2024-12-12 10:10:10+00:00

As you can see, Parquet did not normalize the local time to a UTC timestamp; it simply treated the value as a UTC timestamp and wrote it into the file directly.

What doesn't meet your expectations?

The data stored in the Parquet file does not meet expectations.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions