Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ avro = ["datafusion/avro", "datafusion-common/avro"]
[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true, default-features = false, features = ["parquet", "nested_expressions"] }
datafusion = { workspace = true, default-features = false }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-proto-common = { workspace = true }
Expand All @@ -57,7 +57,7 @@ serde = { version = "1.0", optional = true }
serde_json = { workspace = true, optional = true }

[dev-dependencies]
datafusion = { workspace = true, default-features = false, features = ["sql"] }
datafusion = { workspace = true, default-features = false, features = ["sql", "nested_expressions"] }
datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window-common = { workspace = true }
Expand Down
277 changes: 148 additions & 129 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
use std::sync::Arc;

use datafusion::{
config::{
CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
TableParquetOptions,
},
config::{CsvOptions, JsonOptions},
datasource::file_format::{
arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
parquet::ParquetFormatFactory, FileFormatFactory,
FileFormatFactory,
},
prelude::SessionContext,
};
Expand All @@ -34,12 +31,7 @@ use datafusion_common::{
};
use prost::Message;

use crate::protobuf::{
parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
TableParquetOptions as TableParquetOptionsProto,
};
use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};

use super::LogicalExtensionCodec;

Expand Down Expand Up @@ -355,16 +347,32 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
}
}

impl TableParquetOptionsProto {
fn from_factory(factory: &ParquetFormatFactory) -> Self {
let global_options = if let Some(ref options) = factory.options {
options.clone()
} else {
return TableParquetOptionsProto::default();
};
#[cfg(feature = "parquet")]
mod parquet {
use super::*;

use crate::protobuf::{
parquet_column_options, parquet_options,
ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
ParquetOptions as ParquetOptionsProto,
TableParquetOptions as TableParquetOptionsProto,
};

use datafusion::{
config::{ParquetColumnOptions, ParquetOptions, TableParquetOptions},
datasource::file_format::parquet::ParquetFormatFactory,
};

impl TableParquetOptionsProto {
fn from_factory(factory: &ParquetFormatFactory) -> Self {
let global_options = if let Some(ref options) = factory.options {
options.clone()
} else {
return TableParquetOptionsProto::default();
};

let column_specific_options = global_options.column_specific_options;
#[allow(deprecated)] // max_statistics_size
let column_specific_options = global_options.column_specific_options;
#[allow(deprecated)] // max_statistics_size
TableParquetOptionsProto {
global: Some(ParquetOptionsProto {
enable_page_index: global_options.global.enable_page_index,
Expand Down Expand Up @@ -453,12 +461,12 @@ impl TableParquetOptionsProto {
})
.collect(),
}
}
}
}

impl From<&ParquetOptionsProto> for ParquetOptions {
fn from(proto: &ParquetOptionsProto) -> Self {
#[allow(deprecated)] // max_statistics_size
impl From<&ParquetOptionsProto> for ParquetOptions {
fn from(proto: &ParquetOptionsProto) -> Self {
#[allow(deprecated)] // max_statistics_size
ParquetOptions {
enable_page_index: proto.enable_page_index,
pruning: proto.pruning,
Expand Down Expand Up @@ -511,12 +519,12 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
}),
}
}
}
}

impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
fn from(proto: ParquetColumnOptionsProto) -> Self {
#[allow(deprecated)] // max_statistics_size
impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
fn from(proto: ParquetColumnOptionsProto) -> Self {
#[allow(deprecated)] // max_statistics_size
ParquetColumnOptions {
bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
|parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
Expand All @@ -540,124 +548,135 @@ impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
.bloom_filter_ndv_opt
.map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
}
}
}
}

impl From<&TableParquetOptionsProto> for TableParquetOptions {
fn from(proto: &TableParquetOptionsProto) -> Self {
TableParquetOptions {
global: proto
.global
.as_ref()
.map(ParquetOptions::from)
.unwrap_or_default(),
column_specific_options: proto
.column_specific_options
.iter()
.map(|parquet_column_options| {
(
parquet_column_options.column_name.clone(),
ParquetColumnOptions::from(
parquet_column_options.options.clone().unwrap_or_default(),
),
)
})
.collect(),
key_value_metadata: proto
.key_value_metadata
.iter()
.map(|(k, v)| (k.clone(), Some(v.clone())))
.collect(),
crypto: Default::default(),
impl From<&TableParquetOptionsProto> for TableParquetOptions {
fn from(proto: &TableParquetOptionsProto) -> Self {
TableParquetOptions {
global: proto
.global
.as_ref()
.map(ParquetOptions::from)
.unwrap_or_default(),
column_specific_options: proto
.column_specific_options
.iter()
.map(|parquet_column_options| {
(
parquet_column_options.column_name.clone(),
ParquetColumnOptions::from(
parquet_column_options
.options
.clone()
.unwrap_or_default(),
),
)
})
.collect(),
key_value_metadata: proto
.key_value_metadata
.iter()
.map(|(k, v)| (k.clone(), Some(v.clone())))
.collect(),
crypto: Default::default(),
}
}
}
}

#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;
#[derive(Debug)]
pub struct ParquetLogicalExtensionCodec;

// TODO! This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
fn try_decode(
&self,
_buf: &[u8],
_inputs: &[datafusion_expr::LogicalPlan],
_ctx: &SessionContext,
) -> datafusion_common::Result<datafusion_expr::Extension> {
not_impl_err!("Method not implemented")
}
// TODO! This is a placeholder for now and needs to be implemented for real.
impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
fn try_decode(
&self,
_buf: &[u8],
_inputs: &[datafusion_expr::LogicalPlan],
_ctx: &SessionContext,
) -> datafusion_common::Result<datafusion_expr::Extension> {
not_impl_err!("Method not implemented")
}

fn try_encode(
&self,
_node: &datafusion_expr::Extension,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
not_impl_err!("Method not implemented")
}
fn try_encode(
&self,
_node: &datafusion_expr::Extension,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
not_impl_err!("Method not implemented")
}

fn try_decode_table_provider(
&self,
_buf: &[u8],
_table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>> {
not_impl_err!("Method not implemented")
}
fn try_decode_table_provider(
&self,
_buf: &[u8],
_table_ref: &TableReference,
_schema: arrow::datatypes::SchemaRef,
_ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn datafusion::datasource::TableProvider>>
{
not_impl_err!("Method not implemented")
}

fn try_encode_table_provider(
&self,
_table_ref: &TableReference,
_node: Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
not_impl_err!("Method not implemented")
}
fn try_encode_table_provider(
&self,
_table_ref: &TableReference,
_node: Arc<dyn datafusion::datasource::TableProvider>,
_buf: &mut Vec<u8>,
) -> datafusion_common::Result<()> {
not_impl_err!("Method not implemented")
}

fn try_decode_file_format(
&self,
buf: &[u8],
_ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to decode TableParquetOptionsProto: {e:?}"
fn try_decode_file_format(
&self,
buf: &[u8],
_ctx: &SessionContext,
) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to decode TableParquetOptionsProto: {e:?}"
))
})?;
let options: TableParquetOptions = (&proto).into();
Ok(Arc::new(
datafusion::datasource::file_format::parquet::ParquetFormatFactory {
options: Some(options),
},
))
})?;
let options: TableParquetOptions = (&proto).into();
Ok(Arc::new(ParquetFormatFactory {
options: Some(options),
}))
}
}

