diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2e170738f1a8..5fff1aaa794a 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -27,9 +27,11 @@ use std::vec::IntoIter; use thrift::protocol::{TCompactOutputProtocol, TSerializable}; use arrow_array::cast::AsArray; -use arrow_array::types::*; +use arrow_array::{types::*, ArrayRef}; use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter}; -use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef}; +use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field, IntervalUnit, Schema, SchemaRef, +}; use super::schema::{ add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, @@ -150,7 +152,7 @@ impl ArrowWriter { Some(in_progress) => in_progress .writers .iter() - .map(|(_, x)| x.get_estimated_total_bytes() as usize) + .map(|x| x.get_estimated_total_bytes() as usize) .sum(), None => 0, } @@ -299,7 +301,7 @@ impl Read for ArrowColumnChunkReader { /// A shared [`ArrowColumnChunk`] /// -/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via +/// This allows it to be owned by lower level page writers whilst allowing access via /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows type SharedColumnChunk = Arc>; @@ -347,13 +349,22 @@ impl PageWriter for ArrowPageWriter { } } -/// Encodes a leaf column to [`ArrowPageWriter`] -enum ArrowColumnWriter { +/// Serializes [ArrayRef]s to [ArrowColumnChunk]s which can be concatenated +/// to form a parquet row group +pub enum ArrowColumnWriter { ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>), Column(ColumnWriter<'static>), } impl ArrowColumnWriter { + /// Serializes an [ArrayRef] to a [ArrowColumnChunk] for an in progress row group. 
+ pub fn write(&mut self, array: ArrayRef, field: Arc) -> Result<()> { + let mut levels = calculate_array_levels(&array, &field)?.into_iter(); + let mut writer_iter = std::iter::once(self); + write_leaves(&mut writer_iter, &mut levels, array.as_ref())?; + Ok(()) + } + /// Returns the estimated total bytes for this column writer fn get_estimated_total_bytes(&self) -> u64 { match self { @@ -365,7 +376,8 @@ impl ArrowColumnWriter { /// Encodes [`RecordBatch`] to a parquet row group pub struct ArrowRowGroupWriter { - writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>, + writers: Vec, + shared_buffers: Vec, schema: SchemaRef, buffered_rows: usize, } @@ -376,13 +388,21 @@ impl ArrowRowGroupWriter { props: &WriterPropertiesPtr, arrow: &SchemaRef, ) -> Result { - let mut writers = Vec::with_capacity(arrow.fields.len()); + let mut writers_and_buffers = Vec::with_capacity(arrow.fields.len()); let mut leaves = parquet.columns().iter(); for field in &arrow.fields { - get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?; + get_arrow_column_writer( + field.data_type(), + props, + &mut leaves, + &mut writers_and_buffers, + )?; } + let (shared_buffers, writers): (Vec<_>, Vec<_>) = + writers_and_buffers.into_iter().unzip(); Ok(Self { writers, + shared_buffers, schema: arrow.clone(), buffered_rows: 0, }) @@ -390,7 +410,7 @@ impl ArrowRowGroupWriter { pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.buffered_rows += batch.num_rows(); - let mut writers = self.writers.iter_mut().map(|(_, x)| x); + let mut writers = self.writers.iter_mut(); for (array, field) in batch.columns().iter().zip(&self.schema.fields) { let mut levels = calculate_array_levels(array, field)?.into_iter(); write_leaves(&mut writers, &mut levels, array.as_ref())?; @@ -398,9 +418,26 @@ impl ArrowRowGroupWriter { Ok(()) } + pub fn schema(&self) -> &Arc { + &self.schema + } + + /// Takes ownership of all [ArrowColumnWriter]s from this [ArrowRowGroupWriter] + /// Caller must 
restore ownership with give_col_writers before calling the close method. + pub fn take_col_writers(&mut self) -> Vec<ArrowColumnWriter> { + self.writers.drain(..).collect() + } + + /// Restores ownership of all [ArrowColumnWriter]s. Caller is responsible for + /// returning the [Vec] in the same order returned by the take_col_writers method. + pub fn give_col_writers(&mut self, writers: Vec<ArrowColumnWriter>) { + self.writers = writers; + } + pub fn close(self) -> Result<Vec<ArrowColumnChunk>> { - self.writers + self.shared_buffers .into_iter() + .zip(self.writers) .map(|(chunk, writer)| { let close_result = match writer { ArrowColumnWriter::ByteArray(c) => c.close()?,