Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use std::vec::IntoIter;
use thrift::protocol::{TCompactOutputProtocol, TSerializable};

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{types::*, ArrayRef};
use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow_schema::{
ArrowError, DataType as ArrowDataType, Field, IntervalUnit, Schema, SchemaRef,
};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
Expand Down Expand Up @@ -150,7 +152,7 @@ impl<W: Write + Send> ArrowWriter<W> {
Some(in_progress) => in_progress
.writers
.iter()
.map(|(_, x)| x.get_estimated_total_bytes() as usize)
.map(|x| x.get_estimated_total_bytes() as usize)
.sum(),
None => 0,
}
Expand Down Expand Up @@ -299,7 +301,7 @@ impl Read for ArrowColumnChunkReader {

/// A shared [`ArrowColumnChunk`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// This allows it to be owned by lower level page writers whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunk>>;

Expand Down Expand Up @@ -347,13 +349,22 @@ impl PageWriter for ArrowPageWriter {
}
}

/// Encodes a leaf column to [`ArrowPageWriter`]
enum ArrowColumnWriter {
/// Serializes [ArrayRef]s to [ArrowColumnChunk]s which can be concatenated
/// to form a parquet row group
pub enum ArrowColumnWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the enum variants private as well, i.e. something like

pub struct ArrowColumnWriter(ArrowColumnWriterImpl);

enum ArrowColumnWriterImpl {
    ...
}

ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
/// Serializes an [ArrayRef] to a [ArrowColumnChunk] for an in progress row group.
pub fn write(&mut self, array: ArrayRef, field: Arc<Field>) -> Result<()> {
let mut levels = calculate_array_levels(&array, &field)?.into_iter();
let mut writer_iter = std::iter::once(self);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually wrong if array is nested, and therefore comprises multiple leaf columns

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, makes sense. Perhaps we could have something like:

pub struct ArrowColumnWriter(Vec<ArrowColumnWriterImpl>);

enum ArrowColumnWriterImpl{
...
}

which for a non nested column would contain only one ArrowColumnWriterImpl, but could hold multiple in the case of nested columns?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a play with a slightly different API for this, will report back 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe #4871 the alternate API proposal?

write_leaves(&mut writer_iter, &mut levels, array.as_ref())?;
Ok(())
}

/// Returns the estimated total bytes for this column writer
fn get_estimated_total_bytes(&self) -> u64 {
match self {
Expand All @@ -365,7 +376,8 @@ impl ArrowColumnWriter {

/// Encodes [`RecordBatch`] to a parquet row group
pub struct ArrowRowGroupWriter {
writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
writers: Vec<ArrowColumnWriter>,
shared_buffers: Vec<SharedColumnChunk>,
schema: SchemaRef,
buffered_rows: usize,
}
Expand All @@ -376,31 +388,56 @@ impl ArrowRowGroupWriter {
props: &WriterPropertiesPtr,
arrow: &SchemaRef,
) -> Result<Self> {
let mut writers = Vec::with_capacity(arrow.fields.len());
let mut writers_and_buffers = Vec::with_capacity(arrow.fields.len());
let mut leaves = parquet.columns().iter();
for field in &arrow.fields {
get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
get_arrow_column_writer(
field.data_type(),
props,
&mut leaves,
&mut writers_and_buffers,
)?;
}
let (shared_buffers, writers): (Vec<_>, Vec<_>) =
writers_and_buffers.into_iter().unzip();
Ok(Self {
writers,
shared_buffers,
schema: arrow.clone(),
buffered_rows: 0,
})
}

pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut().map(|(_, x)| x);
let mut writers = self.writers.iter_mut();
for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
let mut levels = calculate_array_levels(array, field)?.into_iter();
write_leaves(&mut writers, &mut levels, array.as_ref())?;
}
Ok(())
}

pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}

/// Takes ownership of all [ArrowColumnWriter]s from this [ArrowRowGroupWriter]
/// Caller must restore ownership with give_col_writers before calling close method.
pub fn take_col_writers(&mut self) -> Vec<ArrowColumnWriter> {
self.writers.drain(..).collect()
}

/// Restores ownership of all [ArrowColumnWriter]s. Caller is responsible for
/// returning the [Vec] in the same order returned by take_col_writers method.
pub fn give_col_writers(&mut self, writers: Vec<ArrowColumnWriter>) {
self.writers = writers;
}
Comment on lines +425 to +435
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a massive fan of this API tbh, I'll have a play and see what I can come up with

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Let me know if you come up with something more elegant!

My original attempt was just to provide a mutable reference to the writers, but it is more challenging to handle mutable references safely vs. passing ownership in parallel async tasks (I considered giving https://docs.rs/async-scoped/latest/async_scoped/ a try but decided against it).

I moved away from entirely deconstructing the ArrowRowGroupWriterin order to keep the SharedColumnChunk private.


pub fn close(self) -> Result<Vec<(ArrowColumnChunk, ColumnCloseResult)>> {
self.writers
self.shared_buffers
.into_iter()
.zip(self.writers)
.map(|(chunk, writer)| {
let close_result = match writer {
ArrowColumnWriter::ByteArray(c) => c.close()?,
Expand Down