Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ mod roundtrip_tests {
];

for test_case in test_cases.into_iter() {
let proto: super::protobuf::ArrowType = (&test_case).into();
let proto: super::protobuf::ArrowType = (&test_case).try_into().unwrap();
let roundtrip: DataType = (&proto).try_into().unwrap();
assert_eq!(format!("{:?}", test_case), format!("{:?}", roundtrip));
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}
};
let schema: protobuf::Schema = schema.as_ref().into();
let schema: protobuf::Schema = schema.as_ref().try_into()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


let filters: Vec<protobuf::LogicalExprNode> = filters
.iter()
Expand Down Expand Up @@ -1048,7 +1048,7 @@ impl AsLogicalPlan for LogicalPlanNode {
location: location.clone(),
file_type: file_type.clone(),
has_header: *has_header,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
Expand Down Expand Up @@ -1083,7 +1083,7 @@ impl AsLogicalPlan for LogicalPlanNode {
protobuf::CreateCatalogSchemaNode {
schema_name: schema_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
},
)),
}),
Expand All @@ -1096,7 +1096,7 @@ impl AsLogicalPlan for LogicalPlanNode {
protobuf::CreateCatalogNode {
catalog_name: catalog_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
schema: Some(df_schema.try_into()?),
},
)),
}),
Expand Down
124 changes: 74 additions & 50 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use crate::protobuf::{
self,
arrow_type::ArrowTypeEnum,
plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
Expand Down Expand Up @@ -124,27 +125,36 @@ impl Error {
}
}

