ARROW-8244: [Python] Fix parquet.write_to_dataset to set file path in metadata_collector

This explores a potential fix for ARROW-8244; it seems rather straightforward to set the file path in `write_to_dataset` (`write_table` does not do this, because the user passes a full path there, so no relative path is known).

cc @rjzamora does this look like the correct logic?

Closes #6797 from jorisvandenbossche/ARROW-8244

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
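
A minimal usage sketch of what this change enables (the dataset path 'my_dataset' and the example columns below are hypothetical):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical example table; any table with a partition column works.
table = pa.table({'year': [2019, 2019, 2020], 'value': [1.0, 2.0, 3.0]})

metadata_collector = []
pq.write_to_dataset(table, root_path='my_dataset', partition_cols=['year'],
                    metadata_collector=metadata_collector)

# With this fix, each collected FileMetaData carries the piece's path
# relative to the dataset root, e.g. 'year=2019/<uuid>.parquet'.
for md in metadata_collector:
    print(md.row_group(0).column(0).file_path)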
jorisvandenbossche authored and fsaintjacques committed Apr 2, 2020
1 parent ee8d3e8 commit ac3bfe4
Showing 2 changed files with 54 additions and 12 deletions.
31 changes: 20 additions & 11 deletions python/pyarrow/parquet.py
@@ -422,8 +422,8 @@ class ParquetWriter:
 **options : dict
     If options contains a key `metadata_collector` then the
     corresponding value is assumed to be a list (or any object with
-    `.append` method) that will be filled with file metadata instances
-    of dataset pieces.
+    `.append` method) that will be filled with the file metadata instance
+    of the written file.
 """.format(_parquet_writer_arg_docs)

     def __init__(self, where, schema, filesystem=None,
@@ -1429,15 +1429,18 @@ def write_to_dataset(table, root_path, partition_cols=None,
         and allow you to override the partition filename. If nothing is
         passed, the filename will consist of a uuid.
     **kwargs : dict,
-        kwargs for write_table function. Using `metadata_collector` in
-        kwargs allows one to collect the file metadata instances of
-        dataset pieces. See docstring for `write_table` or
-        `ParquetWriter` for more information.
+        Additional kwargs for write_table function. See docstring for
+        `write_table` or `ParquetWriter` for more information.
+        Using `metadata_collector` in kwargs allows one to collect the
+        file metadata instances of dataset pieces. The file paths in the
+        ColumnChunkMetaData will be set relative to `root_path`.
     """
     fs, root_path = _get_filesystem_and_path(filesystem, root_path)

     _mkdir_if_not_exists(fs, root_path)

+    metadata_collector = kwargs.pop('metadata_collector', None)
+
     if partition_cols is not None and len(partition_cols) > 0:
         df = table.to_pandas(ignore_metadata=True)
         partition_keys = [df[col] for col in partition_cols]
@@ -1462,23 +1465,29 @@
                  for name, val in zip(partition_cols, keys)])
             subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
                                             schema=subschema, safe=False)
-            prefix = '/'.join([root_path, subdir])
-            _mkdir_if_not_exists(fs, prefix)
+            _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
             if partition_filename_cb:
                 outfile = partition_filename_cb(keys)
             else:
                 outfile = guid() + '.parquet'
-            full_path = '/'.join([prefix, outfile])
+            relative_path = '/'.join([subdir, outfile])
+            full_path = '/'.join([root_path, relative_path])
             with fs.open(full_path, 'wb') as f:
-                write_table(subtable, f, **kwargs)
+                write_table(subtable, f, metadata_collector=metadata_collector,
+                            **kwargs)
+            if metadata_collector is not None:
+                metadata_collector[-1].set_file_path(relative_path)
     else:
         if partition_filename_cb:
             outfile = partition_filename_cb(None)
         else:
             outfile = guid() + '.parquet'
         full_path = '/'.join([root_path, outfile])
         with fs.open(full_path, 'wb') as f:
-            write_table(table, f, **kwargs)
+            write_table(table, f, metadata_collector=metadata_collector,
+                        **kwargs)
+        if metadata_collector is not None:
+            metadata_collector[-1].set_file_path(outfile)


 def write_metadata(schema, where, version='1.0',
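The relative file paths are mainly useful for building a dataset-level '_metadata' summary file from the collected metadata. A rough sketch of that follow-up step, assuming the metadata_collector list from the sketch above and that FileMetaData.append_row_groups and FileMetaData.write_metadata_file are available in the installed pyarrow:

# Combine the collected per-file metadata into a single FileMetaData and
# write it as '_metadata' at the (hypothetical) dataset root.
combined = metadata_collector[0]
for md in metadata_collector[1:]:
    combined.append_row_groups(md)
combined.write_metadata_file('my_dataset/_metadata')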
35 changes: 34 additions & 1 deletion python/pyarrow/tests/test_parquet.py
@@ -3233,7 +3233,7 @@ def test_direct_read_dictionary_subfield():


 @pytest.mark.pandas
-def test_dataset_metadata(tempdir):
+def test_write_to_dataset_metadata(tempdir):
     path = tempdir / "ARROW-1983-dataset"

     # create and write a test dataset
@@ -3253,6 +3253,7 @@ def test_dataset_metadata(tempdir):
     dataset = pq.ParquetDataset(path)
     metadata_list2 = [p.get_metadata() for p in dataset.pieces]

+    collected_paths = []
     # compare metadata list content:
     assert len(metadata_list) == len(metadata_list2)
     for md, md2 in zip(metadata_list, metadata_list2):
@@ -3261,8 +3262,40 @@ def test_dataset_metadata(tempdir):
         # serialized_size is initialized in the reader:
         assert d.pop('serialized_size') == 0
         assert d2.pop('serialized_size') > 0
+        # file_path is different (not set for in-file metadata)
+        assert d["row_groups"][0]["columns"][0]["file_path"] != ""
+        assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+        # collect file paths to check afterwards, ignore here
+        collected_paths.append(d["row_groups"][0]["columns"][0]["file_path"])
+        d["row_groups"][0]["columns"][0]["file_path"] = ""
         assert d == d2

+    # ARROW-8244 - check the file paths in the collected metadata
+    n_root = len(path.parts)
+    file_paths = ["/".join(p.parts[n_root:]) for p in path.rglob("*.parquet")]
+    assert sorted(collected_paths) == sorted(file_paths)
+
+    # writing to single file (not partitioned)
+    metadata_list = []
+    pq.write_to_dataset(pa.table({'a': [1, 2, 3]}), root_path=str(path),
+                        metadata_collector=metadata_list)
+
+    # compare metadata content
+    file_paths = list(path.glob("*.parquet"))
+    assert len(file_paths) == 1
+    file_path = file_paths[0]
+    file_metadata = pq.read_metadata(file_path)
+    d1 = metadata_list[0].to_dict()
+    d2 = file_metadata.to_dict()
+    # serialized_size is initialized in the reader:
+    assert d1.pop('serialized_size') == 0
+    assert d2.pop('serialized_size') > 0
+    # file_path is different (not set for in-file metadata)
+    assert d1["row_groups"][0]["columns"][0]["file_path"] == file_path.name
+    assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+    d1["row_groups"][0]["columns"][0]["file_path"] = ""
+    assert d1 == d2
+

 def test_parquet_file_too_small(tempdir):
     path = str(tempdir / "test.parquet")
