ARROW-2587: [Python][Parquet] Verify nested data can be written #6751

Status: Closed · wants to merge 2 commits
6 changes: 6 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -376,6 +376,7 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* allow_truncated_timestamps()
Builder* disallow_truncated_timestamps()
Builder* store_schema()
Builder* set_engine_version(ArrowWriterEngineVersion version)
shared_ptr[ArrowWriterProperties] build()
c_bool support_deprecated_int96_timestamps()

@@ -437,6 +438,11 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
shared_ptr[SchemaDescriptor]* out)


cdef extern from "parquet/properties.h" namespace "parquet" nogil:
cdef enum ArrowWriterEngineVersion:
V1 "parquet::ArrowWriterProperties::V1",
V2 "parquet::ArrowWriterProperties::V2"

cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
cdef cppclass FileWriter:

14 changes: 13 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -1209,6 +1209,7 @@ cdef class ParquetWriter:
object compression_level
object version
object write_statistics
object writer_engine_version
int row_group_size
int64_t data_page_size

@@ -1221,7 +1222,8 @@ cdef class ParquetWriter:
data_page_size=None,
allow_truncated_timestamps=False,
compression_level=None,
use_byte_stream_split=False):
use_byte_stream_split=False,
writer_engine_version=None):
cdef:
shared_ptr[WriterProperties] properties
c_string c_where
@@ -1247,6 +1249,7 @@ cdef class ParquetWriter:
self.coerce_timestamps = coerce_timestamps
self.allow_truncated_timestamps = allow_truncated_timestamps
self.use_byte_stream_split = use_byte_stream_split
self.writer_engine_version = writer_engine_version

cdef WriterProperties.Builder properties_builder
self._set_version(&properties_builder)
@@ -1269,6 +1272,7 @@ cdef class ParquetWriter:
self._set_int96_support(&arrow_properties_builder)
self._set_coerce_timestamps(&arrow_properties_builder)
self._set_allow_truncated_timestamps(&arrow_properties_builder)
self._set_writer_engine_version(&arrow_properties_builder)

arrow_properties = arrow_properties_builder.build()

@@ -1302,6 +1306,14 @@ cdef class ParquetWriter:
else:
props.disallow_truncated_timestamps()

cdef int _set_writer_engine_version(
        self, ArrowWriterProperties.Builder* props) except -1:
    # V2 is the builder's default, so only V1 needs to be set
    # explicitly; anything other than "V1" or "V2" is rejected.
    if self.writer_engine_version == "V1":
        props.set_engine_version(ArrowWriterEngineVersion.V1)
    elif self.writer_engine_version != "V2":
        raise ValueError("Unsupported Writer Engine Version: {0}"
                         .format(self.writer_engine_version))

cdef int _set_version(self, WriterProperties.Builder* props) except -1:
if self.version is not None:
if self.version == "1.0":
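A quick illustration of the validation above, from the Python side. This is a hedged sketch, not part of the PR: it assumes ARROW_PARQUET_WRITER_ENGINE is unset so the keyword reaches _set_writer_engine_version, and the "V3" value is deliberately bogus.

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([("x", pa.int64())])
sink = pa.BufferOutputStream()
try:
    # Anything other than "V1" or "V2" is rejected at construction time.
    pq.ParquetWriter(sink, schema, writer_engine_version="V3")
except ValueError as exc:
    print(exc)  # Unsupported Writer Engine Version: V3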
11 changes: 10 additions & 1 deletion python/pyarrow/parquet.py
@@ -400,7 +400,13 @@ def _sanitize_table(table, new_schema, flavor):
only for some columns. If both dictionary and byte_stream_split are
enabled, then dictionary is preferred.
The byte_stream_split encoding is valid only for floating-point data types
and should be combined with a compression codec."""
and should be combined with a compression codec.
writer_engine_version: str, default "V2"
The engine version to use when writing out Arrow data. V2 supports
all nested types. V1 is legacy and will be removed in a future release.
Setting the environment variable ARROW_PARQUET_WRITER_ENGINE
overrides this setting.
"""


class ParquetWriter:
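As a reviewer's aside, the two options documented in the argument docs above can be exercised together. A minimal sketch using the public pyarrow.parquet API; the file name is illustrative.

import pyarrow as pa
import pyarrow.parquet as pq

floats = pa.table({"x": pa.array([0.1, 0.2, 0.3], type=pa.float32())})
pq.write_table(
    floats, "floats.parquet",
    use_dictionary=False,         # otherwise dictionary encoding is preferred
    compression="zstd",           # the docs recommend pairing with a codec
    use_byte_stream_split=["x"],  # or True to apply to all eligible columns
)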
@@ -429,6 +435,7 @@ def __init__(self, where, schema, filesystem=None,
use_deprecated_int96_timestamps=None,
compression_level=None,
use_byte_stream_split=False,
writer_engine_version=None,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -456,6 +463,7 @@
else:
sink = where
self._metadata_collector = options.pop('metadata_collector', None)
engine_version = os.environ.get('ARROW_PARQUET_WRITER_ENGINE',
                                writer_engine_version or 'V2')
self.writer = _parquet.ParquetWriter(
sink, schema,
version=version,
@@ -465,6 +473,7 @@
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
writer_engine_version=engine_version,
**options)
self.is_open = True

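To make the override concrete: a short sketch, not part of the diff, of forcing the legacy engine process-wide via the environment variable read in __init__ above. The variable is checked at writer construction, so it can be set after import.

import os
os.environ['ARROW_PARQUET_WRITER_ENGINE'] = 'V1'

import pyarrow as pa
import pyarrow.parquet as pq

flat = pa.table({"values": [1, 2, 3]})
pq.write_table(flat, "flat.parquet")  # flat schemas still write under V1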
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -554,6 +554,31 @@ def test_pandas_parquet_empty_roundtrip(tempdir):
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_can_write_nested_data(tempdir):
data = {
    "agg_col": [
        {"page_type": 1},
        {"record_type": 1},
        {"non_consecutive_home": 0},
    ],
    "uid_first": "1001"
}
df = pd.DataFrame(data=data)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
# This succeeds under the V2 writer engine.
_write_table(arrow_table, imos)

# Under V1 the same write raises. Restore the environment afterwards,
# even if the expected error does not occur.
import os
os.environ['ARROW_PARQUET_WRITER_ENGINE'] = 'V1'
try:
    with pytest.raises(ValueError):
        imos = pa.BufferOutputStream()
        _write_table(arrow_table, imos)
finally:
    del os.environ['ARROW_PARQUET_WRITER_ENGINE']


@pytest.mark.pandas
def test_pandas_parquet_pyfile_roundtrip(tempdir):
filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
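For completeness, the behavior the new test verifies can be reproduced outside pytest. A hedged sketch using only public API, under the default V2 engine:

import pyarrow as pa
import pyarrow.parquet as pq

nested = pa.table({
    "agg_col": [{"page_type": 1}, {"record_type": 1}],
    "uid_first": ["1001", "1001"],
})
sink = pa.BufferOutputStream()
pq.write_table(nested, sink)     # succeeds under the default V2 engine
assert sink.getvalue().size > 0  # the struct column was written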