impl From<&Field> for protobuf::Field {
fn from(field: &Field) -> Self {
Self {
impl TryFrom<&Field> for protobuf::Field {
type Error = Error;

fn try_from(field: &Field) -> Result<Self, Self::Error> {
let arrow_type = field.data_type().try_into()?;
Ok(Self {
name: field.name().to_owned(),
arrow_type: Some(Box::new(field.data_type().into())),
arrow_type: Some(Box::new(arrow_type)),
nullable: field.is_nullable(),
children: Vec::new(),
}
})
}
}

impl From<&DataType> for protobuf::ArrowType {
fn from(val: &DataType) -> Self {
Self {
arrow_type_enum: Some(val.into()),
}
impl TryFrom<&DataType> for protobuf::ArrowType {
type Error = Error;

fn try_from(val: &DataType) -> Result<Self, Self::Error> {
let arrow_type_enum: ArrowTypeEnum = val.try_into()?;
Ok(Self {
arrow_type_enum: Some(arrow_type_enum),
})
}
}

impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fn from(val: &DataType) -> Self {
impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
type Error = Error;

fn try_from(val: &DataType) -> Result<Self, Self::Error> {
let res =
match val {
DataType::Null => Self::None(EmptyMessage {}),
DataType::Boolean => Self::Bool(EmptyMessage {}),
Expand Down Expand Up @@ -185,53 +195,55 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
DataType::Utf8 => Self::Utf8(EmptyMessage {}),
DataType::LargeUtf8 => Self::LargeUtf8(EmptyMessage {}),
DataType::List(item_type) => Self::List(Box::new(protobuf::List {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
})),
DataType::FixedSizeList(item_type, size) => {
Self::FixedSizeList(Box::new(protobuf::FixedSizeList {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
list_size: *size,
}))
}
DataType::LargeList(item_type) => Self::LargeList(Box::new(protobuf::List {
field_type: Some(Box::new(item_type.as_ref().into())),
field_type: Some(Box::new(item_type.as_ref().try_into()?)),
})),
DataType::Struct(struct_fields) => Self::Struct(protobuf::Struct {
sub_field_types: struct_fields
.iter()
.map(|field| field.into())
.collect::<Vec<_>>(),
.map(|field| field.try_into())
.collect::<Result<Vec<_>, Error>>()?,
}),
DataType::Union(union_types, type_ids, union_mode) => {
let union_mode = match union_mode {
UnionMode::Sparse => protobuf::UnionMode::Sparse,
UnionMode::Dense => protobuf::UnionMode::Dense,
};
Self::Union(protobuf::Union {
union_types: union_types.iter().map(Into::into).collect(),
union_types: union_types.iter().map(|field| field.try_into()).collect::<Result<Vec<_>, Error>>()?,
union_mode: union_mode.into(),
type_ids: type_ids.iter().map(|x| *x as i32).collect(),
})
}
DataType::Dictionary(key_type, value_type) => {
Self::Dictionary(Box::new(protobuf::Dictionary {
key: Some(Box::new(key_type.as_ref().into())),
value: Some(Box::new(value_type.as_ref().into())),
key: Some(Box::new(key_type.as_ref().try_into()?)),
value: Some(Box::new(value_type.as_ref().try_into()?)),
}))
}
DataType::Decimal128(whole, fractional) => Self::Decimal(protobuf::Decimal {
whole: *whole as u64,
fractional: *fractional as u64,
}),
DataType::Decimal256(_, _) => {
unimplemented!("Proto serialization error: The Decimal256 data type is not yet supported")
return Err(Error::General("Proto serialization error: The Decimal256 data type is not yet supported".to_owned()))
}
DataType::Map(_, _) => {
unimplemented!(
"Proto serialization error: The Map data type is not yet supported"
)
return Err(Error::General(
"Proto serialization error: The Map data type is not yet supported".to_owned()
))
}
}
};

Ok(res)
}
}

Expand All @@ -252,48 +264,60 @@ impl From<&Column> for protobuf::Column {
}
}

impl From<&Schema> for protobuf::Schema {
fn from(schema: &Schema) -> Self {
Self {
impl TryFrom<&Schema> for protobuf::Schema {
type Error = Error;

fn try_from(schema: &Schema) -> Result<Self, Self::Error> {
Ok(Self {
columns: schema
.fields()
.iter()
.map(protobuf::Field::from)
.collect::<Vec<_>>(),
}
.map(protobuf::Field::try_from)
.collect::<Result<Vec<_>, Error>>()?,
})
}
}

impl From<SchemaRef> for protobuf::Schema {
fn from(schema: SchemaRef) -> Self {
Self {
impl TryFrom<SchemaRef> for protobuf::Schema {
type Error = Error;

fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
Ok(Self {
columns: schema
.fields()
.iter()
.map(protobuf::Field::from)
.collect::<Vec<_>>(),
}
.map(protobuf::Field::try_from)
.collect::<Result<Vec<_>, Error>>()?,
})
}
}

impl From<&DFField> for protobuf::DfField {
fn from(f: &DFField) -> protobuf::DfField {
protobuf::DfField {
field: Some(f.field().into()),
impl TryFrom<&DFField> for protobuf::DfField {
type Error = Error;

fn try_from(f: &DFField) -> Result<Self, Self::Error> {
Ok(Self {
field: Some(f.field().try_into()?),
qualifier: f.qualifier().map(|r| protobuf::ColumnRelation {
relation: r.to_string(),
}),
}
})
}
}

impl From<&DFSchemaRef> for protobuf::DfSchema {
fn from(s: &DFSchemaRef) -> protobuf::DfSchema {
let columns = s.fields().iter().map(|f| f.into()).collect::<Vec<_>>();
protobuf::DfSchema {
impl TryFrom<&DFSchemaRef> for protobuf::DfSchema {
type Error = Error;

fn try_from(s: &DFSchemaRef) -> Result<Self, Self::Error> {
let columns = s
.fields()
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<_>, Error>>()?;
Ok(Self {
columns,
metadata: s.metadata().clone(),
}
})
}
}

Expand Down Expand Up @@ -771,7 +795,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::Cast { expr, data_type } => {
let expr = Box::new(protobuf::CastNode {
expr: Some(Box::new(expr.as_ref().try_into()?)),
arrow_type: Some(data_type.into()),
arrow_type: Some(data_type.try_into()?),
});
Self {
expr_type: Some(ExprType::Cast(expr)),
Expand Down Expand Up @@ -954,7 +978,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
field: Some(boxed_field.as_ref().into()),
field: Some(boxed_field.as_ref().try_into()?),
values: Vec::new(),
},
)),
Expand Down Expand Up @@ -1044,7 +1068,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListValue(
protobuf::ScalarListValue {
field: Some(boxed_field.as_ref().into()),
field: Some(boxed_field.as_ref().try_into()?),
values: type_checked_values,
},
)),
Expand Down