feat: add append_key_value_metadata (#3367)
* Add update_key_value_metadata

* Add comments

* Address review

* fix clippy

* Update parquet/src/arrow/arrow_writer/mod.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* Fix reviews

* Test and fix

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
3 people committed Dec 20, 2022
1 parent 0f196b8 commit f521e11
Showing 2 changed files with 87 additions and 2 deletions.
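For context, a minimal sketch of how the new high-level API is intended to be used (hypothetical, not part of the commit; assumes the arrow and parquet crates as of this version, and the one-column batch and "row_count" key are illustrative):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::KeyValue;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a one-column batch so there is something to write
    let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col1", col)])?;

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
    writer.write(&batch)?;

    // The new entry point: key/value metadata can be appended *after* batches
    // are written, e.g. for values only known once the data has been seen
    writer.append_key_value_metadata(KeyValue::new(
        "row_count".to_string(),
        "3".to_string(),
    ));
    writer.close()?;
    Ok(())
}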
9 changes: 8 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -32,7 +32,7 @@ use super::schema::{
use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr};
use crate::file::properties::WriterProperties;
use crate::file::writer::SerializedRowGroupWriter;
use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -158,6 +158,13 @@ impl<W: Write> ArrowWriter<W> {
self.flush_rows(self.buffered_rows)
}

/// Appends [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
///
/// This method provides a way to append kv_metadata after writing RecordBatches
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
self.writer.append_key_value_metadata(kv_metadata)
}

/// Flushes `num_rows` from the buffer into a new row group
fn flush_rows(&mut self, num_rows: usize) -> Result<()> {
if num_rows == 0 {
80 changes: 79 additions & 1 deletion parquet/src/file/writer.rs
@@ -142,6 +142,8 @@ pub struct SerializedFileWriter<W: Write> {
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
// kv_metadatas will be appended to the key/value metadata from `props` when `write_metadata` is called
kv_metadatas: Vec<KeyValue>,
}

impl<W: Write> SerializedFileWriter<W> {
@@ -159,6 +161,7 @@ impl<W: Write> SerializedFileWriter<W> {
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
kv_metadatas: Vec::new(),
})
}

@@ -309,12 +312,18 @@ impl<W: Write> SerializedFileWriter<W> {
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;

let key_value_metadata = match self.props.key_value_metadata() {
Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
None if self.kv_metadatas.is_empty() => None,
None => Some(self.kv_metadatas.clone()),
};

let file_metadata = parquet::FileMetaData {
num_rows,
row_groups,
+key_value_metadata,
version: self.props.writer_version().as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
-key_value_metadata: self.props.key_value_metadata().cloned(),
created_by: Some(self.props.created_by().to_owned()),
column_orders: None,
encryption_algorithm: None,
@@ -347,6 +356,10 @@ impl<W: Write> SerializedFileWriter<W> {
}
}

/// Appends [`KeyValue`] metadata to be written when the file footer is flushed
pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
self.kv_metadatas.push(kv_metadata);
}

/// Writes the file footer and returns the underlying writer.
pub fn into_inner(mut self) -> Result<W> {
self.assert_previous_writer_closed()?;
@@ -1355,4 +1368,69 @@ mod tests {
})
});
}

fn test_kv_metadata(
initial_kv: Option<Vec<KeyValue>>,
final_kv: Option<Vec<KeyValue>>,
) {
let schema = Arc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Arc::new(
types::Type::primitive_type_builder("col1", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
)])
.build()
.unwrap(),
);
let mut out = Vec::with_capacity(1024);
let props = Arc::new(
WriterProperties::builder()
.set_key_value_metadata(initial_kv.clone())
.build(),
);
let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let column = row_group_writer.next_column().unwrap().unwrap();
column.close().unwrap();
row_group_writer.close().unwrap();
if let Some(kvs) = &final_kv {
for kv in kvs {
writer.append_key_value_metadata(kv.clone())
}
}
writer.close().unwrap();

let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
let metadata = reader.metadata().file_metadata();
let keys = metadata.key_value_metadata();

match (initial_kv, final_kv) {
(Some(a), Some(b)) => {
let keys = keys.unwrap();
assert_eq!(keys.len(), a.len() + b.len());
assert_eq!(&keys[..a.len()], a.as_slice());
assert_eq!(&keys[a.len()..], b.as_slice());
}
(Some(v), None) => assert_eq!(keys.unwrap(), &v),
(None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
_ => assert!(keys.is_none()),
}
}

#[test]
fn test_append_metadata() {
let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());

test_kv_metadata(None, None);
test_kv_metadata(Some(vec![kv1.clone()]), None);
test_kv_metadata(None, Some(vec![kv2.clone()]));
test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
test_kv_metadata(Some(vec![]), Some(vec![kv2]));
test_kv_metadata(Some(vec![]), Some(vec![]));
test_kv_metadata(Some(vec![kv1]), Some(vec![]));
test_kv_metadata(None, Some(vec![]));
}
}
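The same entry point on the low-level writer; a minimal roundtrip sketch (hypothetical, mirroring the test above; the "source" key and schema are illustrative) that appends metadata after a row group is written and reads it back from the footer:

use std::sync::Arc;

use bytes::Bytes;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(parse_message_type(
        "message schema { REQUIRED INT32 col1; }",
    )?);
    let props = Arc::new(WriterProperties::builder().build());

    let mut out = Vec::new();
    let mut writer = SerializedFileWriter::new(&mut out, schema, props)?;
    let mut row_group = writer.next_row_group()?;
    let column = row_group.next_column()?.unwrap();
    // ... values would be written through the typed column writer here ...
    column.close()?;
    row_group.close()?;

    // Appended after the data; merged into the footer metadata by `close`
    writer.append_key_value_metadata(KeyValue::new(
        "source".to_string(),
        "example".to_string(),
    ));
    writer.close()?;

    // Read the footer back and locate the appended entry
    let reader = SerializedFileReader::new(Bytes::from(out))?;
    let kvs = reader.metadata().file_metadata().key_value_metadata().unwrap();
    assert!(kvs.iter().any(|kv| kv.key == "source"));
    Ok(())
}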