fn try_encode_file_format(
&self,
buf: &mut Vec<u8>,
node: Arc<dyn FileFormatFactory>,
) -> datafusion_common::Result<()> {
let options = if let Some(parquet_factory) =
node.as_any().downcast_ref::<ParquetFormatFactory>()
{
parquet_factory.options.clone().unwrap_or_default()
} else {
return Err(DataFusionError::Execution(
"Unsupported FileFormatFactory type".to_string(),
));
};
fn try_encode_file_format(
&self,
buf: &mut Vec<u8>,
node: Arc<dyn FileFormatFactory>,
) -> datafusion_common::Result<()> {
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;

let options = if let Some(parquet_factory) =
node.as_any().downcast_ref::<ParquetFormatFactory>()
{
parquet_factory.options.clone().unwrap_or_default()
} else {
return Err(DataFusionError::Execution(
"Unsupported FileFormatFactory type".to_string(),
));
};

let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
options: Some(options),
});
let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
options: Some(options),
});

proto.encode(buf).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to encode TableParquetOptionsProto: {e:?}"
))
})?;
proto.encode(buf).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to encode TableParquetOptionsProto: {e:?}"
))
})?;

Ok(())
Ok(())
}
}
}
#[cfg(feature = "parquet")]
pub use parquet::ParquetLogicalExtensionCodec;

#[derive(Debug)]
pub struct ArrowLogicalExtensionCodec;
Expand Down