Skip to content

Commit

Permalink
[CHORE]: make parquet metadata (de)serializable (#2346)
Browse files Browse the repository at this point in the history
⚠️ Depends on #2341

you can view the actual diff
[here](universalmind303/Daft@arrow2-migrate...universalmind303:Daft:arrow-serde)
  • Loading branch information
universalmind303 committed Jun 7, 2024
1 parent 97c8023 commit 975125a
Show file tree
Hide file tree
Showing 21 changed files with 117 additions and 111 deletions.
6 changes: 3 additions & 3 deletions src/arrow2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ rand = {version = "0.8", optional = true}
regex = {version = "1.9", optional = true}
regex-syntax = {version = "0.7", optional = true}
# Arrow integration tests support
serde = {version = "^1.0", features = ["rc"], optional = true}
serde = {version = "^1.0", features = ["rc", "derive"]}
serde_derive = {version = "^1.0", optional = true}
serde_json = {version = "^1.0", features = [
"preserve_order"
Expand Down Expand Up @@ -186,7 +186,7 @@ io_ipc_write_async = ["io_ipc", "futures"]
io_json = ["io_json_read", "io_json_write"]
# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
io_json_integration = ["hex", "serde_derive", "serde_json", "io_ipc"]
io_json_read = ["json-deserializer", "indexmap", "lexical-core"]
io_json_write = [
"streaming-iterator",
Expand Down Expand Up @@ -222,7 +222,7 @@ io_parquet_snappy = ["parquet2/snappy"]
# compression backends
io_parquet_zstd = ["parquet2/zstd"]
io_print = ["comfy-table"]
serde_types = ["serde", "serde_derive"]
serde_types = ["serde_derive"]
simd = []

[lib]
Expand Down
6 changes: 2 additions & 4 deletions src/arrow2/src/datatypes/field.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::{DataType, Metadata};

#[cfg(feature = "serde_types")]
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

/// Represents Arrow's metadata of a "column".
///
Expand All @@ -11,8 +10,7 @@ use serde_derive::{Deserialize, Serialize};
///
/// Almost all IO in this crate uses [`Field`] to represent logical information about the data
/// to be serialized.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct Field {
/// Its name
pub name: String,
Expand Down
15 changes: 5 additions & 10 deletions src/arrow2/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ pub use schema::Schema;
use std::collections::BTreeMap;
use std::sync::Arc;

#[cfg(feature = "serde_types")]
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

/// typedef for [BTreeMap<String, String>] denoting [`Field`]'s and [`Schema`]'s metadata.
pub type Metadata = BTreeMap<String, String>;
Expand All @@ -28,8 +27,7 @@ pub(crate) type Extension = Option<(String, Option<String>)>;
/// which declares the in-memory representation of data.
/// The [`DataType::Extension`] is special in that it augments a [`DataType`] with metadata to support custom types.
/// Use `to_logical_type` to desugar such type and return its abstraction logical type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataType {
/// Null type
Null,
Expand Down Expand Up @@ -161,8 +159,7 @@ pub enum DataType {
}

/// Mode of [`DataType::Union`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum UnionMode {
/// Dense union
Dense,
Expand Down Expand Up @@ -193,8 +190,7 @@ impl UnionMode {
}

/// The time units defined in Arrow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TimeUnit {
/// Time in seconds.
Second,
Expand All @@ -207,8 +203,7 @@ pub enum TimeUnit {
}

/// Interval units defined in Arrow
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IntervalUnit {
/// The number of elapsed whole months.
YearMonth,
Expand Down
9 changes: 3 additions & 6 deletions src/arrow2/src/datatypes/physical_type.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
pub use crate::types::PrimitiveType;

#[cfg(feature = "serde_types")]
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

/// The set of physical types: unique in-memory representations of an Arrow array.
/// A physical type has a one-to-many relationship with a [`crate::datatypes::DataType`] and
/// a one-to-one mapping to each struct in this crate that implements [`crate::array::Array`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PhysicalType {
/// A Null with no allocation.
Null,
Expand Down Expand Up @@ -54,8 +52,7 @@ impl PhysicalType {

/// the set of valid indices types of a dictionary-encoded Array.
/// Each type corresponds to a variant of [`crate::array::DictionaryArray`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IntegerType {
/// A signed 8-bit integer.
Int8,
Expand Down
6 changes: 2 additions & 4 deletions src/arrow2/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use super::{Field, Metadata};

#[cfg(feature = "serde_types")]
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

/// An ordered sequence of [`Field`]s with associated [`Metadata`].
///
/// [`Schema`] is an abstraction used to read from, and write to, Arrow IPC format,
/// Apache Parquet, and Apache Avro. All these formats have a concept of a schema
/// with fields and metadata.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct Schema {
/// The fields composing this schema.
pub fields: Vec<Field>,
Expand Down
2 changes: 1 addition & 1 deletion src/arrow2/src/io/json_integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! These utilities define structs that read the integration JSON format for integration testing purposes.

use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::error::Error;
Expand Down
6 changes: 2 additions & 4 deletions src/arrow2/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ pub use native::*;
mod offset;
pub use offset::*;

#[cfg(feature = "serde_types")]
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

/// The set of all implementations of the sealed trait [`NativeType`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PrimitiveType {
/// A signed 8-bit integer.
Int8,
Expand Down
3 changes: 1 addition & 2 deletions src/parquet2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lz4 = {version = "1.24", optional = true}
lz4_flex = {version = "^0.9", optional = true}
parquet-format-safe = "0.2"
seq-macro = {version = "0.3", default-features = false}
serde = {version = "^1.0", optional = true, features = ["derive"]}
serde = {version = "^1.0", features = ["derive"]}
snap = {version = "^1.1", optional = true}
streaming-decompression = "0.1"
xxhash-rust = {version = "0.8", optional = true, features = ["xxh64"]}
Expand All @@ -25,7 +25,6 @@ default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
full = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter", "async"]
gzip = ["flate2/rust_backend"]
gzip_zlib_ng = ["flate2/zlib-ng"]
serde_types = ["serde"]
snappy = ["snap"]

[lib]
Expand Down
7 changes: 2 additions & 5 deletions src/parquet2/src/indexes/intervals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ use parquet_format_safe::PageLocation;

use crate::error::Error;

#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

/// An interval
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Interval {
/// Its start
pub start: usize,
Expand Down Expand Up @@ -84,8 +82,7 @@ pub fn compute_rows(
}

/// An enum describing a page that was either selected in a filter pushdown or skipped
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FilteredPage {
/// Location of the page in the file
pub start: u64,
Expand Down
33 changes: 8 additions & 25 deletions src/parquet2/src/metadata/column_chunk_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,23 @@ use crate::compression::Compression;
use crate::error::{Error, Result};
use crate::schema::types::PhysicalType;
use crate::statistics::{deserialize_statistics, Statistics};

#[cfg(feature = "serde_types")]
mod serde_types {
pub use parquet_format_safe::thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol,
};
pub use serde::de::Error as DeserializeError;
pub use serde::ser::Error as SerializeError;
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::io::Cursor;
}
#[cfg(feature = "serde_types")]
use serde_types::*;
pub use parquet_format_safe::thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
pub use serde::de::Error as DeserializeError;
pub use serde::ser::Error as SerializeError;
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::io::Cursor;

/// Metadata for a column chunk.
// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
// access to the descriptor (e.g. physical, converted, logical).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnChunkMetaData {
#[cfg_attr(
feature = "serde_types",
serde(serialize_with = "serialize_column_chunk")
)]
#[cfg_attr(
feature = "serde_types",
serde(deserialize_with = "deserialize_column_chunk")
)]
#[serde(serialize_with = "serialize_column_chunk")]
#[serde(deserialize_with = "deserialize_column_chunk")]
column_chunk: ColumnChunk,
column_descr: ColumnDescriptor,
}

#[cfg(feature = "serde_types")]
fn serialize_column_chunk<S>(
column_chunk: &ColumnChunk,
serializer: S,
Expand All @@ -56,7 +40,6 @@ where
serializer.serialize_bytes(&buf)
}

#[cfg(feature = "serde_types")]
fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
where
D: Deserializer<'de>,
Expand Down
8 changes: 3 additions & 5 deletions src/parquet2/src/metadata/column_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::schema::types::{ParquetType, PrimitiveType};
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

/// A descriptor of a parquet column. It contains the necessary information to deserialize
/// a parquet column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]

pub struct Descriptor {
/// The [`PrimitiveType`] of this column
pub primitive_type: PrimitiveType,
Expand All @@ -20,8 +19,7 @@ pub struct Descriptor {
/// A descriptor for leaf-level primitive columns.
/// This encapsulates information such as definition and repetition levels and is used to
/// re-assemble nested data.
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
pub struct ColumnDescriptor {
/// The descriptor this columns' leaf.
pub descriptor: Descriptor,
Expand Down
4 changes: 1 addition & 3 deletions src/parquet2/src/metadata/column_order.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use super::sort::SortOrder;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

/// Column order that specifies what method was used to aggregate min/max values for
/// statistics.
///
/// If column order is undefined, then it is the legacy behaviour and all values should
/// be compared as signed values/bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
pub enum ColumnOrder {
/// Column uses the order defined by its logical or physical type
/// (if there is no logical type), parquet-format 2.4.0+.
Expand Down
66 changes: 65 additions & 1 deletion src/parquet2/src/metadata/file_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,76 @@ use crate::{error::Error, metadata::get_sort_order};

use super::{column_order::ColumnOrder, schema_descriptor::SchemaDescriptor, RowGroupMetaData};
use parquet_format_safe::ColumnOrder as TColumnOrder;
use serde::{Deserialize, Serialize};

pub use crate::thrift_format::KeyValue;
/// Serde adapters for `FileMetaData::key_value_metadata`.
///
/// `KeyValue` is generated by `parquet-format-safe` and does not implement
/// serde's traits, so we mirror it with a local [`SerializableKeyValue`]
/// and convert at the (de)serialization boundary. Wired up on the field via
/// `#[serde(with = "key_value_metadata_serde")]`.
mod key_value_metadata_serde {
    use super::*;

    /// Local mirror of the thrift-generated [`KeyValue`] that derives
    /// serde's `Serialize`/`Deserialize`.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct SerializableKeyValue {
        pub key: String,
        pub value: Option<String>,
    }

    impl From<KeyValue> for SerializableKeyValue {
        fn from(kv: KeyValue) -> Self {
            Self {
                key: kv.key,
                value: kv.value,
            }
        }
    }

    impl From<SerializableKeyValue> for KeyValue {
        fn from(kv: SerializableKeyValue) -> Self {
            Self {
                key: kv.key,
                value: kv.value,
            }
        }
    }

    /// Serializes `Option<Vec<KeyValue>>` by converting each entry into a
    /// [`SerializableKeyValue`] first; `None` round-trips as `None`.
    pub fn serialize<S>(
        key_value_metadata: &Option<Vec<KeyValue>>,
        serializer: S,
    ) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match key_value_metadata {
            Some(kvs) => {
                // Clone per element rather than cloning the whole Vec up
                // front: avoids the intermediate Vec<KeyValue> allocation
                // the previous `.clone().into_iter()` incurred.
                let serializable: Vec<SerializableKeyValue> =
                    kvs.iter().cloned().map(SerializableKeyValue::from).collect();
                serializer.serialize_some(&serializable)
            }
            None => serializer.serialize_none(),
        }
    }

    /// Deserializes an optional list of [`SerializableKeyValue`]s and maps
    /// each entry back into a thrift-generated [`KeyValue`].
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Vec<KeyValue>>, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let serializable: Option<Vec<SerializableKeyValue>> =
            Option::deserialize(deserializer)?;
        Ok(serializable.map(|kvs| kvs.into_iter().map(KeyValue::from).collect()))
    }
}
/// Metadata for a Parquet file.
// This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
// which are crucial to deserialize pages.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileMetaData {
/// version of this file.
pub version: i32,
Expand All @@ -26,6 +89,7 @@ pub struct FileMetaData {
/// The row groups of this file
pub row_groups: Vec<RowGroupMetaData>,
/// key_value_metadata of this file.
#[serde(with = "key_value_metadata_serde")]
pub key_value_metadata: Option<Vec<KeyValue>>,
/// schema descriptor.
pub schema_descr: SchemaDescriptor,
Expand Down
Loading

0 comments on commit 975125a

Please sign in to comment.