Change parquet writers to use standard std:io::Write rather custom ParquetWriter trait (#1717) (#1163) #1719

Merged
merged 9 commits into from
May 25, 2022

Conversation

tustvold
Contributor

Which issue does this PR close?

Closes #1717

Part of #1163

Rationale for this change

See the linked tickets, but in short, the current write path makes use of a lot of custom IO abstractions which can be hard to use correctly. In particular, the use of TryClone can easily lead to races if used to share a file descriptor across threads.

What changes are included in this PR?

This reworks the write path to use std::io::Write and nothing else. Unfortunately, achieving this requires a few changes:

  • A TrackedWrite that keeps track of how many bytes have been written, allowing removal of Seek
  • A callback based approach to get metadata from a child writer to its parent
  • Lifetimes, lots of lifetimes...

This last point becomes a bit obnoxious when it interacts with the RowGroupWriter trait. In order to be object-safe, i.e. possible to construct Box<dyn RowGroupWriter>, RowGroupWriter::close cannot take self by value. This results in explicit scopes, or calls to std::mem::drop in order to truncate its lifetime.

I'm not entirely sure what the purpose of these traits is, but perhaps we could simplify things, and fix this slight lifetime annoyance by removing them? Maybe something for a follow up PR?
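
As a rough illustration of the first bullet above, a byte-counting wrapper over any `std::io::Write` might look like the sketch below. `CountingWrite` and `offset_after_header` are hypothetical names chosen for this example and are not the PR's actual `TrackedWrite`; only the `bytes_written()` accessor mirrors a call that appears in the diff.

```rust
use std::io::{self, Write};

/// Hypothetical byte-counting wrapper (an assumption for illustration,
/// not the PR's real `TrackedWrite`).
struct CountingWrite<W: Write> {
    inner: W,
    bytes_written: usize,
}

impl<W: Write> CountingWrite<W> {
    fn new(inner: W) -> Self {
        Self { inner, bytes_written: 0 }
    }

    /// Current offset in the output, tracked without needing `Seek`.
    fn bytes_written(&self) -> usize {
        self.bytes_written
    }
}

impl<W: Write> Write for CountingWrite<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count only the bytes the inner writer actually accepted.
        let n = self.inner.write(buf)?;
        self.bytes_written += n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Writes a 4-byte header and reports the offset that follows it.
fn offset_after_header() -> io::Result<usize> {
    let mut out = CountingWrite::new(Vec::new());
    out.write_all(b"PAR1")?;
    Ok(out.bytes_written())
}
```

Because the wrapper records the position itself, the underlying sink only has to implement `Write`, which is what allows the `Seek` bound to be dropped.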

Are there any user-facing changes?

Yes, this makes non-trivial changes to the parquet write API.

@github-actions github-actions bot added the parquet Changes to the parquet crate label May 21, 2022
//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
//! // ... write values to a column writer
//! row_group_writer.close_column(col_writer).unwrap();
//! {
Contributor Author

This is an example of the somewhat unfortunate manual scoping... I personally think removing the trait indirection would make the code a lot easier to understand...

Contributor

Which trait indirection are you referring to?

Contributor

Also, it seems to me like we could change this example to actually run. I'm not sure why it is marked no_run; it just needs to pick a tempfile as the target rather than /path/to/sample.parquet.

Contributor Author

> Which trait indirection are you referring to?

We have FileWriter and RowGroupWriter traits which are then implemented by SerializedFileWriter and SerializedRowGroupWriter.

I'm not sure why you would ever want to override the default implementation, and the need to expose these as object-safe traits prevents FileWriter::close(self) or RowGroupWriter::close(self) which causes lifetime annoyances.

I also find the indirection very confusing in general 😅
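
To make the lifetime annoyance concrete, here is a minimal sketch under stated assumptions. `DynRowGroup`, `RowGroup`, `finish`, `via_trait_object` and `via_by_value` are all hypothetical names invented for this example; they only imitate the shape of a child writer that mutably borrows its parent's sink.

```rust
use std::io::Write;

/// Hypothetical trait shaped so `close` can be called through `Box<dyn ...>`.
trait DynRowGroup {
    fn close(&mut self) -> std::io::Result<usize>;
}

/// Hypothetical child writer that mutably borrows the parent's sink.
struct RowGroup<'a> {
    sink: &'a mut Vec<u8>,
}

impl<'a> DynRowGroup for RowGroup<'a> {
    fn close(&mut self) -> std::io::Result<usize> {
        self.sink.write_all(b"rg")?;
        Ok(self.sink.len())
    }
}

impl<'a> RowGroup<'a> {
    /// Inherent by-value close: consuming `self` ends the borrow of `sink`.
    fn finish(mut self) -> std::io::Result<usize> {
        DynRowGroup::close(&mut self)
    }
}

fn via_trait_object(sink: &mut Vec<u8>) -> std::io::Result<usize> {
    {
        let mut rg: Box<dyn DynRowGroup + '_> = Box::new(RowGroup { sink: &mut *sink });
        rg.close()?;
    } // explicit scope: the boxed writer is dropped here, releasing `sink`
    Ok(sink.len())
}

fn via_by_value(sink: &mut Vec<u8>) -> std::io::Result<usize> {
    let rg = RowGroup { sink: &mut *sink };
    rg.finish()?; // `rg` is consumed, so `sink` is usable again right away
    Ok(sink.len())
}
```

In the trait-object version the borrow lasts until the box is dropped, hence the extra braces (or a `std::mem::drop` call); the by-value version needs neither.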

Contributor

Ah, yes, I also find this very confusing -- I think that split was part of the original implementation from @sunchao

Would it be possible to create a ParallelizedFileWriter that used multiple threads and buffers internally to write files out with greater concurrency?

Contributor Author

@tustvold tustvold May 23, 2022

It would have been highly non-trivial to do this correctly before, and with the lifetime change in this PR it would actually be impossible, as FileWriter::next_row_group returns a mutable borrow of FileWriter

That being said, nothing would prevent creating a ParallelizedFileWriter, you just wouldn't be able to be generic over parallel and non-parallel implementations. I'm not sure this is actually a problem, especially as any ParallelizedFileWriter is likely to have additional constraints, e.g. Send, async, etc...

I personally think removing these traits as part of the PR would go a long way to making the change easy for users to understand; the lifetime errors are not very obvious unless you know what you are looking for. What do you think?

Contributor

I am not opposed to removing the FileWriter and RowGroupWriter. I think it would help readability a lot. However I think other reviewers should weigh in.

If we are going to make such an API change, perhaps we should do it in the same release as this change to get it all over with at once.

}

let metadata = self.row_group_metadata.as_ref().unwrap().clone();
Ok(metadata)
}
}

/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
Contributor Author

I'm not a massive fan of this, but I was somewhat wary of making changes to the ColumnWriter plumbing as it is used in lots of places

@@ -65,7 +67,7 @@ pub struct ArrowWriter<W: ParquetWriter> {
max_row_group_size: usize,
}

-impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+impl<W: Write> ArrowWriter<W> {
Contributor Author

This seemingly small change means you can pass in &mut std::io::Cursor<Vec<_>>, or any other construction that implements Write, which makes this much easier to use.
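
A small sketch of why the relaxed bound helps: the standard library implements `Write` for `&mut W` whenever `W: Write`, so a writer that is generic over `W: Write` can borrow its sink instead of owning it. `Sink`, `emit` and `write_borrowed` are hypothetical stand-ins for this example, not parquet types.

```rust
use std::io::{Cursor, Write};

/// Hypothetical stand-in for a writer that is generic over `W: Write`.
struct Sink<W: Write> {
    inner: W,
}

impl<W: Write> Sink<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }

    fn emit(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(bytes)
    }
}

fn write_borrowed() -> std::io::Result<Vec<u8>> {
    let mut cursor = Cursor::new(Vec::new());
    // `&mut Cursor<Vec<u8>>` satisfies `W: Write`, so the caller keeps
    // ownership of the cursor and can inspect it afterwards.
    let mut sink = Sink::new(&mut cursor);
    sink.emit(b"abc")?;
    // The borrow held by `sink` ends after its last use above.
    Ok(cursor.into_inner())
}
```

With a `'static` bound on `W`, the borrowed form would not be accepted and the caller would have to hand over the buffer entirely.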

@@ -744,15 +748,15 @@ mod tests {
let expected_batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

-let cursor = InMemoryWriteableCursor::default();
+let mut cursor = Cursor::new(vec![]);
Contributor Author

🎉

/// Can be called multiple times. In subsequent calls will result in no-op and return
/// already created row group metadata.
fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
}

/// Callback invoked on closing a column chunk, arguments are:
Contributor Author

We have to move to a callback model as the child writers now mutably borrow from their parents, and so it would be impossible to keep the current interface. I also happen to think this is cleaner: you simply close the ColumnWriter when you are finished, rather than passing the ColumnWriter back to the RowGroupWriter.

Contributor

Also importantly, by keeping a mutable reference it lets the compiler prevent concurrent writes to the same writer, as reported in #1717
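
The callback model and the borrow-based exclusivity described in this thread can be sketched roughly as follows. Every name here (`Parent`, `Child`, `OnClose`, `write_two_chunks`) is hypothetical and heavily simplified; the PR's actual callback types carry column and row group metadata rather than a byte count.

```rust
/// Hypothetical callback type: receives the number of bytes the child wrote.
type OnClose<'a> = Box<dyn FnOnce(usize) + 'a>;

struct Parent {
    buf: Vec<u8>,
    chunk_sizes: Vec<usize>,
}

struct Child<'a> {
    buf: &'a mut Vec<u8>,
    start: usize,
    // Optional, so a caller that needs no notification can pass `None`.
    on_close: Option<OnClose<'a>>,
}

impl Parent {
    fn next_child(&mut self) -> Child<'_> {
        // Split the borrow: the child writes into `buf`, while the
        // callback records into `chunk_sizes`.
        let Parent { buf, chunk_sizes } = self;
        let start = buf.len();
        Child {
            buf,
            start,
            on_close: Some(Box::new(move |n| chunk_sizes.push(n))),
        }
    }
}

impl<'a> Child<'a> {
    fn write(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Consumes the child and reports to the parent via the callback.
    fn close(mut self) {
        let written = self.buf.len() - self.start;
        if let Some(cb) = self.on_close.take() {
            cb(written);
        }
    }
}

fn write_two_chunks() -> (Vec<u8>, Vec<usize>) {
    let mut parent = Parent { buf: Vec::new(), chunk_sizes: Vec::new() };
    let mut a = parent.next_child();
    a.write(b"hello");
    // A second `parent.next_child()` here would not compile while `a` is alive.
    a.close();
    let mut b = parent.next_child();
    b.write(b"!!");
    b.close();
    (parent.buf, parent.chunk_sizes)
}
```

The child never has to be handed back to the parent, and the mutable borrow is what stops two children from writing to the same sink at once.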

@codecov-commenter

codecov-commenter commented May 21, 2022

Codecov Report

Merging #1719 (63491e5) into master (5cf06bf) will decrease coverage by 0.04%.
The diff coverage is 91.13%.

❗ Current head 63491e5 differs from pull request most recent head d699960. Consider uploading reports for the commit d699960 to get more accurate results

@@            Coverage Diff             @@
##           master    #1719      +/-   ##
==========================================
- Coverage   83.32%   83.27%   -0.05%     
==========================================
  Files         196      195       -1     
  Lines       55961    55896      -65     
==========================================
- Hits        46627    46549      -78     
- Misses       9334     9347      +13     
Impacted Files                                  Coverage Δ
parquet/benches/arrow_writer.rs                 0.00% <0.00%> (ø)
parquet/src/arrow/array_reader/list_array.rs    93.41% <ø> (ø)
parquet/src/arrow/schema.rs                     96.81% <ø> (ø)
parquet/src/util/cursor.rs                      63.86% <ø> (-13.45%) ⬇️
parquet/src/util/io.rs                          88.33% <ø> (-1.67%) ⬇️
parquet_derive/src/lib.rs                       0.00% <ø> (ø)
parquet_derive/src/parquet_field.rs             65.98% <0.00%> (ø)
parquet/src/data_type.rs                        75.84% <50.00%> (ø)
parquet/src/file/writer.rs                      92.85% <92.17%> (-0.45%) ⬇️
parquet/src/arrow/arrow_reader.rs               96.88% <100.00%> (-0.01%) ⬇️
... and 9 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5cf06bf...d699960. Read the comment docs.

@alamb alamb requested a review from sunchao May 23, 2022 10:23
@alamb alamb changed the title Rustify parquet writer (#1717) (#1163) Change parquet ArrowFileWriter to use standard std:io::Write rather custom ParquetWriter trait (#1717) (#1163) May 23, 2022
Contributor

@alamb alamb left a comment

Thank you @tustvold !

All in all I think this is a great improvement -- It is great to see that the API now reflects the "write only a single row group at a time" restriction in a way that the compiler can enforce. This also seems like a nice step towards tracking information (like bytes written) in a way that I can imagine actually parallelizing the encoding.

I do think it might be worth a little more investment to make the change "less breaking" (by marking some structs as deprecated for a while)

cc @pacman82 @manojkarthick @nevi-me

@@ -278,8 +276,8 @@ fn _create_nested_bench_batch(
#[inline]
fn write_batch(batch: &RecordBatch) -> Result<()> {
// Write batch to an in-memory writer
-let cursor = InMemoryWriteableCursor::default();
-let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?;
+let buffer = vec![];
Contributor

here is a nice example of the new API in action: use something that does std::io::Write

@@ -133,68 +131,6 @@ impl Seek for SliceableCursor {
}
}

/// Use this type to write Parquet to memory rather than a file.
#[derive(Debug, Default, Clone)]
Contributor

In terms of easing the transition of this change, what would you think about marking this struct "deprecated" and keeping the code backwards compatible for a few releases (e.g. impl Write for InMemoryWriteableCursor)?

That might make the API change easier to manage and give users some time to remove it.
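
One possible shape for that kind of transition, sketched with Rust's built-in `#[deprecated]` attribute. `LegacyCursor` and `legacy_roundtrip` are hypothetical stand-ins and do not reproduce the real `InMemoryWriteableCursor`; the deprecation note text is likewise only an assumption.

```rust
use std::io::{self, Write};

/// Hypothetical legacy in-memory writer kept only for backwards compatibility.
#[deprecated(note = "use `Vec<u8>` or `std::io::Cursor<Vec<u8>>` instead")]
struct LegacyCursor {
    data: Vec<u8>,
}

// Implementing `Write` lets old call sites keep compiling against an
// API that now only asks for `W: Write`.
#[allow(deprecated)]
impl Write for LegacyCursor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.data.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Existing user code continues to work, but would see a deprecation
/// warning wherever `LegacyCursor` is named without the `allow`.
#[allow(deprecated)]
fn legacy_roundtrip() -> io::Result<Vec<u8>> {
    let mut w = LegacyCursor { data: Vec::new() };
    w.write_all(b"old")?;
    Ok(w.data)
}
```

Users get a compiler warning pointing at the replacement instead of a hard build failure on upgrade.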

@@ -153,47 +153,6 @@ impl<R: ParquetReader> Length for FileSource<R> {
self.end - self.start
}
}

/// Struct that represents `File` output stream with position tracking.
/// Used as a sink in file writer.
Contributor

Likewise, I recommend considering leaving this structure in (and marking it as "deprecated") for a release or two to give people a chance to update their code over time.

drs.as_slice()
.write_to_row_group(row_group.as_mut())
.unwrap();
row_group.close().unwrap();
Contributor

I do like this pattern better (close the row group writer rather than passing it back to the parquet file writer to close)

@@ -196,13 +203,13 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
};

// Write file metadata
-let start_pos = self.buf.seek(SeekFrom::Current(0))?;
+let start_pos = self.buf.bytes_written();
Contributor

❤️

@@ -214,7 +221,7 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
}

#[inline]
-fn assert_closed(&self) -> Result<()> {
+fn assert_not_closed(&self) -> Result<()> {
Contributor

👍

pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
-buf: &W,
+buf: &'a mut TrackedWrite<W>,
+on_close: Option<OnCloseRowGroup<'a>>,
Contributor

Why is this an Option -- shouldn't it always be required?

Contributor Author

These writers are technically public and I don't really know what people might be using them for, so I thought this was a way to make the change less annoying: you can just pass None and move on...

Contributor

makes sense

}
}

/// Checks and finalises current column writer.
fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
Contributor

This code appears to have been inlined into next_column below

@ahmedriza

ahmedriza commented May 23, 2022

This is a great step forwards: it makes the code cleaner and lets the compiler enforce invariants to avoid things like #1717 happening.

Agree that keeping the current API deprecated will be good to allow users to transition. I've tested our code and the change is minimal from the API perspective.

@tustvold tustvold added the api-change Changes to the arrow API label May 23, 2022
@tustvold tustvold changed the title Change parquet ArrowFileWriter to use standard std:io::Write rather custom ParquetWriter trait (#1717) (#1163) Change parquet writers to use standard std:io::Write rather custom ParquetWriter trait (#1717) (#1163) May 24, 2022
@tustvold
Contributor Author

I've removed the FileWriter and RowGroupWriter traits as discussed, and deprecated FileSink and InMemoryWriteableCursor.

Assuming CI doesn't find something I've missed, I think this is now ready. Unless anyone objects, I intend to leave it open for another 24 hours and then get it merged.

@alamb
Contributor

alamb commented May 24, 2022

It will be a nice feature in version 15 🎉

Contributor

@nevi-me nevi-me left a comment

I've gone through the changes, and agree that the interface becomes simpler with std::io::Write.

@tustvold tustvold merged commit 722fcfc into apache:master May 25, 2022
@alamb
Contributor

alamb commented May 25, 2022

👨‍🍳 👌 -- this will be a very nice improvement in arrow 15 ❤️ -- thank you @tustvold

Labels
api-change Changes to the arrow API parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Trying to write parquet file in parallel results in corrupt file
5 participants