ARROW-2587: [Python][Parquet] Verify nested data can be written
 - Plumbs through engine version
 - Makes engine version settable via environment variable
 - Adds unit test coverage

Should also unblock: googleapis/python-bigquery#21

CC @wesm

Closes #6751 from emkornfield/add_flag_to_python

Authored-by: Micah Kornfield <emkornfield@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
emkornfield authored and wesm committed Mar 29, 2020
1 parent f140625 commit db50352
Showing 4 changed files with 54 additions and 2 deletions.
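To make the user-facing effect concrete, here is a minimal sketch (not part of the commit; column names are illustrative) of writing a table with a nested struct column, selecting the engine through the new environment variable:

import os

import pyarrow as pa
import pyarrow.parquet as pq

# "V2" is the default engine and supports nested types; setting the
# variable just makes the choice explicit. It is read when the writer
# is constructed.
os.environ['ARROW_PARQUET_WRITER_ENGINE'] = 'V2'

# A table with a struct column, which previously could not be written.
table = pa.table({
    'agg_col': [{'page_type': 1}, {'record_type': 1}],
    'uid_first': ['1001', '1001'],
})

sink = pa.BufferOutputStream()
pq.write_table(table, sink)  # succeeds under the V2 engine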
6 changes: 6 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -376,6 +376,7 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* allow_truncated_timestamps()
             Builder* disallow_truncated_timestamps()
             Builder* store_schema()
+            Builder* set_engine_version(ArrowWriterEngineVersion version)
             shared_ptr[ArrowWriterProperties] build()
         c_bool support_deprecated_int96_timestamps()

@@ -437,6 +438,11 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
                               shared_ptr[SchemaDescriptor]* out)
 
 
+cdef extern from "parquet/properties.h" namespace "parquet" nogil:
+    cdef enum ArrowWriterEngineVersion:
+        V1 "parquet::ArrowWriterProperties::V1",
+        V2 "parquet::ArrowWriterProperties::V2"
+
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
     cdef cppclass FileWriter:

14 changes: 13 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -1209,6 +1209,7 @@ cdef class ParquetWriter:
         object compression_level
         object version
         object write_statistics
+        object writer_engine_version
         int row_group_size
         int64_t data_page_size

@@ -1221,7 +1222,8 @@
                   data_page_size=None,
                   allow_truncated_timestamps=False,
                   compression_level=None,
-                  use_byte_stream_split=False):
+                  use_byte_stream_split=False,
+                  writer_engine_version=None):
         cdef:
             shared_ptr[WriterProperties] properties
             c_string c_where
@@ -1247,6 +1249,7 @@
         self.coerce_timestamps = coerce_timestamps
         self.allow_truncated_timestamps = allow_truncated_timestamps
         self.use_byte_stream_split = use_byte_stream_split
+        self.writer_engine_version = writer_engine_version
 
         cdef WriterProperties.Builder properties_builder
         self._set_version(&properties_builder)
@@ -1269,6 +1272,7 @@
         self._set_int96_support(&arrow_properties_builder)
         self._set_coerce_timestamps(&arrow_properties_builder)
         self._set_allow_truncated_timestamps(&arrow_properties_builder)
+        self._set_writer_engine_version(&arrow_properties_builder)
 
         arrow_properties = arrow_properties_builder.build()
 
@@ -1302,6 +1306,14 @@
         else:
             props.disallow_truncated_timestamps()
 
+    cdef int _set_writer_engine_version(
+            self, ArrowWriterProperties.Builder* props) except -1:
+        if self.writer_engine_version == "V1":
+            props.set_engine_version(ArrowWriterEngineVersion.V1)
+        elif self.writer_engine_version != "V2":
+            raise ValueError("Unsupported Writer Engine Version: {0}"
+                             .format(self.writer_engine_version))
+
     cdef int _set_version(self, WriterProperties.Builder* props) except -1:
         if self.version is not None:
             if self.version == "1.0":
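For readers skimming the Cython hunk above: only the exact strings "V1" and "V2" pass validation. "V1" selects the legacy engine explicitly, "V2" makes no call at all (the builder's default is used), and anything else, including None, raises. A plain-Python sketch of that control flow (check_engine_version is a hypothetical stand-in, not a real API):

def check_engine_version(value):
    # Mirrors _set_writer_engine_version: "V1" is set explicitly, "V2"
    # falls through to the builder default, everything else is rejected.
    if value == "V1":
        return "V1"
    elif value != "V2":
        raise ValueError("Unsupported Writer Engine Version: {0}"
                         .format(value))
    return "V2"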
11 changes: 10 additions & 1 deletion python/pyarrow/parquet.py
@@ -400,7 +400,13 @@ def _sanitize_table(table, new_schema, flavor):
     only for some columns. If both dictionary and byte_stream_split are
     enabled, then dictionary is preferred.
     The byte_stream_split encoding is valid only for floating-point data types
-    and should be combined with a compression codec."""
+    and should be combined with a compression codec.
+    writer_engine_version: str, default "V2"
+        The engine version to use when writing out Arrow data. V2 supports
+        all nested types. V1 is legacy and will be removed in a future release.
+        Setting the environment variable ARROW_PARQUET_WRITER_ENGINE will
+        override the default.
+    """


class ParquetWriter:
@@ -429,6 +435,7 @@ def __init__(self, where, schema, filesystem=None,
                  use_deprecated_int96_timestamps=None,
                  compression_level=None,
                  use_byte_stream_split=False,
+                 writer_engine_version=None,
                  **options):
         if use_deprecated_int96_timestamps is None:
             # Use int96 timestamps for Spark
@@ -456,6 +463,7 @@ def __init__(self, where, schema, filesystem=None,
         else:
             sink = where
         self._metadata_collector = options.pop('metadata_collector', None)
+        engine_version = os.environ.get('ARROW_PARQUET_WRITER_ENGINE', 'V2')
         self.writer = _parquet.ParquetWriter(
             sink, schema,
             version=version,
@@ -465,6 +473,7 @@
             use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
             compression_level=compression_level,
             use_byte_stream_split=use_byte_stream_split,
+            writer_engine_version=engine_version,
             **options)
         self.is_open = True
 
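One behavior worth flagging in the hunks above: __init__ now accepts a writer_engine_version keyword, but the value handed to the Cython writer always comes from the environment lookup that computes engine_version, so the keyword is effectively unused in this revision. A one-line sketch of the resolution (resolve_engine_version is a hypothetical helper, not part of the commit):

import os

def resolve_engine_version():
    # The environment variable wins; otherwise the default is "V2".
    return os.environ.get('ARROW_PARQUET_WRITER_ENGINE', 'V2')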
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -554,6 +554,31 @@ def test_pandas_parquet_empty_roundtrip(tempdir):
     tm.assert_frame_equal(df, df_read)
 
 
+@pytest.mark.pandas
+def test_pandas_can_write_nested_data(tempdir):
+    data = {
+        "agg_col": [
+            {"page_type": 1},
+            {"record_type": 1},
+            {"non_consectutive_home": 0},
+        ],
+        "uid_first": "1001"
+    }
+    df = pd.DataFrame(data=data)
+    arrow_table = pa.Table.from_pandas(df)
+    imos = pa.BufferOutputStream()
+    # This succeeds under V2
+    _write_table(arrow_table, imos)
+
+    # Under V1 it fails.
+    with pytest.raises(ValueError):
+        import os
+        os.environ['ARROW_PARQUET_WRITER_ENGINE'] = 'V1'
+        imos = pa.BufferOutputStream()
+        _write_table(arrow_table, imos)
+    del os.environ['ARROW_PARQUET_WRITER_ENGINE']
+
+
 @pytest.mark.pandas
 def test_pandas_parquet_pyfile_roundtrip(tempdir):
     filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
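A side note on the new test: it mutates os.environ directly, and if _write_table unexpectedly succeeded, pytest.raises would fail the test before the cleanup del ran, leaking the variable into later tests. A sketch of a more isolated variant using pytest's built-in monkeypatch fixture (not part of the commit; the test name is hypothetical):

@pytest.mark.pandas
def test_nested_data_rejected_by_v1_engine(monkeypatch):
    # monkeypatch.setenv undoes the change on teardown, even if the
    # assertion below fails.
    df = pd.DataFrame({"agg_col": [{"page_type": 1}], "uid_first": ["1001"]})
    arrow_table = pa.Table.from_pandas(df)

    monkeypatch.setenv('ARROW_PARQUET_WRITER_ENGINE', 'V1')
    with pytest.raises(ValueError):
        _write_table(arrow_table, pa.BufferOutputStream())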
