
Conversation

@saffrydaffry

Add write_to_dataset in pyarrow.parquet, which writes tables to Parquet files given partitioning columns.

@wesm (Member) left a comment

Cool! This will be very useful. I made some comments around the API and particular details in the implementation that will affect memory use and performance.

@jreback and @cpcloud, could you double-check my reasoning and see if you agree regarding the performance considerations? We may want to add some ASV benchmarks so that we can ascertain the exact performance implications.

----------
table : pyarrow.Table
where: string,
    Name of the parquet file for data saved in each partition
@wesm (Member) commented Aug 28, 2017

In these cases the data file paths are usually randomly generated. Any reason not to use a UUID value like $UUID.parquet? (See the guid() function in pyarrow.compat.)

@wesm (Member)

You could make the file path an optional parameter, so that if it's None a UUID is used.

@saffrydaffry (Author)

Sounds good, I'll take a look at guid(). I was also looking at uuid4() from the uuid module, but glad to know there's already a native function.
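For illustration, a minimal sketch of the optional-name idea (the helper name here is hypothetical; compat.guid() is the function referenced above):

    from pyarrow import compat

    def _default_outfile(where=None):
        # Use the caller-supplied name if given; otherwise generate a
        # random "<uuid>.parquet" name so repeated appends never collide.
        return where if where is not None else compat.guid() + '.parquet'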

table : pyarrow.Table
where: string,
    Name of the parquet file for data saved in each partition
parition_cols : list,
@wesm (Member)

Typo in the parameter name.

This should be optional, so that this function also works for an unpartitioned dataset.

@saffrydaffry (Author)

re Typo: oof!

re Optional parameter: if no parameter is specified, would it be best to just call the original parquet.write_table?

@wesm (Member)

No, because you're appending data to a dataset consisting of one or more Parquet files, so you might have:

root_dir/
  file1.parquet
  file2.parquet
  file3.parquet
  file4.parquet

So if partition_cols is not specified, it will write another file to the folder. This special case should be handled in the implementation of this function to avoid unnecessary data copying, and unit tested, too.

@saffrydaffry (Author)

Oh, I see! Giving the file names UUIDs (as suggested above) will make this simpler and follow the Spark implementation more closely: root_dir is specified, but each of the files appended under it will be <uuid>.parquet.

@saffrydaffry (Author)

re "Unnecessary data copying": Are you saying I should check if the data being appended to the dataset has duplicates within the existing dataset? If so, this seems like it could get hairy: a) I might want duplicates in my dataset (not sure why I would) b) I'm not sure how to do it other than reading back in the existing data, appending new data, deduping, then overwriting the directory with the complete dataset.

I'm hoping users will know not to accidentally write the same data twice. Right now, my implementation for this is:

    if partition_cols is not None:
        ...  # (modified original function)
    else:
        outfile = compat.guid() + ".parquet"
        full_path = "/".join([root_path, outfile])
        write_table(table, full_path, **kwargs)

    Column names by which to partition the dataset
    Columns are partitioned in the order they are given
root_path: string,
    The root directory of the table
@wesm (Member)

IMHO this should be the second parameter:

pq.write_to_dataset(table, '/path/to/mydata')
pq.write_to_dataset(table, '/path/to/mydata', partition_cols=[k1, k2])


df = table.to_pandas()
groups = df.groupby(partition_cols)
data_cols = [col for col in df.columns.tolist()
@wesm (Member)

Use df.columns.drop(partition_cols) here
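That is, roughly (using the same df and partition_cols as in the snippet above):

    # Drop the partition columns from the column index directly,
    # instead of filtering them out with a list comprehension.
    data_cols = df.columns.drop(partition_cols)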

             if col not in partition_cols]
for partition in partition_cols:
    try:
        df[partition] = df[partition].astype(str)
@wesm (Member)

This is quite expensive. It would be faster to coerce the keys to string only when writing to the dataset.
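A sketch of what that would look like (name, val, and keys are taken from the path-building code shown further down in this review):

    # Convert each partition key to a string only for the group being
    # written (str.format does the conversion), instead of casting whole
    # columns with astype(str) up front.
    subdir = "/".join("{0}={1}".format(name, val)
                      for name, val in zip(partition_cols, keys))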

    except ValueError:
        raise ValueError("Partition column must be coercible to string")

if not data_cols:
@wesm (Member)

len(data_cols) == 0

@saffrydaffry (Author)

+1


schema = {}
for subgroup in groups.indices:
    sub_df = groups.get_group(subgroup)[data_cols]
@wesm (Member)

Instead, do for keys, subgroup in df.groupby(...) and omit the .indices and .get_group calls.

@wesm (Member)

I think it would be more efficient to do:

partition_keys = [df[col] for col in partition_cols]
data_df = df.drop(partition_cols, axis='columns')
for part_keys, data_group in data_df.groupby(partition_keys):
    ...

subgroup = (subgroup,)
subdir = "/".join(
    ["{colname}={value}".format(colname=name, value=val)
     for name, val in zip(partition_cols, subgroup)])
@wesm (Member)

If the coercion to string fails, it will fail here, so the cast above is likely not needed.

prefix = "/".join([root_path, path])
os.makedirs(prefix, exist_ok=True)
full_path = "/".join([prefix, where])
write_table(data, full_path, **kwargs)
@wesm (Member)

Maybe write the chunks eagerly rather than waiting until the end? Otherwise you have, at a minimum, a memory tripling in this function:

  • first copy in table.to_pandas()
  • second copy when iterating through the groups
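Roughly, the eager-write pattern being suggested looks like this (a sketch assembled from the snippets elsewhere in this review, not the final patch):

    # Write each group as soon as it is formed, so only one group's worth
    # of data is held as an Arrow table at any time (data_df, partition_keys,
    # preserve_index, etc. come from the surrounding snippets).
    for keys, subgroup in data_df.groupby(partition_keys):
        if not isinstance(keys, tuple):
            keys = (keys,)
        subdir = "/".join("{0}={1}".format(name, val)
                          for name, val in zip(partition_cols, keys))
        subtable = Table.from_pandas(subgroup, preserve_index=preserve_index)
        prefix = "/".join([root_path, subdir])
        os.makedirs(prefix, exist_ok=True)
        outfile = compat.guid() + ".parquet"
        write_table(subtable, "/".join([prefix, outfile]), **kwargs)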

@saffrydaffry (Author)

Awesome! Thanks for the feedback, I'll start on these tonight.

@wesm (Member) commented Aug 28, 2017

@saffrydaffry can you also change the PR title to explain the content of the patch?

@saffrydaffry (Author)

@wesm, I've never changed the title on a PR before. Is it possible to change it while merging commits for these fixes?
This looks like a good lead, but I want to make sure this is how it's typically done: https://stackoverflow.com/questions/35770346/is-it-possible-to-change-github-pull-request-title-when-merging-the-pr.

Thanks!

@wesm (Member) commented Aug 28, 2017

Right now it's "ARROW-1400 Hotfix". There's an Edit button at the top of the page. The commit messages aren't important, because those all get squashed. You can change it to something like ARROW-1400: [Python] Add function to append data to a partitioned Parquet dataset

@saffrydaffry (Author)

Great, thank you!

@saffrydaffry changed the title from "ARROW-1400 Hotfix" to "ARROW-1400: [Python] Adding parquet.write_to_dataset() method for writing partitioned .parquet files" on Aug 28, 2017
)

if not os.path.isdir(root_path):
    os.mkdir(root_path)
@wesm (Member)

This needs to work with other kinds of filesystems (see the filesystem argument in ParquetDataset)

@saffrydaffry (Author)

I'll take a look, thanks!
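For context, one way the signature could accommodate this, sketched under the assumption that the LocalFileSystem class mentioned later in the review is importable from pyarrow.filesystem (this is not necessarily the exact final signature):

    from pyarrow.filesystem import LocalFileSystem

    def write_to_dataset(table, root_path, partition_cols=None,
                         filesystem=None, preserve_index=True, **kwargs):
        # Fall back to the local filesystem when none is supplied, so an
        # HDFS (or other) filesystem can be passed in explicitly,
        # mirroring the filesystem argument on ParquetDataset.
        if filesystem is None:
            filesystem = LocalFileSystem()
        ...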

os.makedirs(prefix, exist_ok=True)
outfile = compat.guid() + ".parquet"
full_path = "/".join([prefix, outfile])
write_table(subtable, full_path, **kwargs)
@wesm (Member)

This will need to be

with filesystem.open(full_path, 'wb') as f:
    write_table(subtable, f, **kwargs)

else:
    outfile = compat.guid() + ".parquet"
    full_path = "/".join([root_path, outfile])
    write_table(table, full_path, **kwargs)
@wesm (Member)

Same here

     for name, val in zip(partition_cols, keys)])
subtable = Table.from_pandas(subgroup, preserve_index=preserve_index)
prefix = "/".join([root_path, subdir])
os.makedirs(prefix, exist_ok=True)
@wesm (Member)

If you use the filesystem.mkdir function (e.g. in LocalFileSystem or HadoopFileSystem) then it will resolve this Python 2 incompatibility (the exist_ok argument is Py3-only)
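That is, roughly:

    # filesystem is whichever filesystem object was passed in or resolved
    # earlier; its mkdir works on Python 2 and 3 and on non-local filesystems,
    # unlike os.makedirs(prefix, exist_ok=True).
    filesystem.mkdir(prefix)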

@wesm (Member) commented Aug 31, 2017

If you give me write permission (add me as a collaborator) on your fork, I can try to help too if you don't have time or are not too familiar with this portion of the code.

@saffrydaffry (Author)

@wesm Sorry it took a bit for me to understand, but I made the changes per your suggestions and it works (according to the unit tests) for LocalFileSystem at least. Should I be worried that I keep failing the C++ check? I'm not sure if it's something I'm doing or if it's been failing all along. Happy to add you as a collaborator.

@wesm (Member) commented Sep 1, 2017

No problem, I will take a look

@wesm (Member) commented Sep 4, 2017

Alright, I think I have this in ship shape. I will wait for the build to run

@wesm (Member) commented Sep 4, 2017

+1. Thanks @saffrydaffry!

@asfgit closed this in 9968d95 on Sep 4, 2017
@saffrydaffry (Author)

@wesm My pleasure! Thank you for your suggestions and polishing!
