
Supporting using parallel parquet writer outside of Datafusion query execution #9493

Open · wiedld opened this issue Mar 7, 2024 · 11 comments
Labels: enhancement (New feature or request)

wiedld (Contributor) commented Mar 7, 2024

Is your feature request related to a problem or challenge?

We would like faster parquet write performance, outside of the Datafusion execution context.

We are currently using the (non-parallelized) ArrowWriter for parquet writing both within and outside of Datafusion query execution. Writing data in the parquet format is computationally expensive due to the encoding and compression involved, and can easily become a bottleneck when writing large parquet files.

[Diagram: current (non-parallelized) ArrowWriter write path — Screen Shot 2024-03-07 at 10 48 16 AM]
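For reference, here is a minimal sketch of the single-threaded baseline described above, using the parquet crate's ArrowWriter (the output path and the example batch are placeholders, not our actual code):

use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn write_single_threaded() -> Result<(), Box<dyn std::error::Error>> {
    // A small example batch; in practice these arrive as a stream of RecordBatches.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // ArrowWriter encodes and compresses on the calling thread only.
    let file = File::create("/tmp/example.parquet")?;
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?; // returns the file's FileMetaData
    Ok(())
}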

Datafusion recently introduced a parallelized parquet writer as part of the COPY TO execution. This writer parallelizes the column writes with minimal memory overhead; streamed record batches are immediately encoded into compressed column leaves, and the final serialized parquet is flushed to the sink in chunks without needing to retain the whole file in memory.

[Diagram: parallelized parquet writer — Screen Shot 2024-03-07 at 10 48 44 AM]
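For context, here is a minimal sketch of how the parallelized path is reached today through DataFusion's SQL interface (the table registration and output path are illustrative, and the exact COPY syntax/options vary by DataFusion version):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("src", "data.csv", CsvReadOptions::new()).await?;

    // COPY ... TO plans DataFusion's ParquetSink, which performs the
    // parallelized column encoding described above.
    let df = ctx.sql("COPY (SELECT * FROM src) TO 'out.parquet'").await?;
    df.collect().await?;
    Ok(())
}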

We conducted a POC using the existing ParquetSink outside of a Datafusion query, and assessed the impact. Our specific use case spends 49-59% of its CPU cycles in parquet writing (i.e. we have a write-heavy benchmark). When we switched from the baseline (the single-threaded ArrowWriter) to parallelized parquet writing, we saw performance improvements of 22-43%. This provides ample motivation to request a more principled solution that makes parallelized parquet writing more readily accessible.

Describe the solution you'd like

The ability to use parallelized parquet writing outside of Datafusion query execution. Specifically, we propose a public API that is not tied to the COPY TO execution operator.

Describe alternatives you've considered

Our specific POC required exposing the FileMetaData for the created parquet files, and had to compensate for a metadata mutation performed within ArrowWriter (but not ParquetSink). However, the eventual solution should not be conflated with the POC we performed to assess the potential impact for our use case. Since we anticipate that many other users may also benefit from parallelized parquet writes, the solution should consider a broader range of needs.

Additional context

No response

wiedld added the enhancement (New feature or request) label on Mar 7, 2024
wiedld changed the title from "Provide parallelized parquet writing outside of Datafusion query execution" to "Provide faster parquet writing outside of Datafusion query execution" on Mar 7, 2024
alamb (Contributor) commented Mar 7, 2024

Thank you @wiedld -- would it be possible to create a PR with the content from main...wiedld:arrow-datafusion:test/parquet-data-sink to help the API discussion?

At a high level I would like to suggest a new API (factored out from the current ParquetSink).

I believe @tustvold was potentially thinking about proposing some/all of this API upstream in the parquet crate (maybe by adding some mode / option to AsyncWriter, but he may have other ideas).

Here is the kind of API I was imagining in DataFusion:

// get a RecordBatchStream somehow
let stream: SendableRecordBatchStream = plan.execute(0);

// location in object store to write to
let object_store_path = ...;

// properties to pass to the underlying parquet writer
let parquet_writer_properties = ...;

// Create a new parallel writer for writing a single file
// Note this API doesn't handle writing partitioned/multiple files; that would
// still be done by the ParquetSink
let writer = ParallelParquetWriter::builder()
  .with_target(object_store_path)
  .with_writer_properties(parquet_writer_properties)
  // how many row groups the parquet writer should attempt to write in parallel
  // note the writer may buffer the (uncompressed) RecordBatches for up to `N - 1`
  // row groups. This setting defaults to `1`.
  // TODO? Should we have this?
  .with_concurrent_row_groups(2)
  .build()?;

// Invoke the writer: encodes the columns in parallel and uploads each row group
// using a multi-part put.
writer.write_all(stream)
  .await?;
Also, some of the changes are probably pretty straightforward to pull into their own PRs, such as the following. @wiedld perhaps you could make a PR to add those (the APIs that aren't potentially controversial)?

    /// Returns as Url
    pub fn as_url(&self) -> &Url {
        &self.url
    }

alamb (Contributor) commented Mar 7, 2024

cc @devinjdangelo and @metesynnada as I think you have worked on this feature previously

devinjdangelo (Contributor) commented

I have also wanted to leverage this code outside of DataFusion, so I'm happy to support!

Also, @wiedld I love the diagrams! What did you use to make them?

wiedld (Contributor, Author) commented Mar 8, 2024

> Also, @wiedld I love the diagrams! What did you use to make them?

A Google drawing embedded in the gdocs used for project planning. @alamb did the initial version, which I later updated for the parallelized writes. Maybe I should convert some of these to ascii diagrams?

alamb (Contributor) commented Mar 8, 2024

> Maybe I should convert some of these to ascii diagrams?

This would be a natural thing to do as part of creating such an API. I think the value of ASCII diagrams is low unless they end up in the source code

devinjdangelo (Contributor) commented

I made a very simplistic ascii diagram in the original PR #7655 (comment)

The diagram here is much clearer though.

wiedld (Contributor, Author) commented Mar 11, 2024

> Thank you @wiedld -- would it be possible to create a PR with the content from main...wiedld:arrow-datafusion:test/parquet-data-sink to help the API discussion?

PR created; it intentionally highlights the gaps (for our specific use case) when trying to use ParquetSink, in place of the ArrowWriter, to write parquet.

> Also, some of the changes are probably pretty straightforward to pull into their own PRs, such as the following.

ObjectStoreUrl::as_url is not required, since another accessor is already available (which I missed at the time).

alamb changed the title from "Provide faster parquet writing outside of Datafusion query execution" to "Supporting using parallel parquet writer outside of Datafusion query execution" on Mar 11, 2024
alamb (Contributor) commented Mar 14, 2024

As we were discussing this API internally with @tustvold, one thing he pointed out is that the current code pretty much requires using the same tokio threadpool for compute (parquet encoding) and I/O (the object store multi-part write). This can cause various problems, depending on what your system is doing.

Some discussion on CPU bound work in tokio: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/

Thus, one thing that would be nice to think about in this API is how we can support doing the IO (e.g. put_multipart) on a different threadpool (aka tokio Runtime).

I believe @tustvold has also been thinking about this in the context of apache/arrow-rs#5458 and may even be planning on porting some/all of the parallelized parquet writer upstream to parquet (I don't fully know the plan yet)

Therefore, as we go through this exercise, we may want to help / join forces upstream / take those plans into account as we figure out the right API to extract

devinjdangelo (Contributor) commented

> As we were discussing this API internally with @tustvold, one thing he pointed out is that the current code pretty much requires using the same tokio threadpool for compute (parquet encoding) and I/O (the object store multi-part write). This can cause various problems, depending on what your system is doing.

I think the parallel writer facilitates a fairly straightforward way to move the most CPU-heavy work (serialization) to a separate thread. Here is a draft PR #9605.

I think the main objection to using spawn_blocking here is that tokio doesn't actually manage a blocking thread pool, but just keeps spawning new threads. Rayon or a second tokio runtime would likely perform better in a case where we needed 100s of parallel tasks (e.g. 4 open row groups on a parquet file with 64 columns on a system with only 8 CPU cores).
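For illustration only (this is not the ParquetSink code), here is a minimal sketch of the second-runtime idea: CPU-bound encoding is spawned onto a dedicated tokio runtime while the calling (I/O) runtime receives the encoded chunks over a channel; encode_row_group stands in for the real column encoding:

use std::thread;

use tokio::runtime::Builder;
use tokio::sync::mpsc;

// Placeholder for the CPU-heavy step (column encoding + compression).
fn encode_row_group(bytes: Vec<u8>) -> Vec<u8> {
    bytes
}

#[tokio::main] // the "I/O" runtime, which would issue the object store writes
async fn main() {
    // Dedicated multi-threaded runtime for CPU-bound work, kept alive on its own thread.
    let cpu_runtime = Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("cpu-bound")
        .build()
        .expect("failed to build CPU runtime");
    let cpu_handle = cpu_runtime.handle().clone();
    thread::spawn(move || cpu_runtime.block_on(std::future::pending::<()>()));

    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(4);

    // Encoding tasks run on the CPU runtime...
    for i in 0..4u8 {
        let tx = tx.clone();
        cpu_handle.spawn(async move {
            let encoded = encode_row_group(vec![i; 1024]);
            let _ = tx.send(encoded).await;
        });
    }
    drop(tx);

    // ...while this runtime stays free for I/O, e.g. put_multipart uploads of each chunk.
    while let Some(chunk) = rx.recv().await {
        println!("would upload {} bytes", chunk.len());
    }
}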

devinjdangelo (Contributor) commented

Thank you @tustvold and @alamb for the additional context/comments on #9605. I now understand more of what you are trying to facilitate on the influxdb side. I agree it will take some reworking to have parallel serialization happening on one runtime/pool while IO happens on another runtime. I have some nascent ideas, but I would have to think through it more. I am interested to hear what you all come up with as well.

One easy but perhaps unsatisfying idea would be to run the parallel serialization entirely on the IO runtime, with just a minor refactor to use spawn_blocking as I did in #9605. I do think we can come up with something better than this 😄
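For reference, here is a minimal sketch of that spawn_blocking variant (encode_columns stands in for the real serialization step):

use tokio::task;

// Placeholder for the CPU-heavy column encoding + compression step.
fn encode_columns(bytes: Vec<u8>) -> Vec<u8> {
    bytes
}

async fn serialize_off_the_async_workers(bytes: Vec<u8>) -> Vec<u8> {
    // spawn_blocking moves the work off the async worker threads, but tokio's
    // blocking pool spawns additional threads on demand rather than acting as
    // a fixed-size compute pool.
    task::spawn_blocking(move || encode_columns(bytes))
        .await
        .expect("encoding task panicked")
}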

tustvold (Contributor) commented

> I am interested to hear what you all come up with as well.

My long-term thinking is that apache/arrow-rs#5500 will make it relatively straightforward to provide a Box<dyn Upload> that spawns the put parts to a separate IO threadpool. I am not sure what the short-term solution might look like
