From 99f825d35054803c8e3840ff0035f056e4d4b85f Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 18 Jul 2017 16:27:31 +0200 Subject: [PATCH] Add flag for timestamp[ns] roundtrips Change-Id: I8b32864774b4df345bc028b4cc0f866fb8a0b9d1 --- python/pyarrow/_parquet.pxd | 9 +++++++++ python/pyarrow/_parquet.pyx | 17 +++++++++++++++-- python/pyarrow/parquet.py | 11 +++++++---- python/pyarrow/tests/test_parquet.py | 20 +++++++++++++++++++- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 3d2d0c86ce021..b1cd5eb2c2be0 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -247,8 +247,17 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: CStatus Open(const CSchema& schema, CMemoryPool* pool, const shared_ptr[OutputStream]& sink, const shared_ptr[WriterProperties]& properties, + const shared_ptr[ArrowWriterProperties]& arrow_properties, unique_ptr[FileWriter]* writer) CStatus WriteTable(const CTable& table, int64_t chunk_size) CStatus NewRowGroup(int64_t chunk_size) CStatus Close() + + cdef cppclass ArrowWriterProperties: + cppclass Builder: + Builder() + Builder* disable_deprecated_int96_timestamps() + Builder* enable_deprecated_int96_timestamps() + shared_ptr[ArrowWriterProperties] build() + c_bool support_deprecated_int96_timestamps() diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 5d446a81044b9..340ee178fbf2d 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -539,13 +539,14 @@ cdef class ParquetWriter: cdef readonly: object use_dictionary + object use_deprecated_int96_timestamps object compression object version int row_group_size def __cinit__(self, where, Schema schema, use_dictionary=None, compression=None, version=None, - MemoryPool memory_pool=None): + MemoryPool memory_pool=None, use_deprecated_int96_timestamps=False): cdef: shared_ptr[FileOutputStream] filestream 
shared_ptr[OutputStream] sink @@ -560,6 +561,7 @@ cdef class ParquetWriter: self.use_dictionary = use_dictionary self.compression = compression self.version = version + self.use_deprecated_int96_timestamps = use_deprecated_int96_timestamps cdef WriterProperties.Builder properties_builder self._set_version(&properties_builder) @@ -567,10 +569,21 @@ cdef class ParquetWriter: self._set_dictionary_props(&properties_builder) properties = properties_builder.build() + cdef ArrowWriterProperties.Builder arrow_properties_builder + self._set_int96_support(&arrow_properties_builder) + arrow_properties = arrow_properties_builder.build() + check_status( FileWriter.Open(deref(schema.schema), maybe_unbox_memory_pool(memory_pool), - sink, properties, &self.writer)) + sink, properties, arrow_properties, + &self.writer)) + + cdef void _set_int96_support(self, ArrowWriterProperties.Builder* props): + if self.use_deprecated_int96_timestamps: + props.enable_deprecated_int96_timestamps() + else: + props.disable_deprecated_int96_timestamps() cdef void _set_version(self, WriterProperties.Builder* props): if self.version is not None: diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index dc26daba90597..b9ed161add09d 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -675,7 +675,8 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None): def write_table(table, where, row_group_size=None, version='1.0', - use_dictionary=True, compression='snappy', **kwargs): + use_dictionary=True, compression='snappy', + use_deprecated_int96_timestamps=False, **kwargs): """ Write a Table to Parquet format @@ -698,12 +699,13 @@ def write_table(table, where, row_group_size=None, version='1.0', writer = ParquetWriter(where, table.schema, use_dictionary=use_dictionary, compression=compression, - version=version) + version=version, + use_deprecated_int96_timestamps=use_deprecated_int96_timestamps) writer.write_table(table, row_group_size=row_group_size) 
writer.close() -def write_metadata(schema, where, version='1.0'): +def write_metadata(schema, where, version='1.0', use_deprecated_int96_timestamps=False): """ Write metadata-only Parquet file from schema @@ -714,5 +716,6 @@ def write_metadata(schema, where, version='1.0'): version : {"1.0", "2.0"}, default "1.0" The Parquet format version, defaults to 1.0 """ - writer = ParquetWriter(where, schema, version=version) + writer = ParquetWriter(where, schema, version=version, + use_deprecated_int96_timestamps=use_deprecated_int96_timestamps) writer.close() diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index dee2ff9a13066..2e6ef3e8f4c20 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -456,6 +456,11 @@ def test_date_time_types(): data7 = np.array([start, start + 1, start + 2], dtype='int64') a7 = pa.Array.from_pandas(data7, type=t7) + t7_us = pa.timestamp('us') + start = pd.Timestamp('2001-01-01').value + data7_us = np.array([start, start + 1, start + 2], dtype='int64') // 1000 + a7_us = pa.Array.from_pandas(data7_us, type=t7_us) + table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7], ['date32', 'date64', 'timestamp[us]', 'time32[s]', 'time64[us]', @@ -464,7 +469,8 @@ def test_date_time_types(): # date64 as date32 # time32[s] to time32[ms] - expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7], + # 'timestamp[ns]' to 'timestamp[us]' + expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7_us], ['date32', 'date64', 'timestamp[us]', 'time32[s]', 'time64[us]', 'time32_from64[s]', @@ -472,6 +478,18 @@ def test_date_time_types(): _check_roundtrip(table, expected=expected, version='2.0') + # date64 as date32 + # time32[s] to time32[ms] + # 'timestamp[ns]' is saved as INT96 timestamp + expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7], + ['date32', 'date64', 'timestamp[us]', + 'time32[s]', 'time64[us]', + 'time32_from64[s]', + 'timestamp[ns]']) + + 
_check_roundtrip(table, expected=expected, version='2.0', + use_deprecated_int96_timestamps=True) + # Unsupported stuff def _assert_unsupported(array): table = pa.Table.from_arrays([array], ['unsupported'])