Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pbjson-types = { workspace = true }
prost = { workspace = true }
substrait = { version = "0.53", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }

[dev-dependencies]
datafusion = { workspace = true, features = ["nested_expressions"] }
Expand Down
45 changes: 31 additions & 14 deletions datafusion/substrait/src/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,59 @@ use datafusion::error::Result;
use datafusion::prelude::*;

use prost::Message;
use std::path::Path;
use substrait::proto::Plan;
use tokio::{
fs::OpenOptions,
io::{AsyncReadExt, AsyncWriteExt},
};

use std::fs::OpenOptions;
use std::io::{Read, Write};
/// Plans a SQL query and serializes the generated logical plan to bytes.
/// The bytes are then written into a file at `path`.
///
/// Returns an error if the file already exists.
pub async fn serialize(
sql: &str,
ctx: &SessionContext,
path: impl AsRef<Path>,
) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await?;

#[allow(clippy::suspicious_open_options)]
pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
let protobuf_out = serialize_bytes(sql, ctx).await;
let mut file = OpenOptions::new().create(true).write(true).open(path)?;
file.write_all(&protobuf_out?)?;
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(path)
.await?;
file.write_all(&protobuf_out).await?;
Ok(())
}

/// Plans a SQL query and serializes the generated logical plan to bytes.
pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
Comment on lines +52 to 53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we replace the sql arg with a DataFrame arg, this function (and serialize) can be non-async.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure which one is preferable for users.
@alamb cc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave this one as a `str` in this PR; we can potentially add a DataFrame-specific one as a follow-on.

However, given that there is already an API to serialize LogicalPlans directly, I am not sure how much more value a DataFrame one would add.

https://docs.rs/datafusion-substrait/latest/datafusion_substrait/#example-serializing-logicalplans

let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let proto = producer::to_substrait_plan(&plan, &ctx.state())?;

let mut protobuf_out = Vec::<u8>::new();
proto.encode(&mut protobuf_out).map_err(|e| {
DataFusionError::Substrait(format!("Failed to encode substrait plan: {e}"))
})?;
proto
.encode(&mut protobuf_out)
.map_err(|e| DataFusionError::Substrait(format!("Failed to encode plan: {e}")))?;
Ok(protobuf_out)
}

pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
/// Reads the file at `path` and deserializes a plan from the bytes.
pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
Comment on lines +65 to +66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up suggestion (breaking change): this function is marked async but it doesn't have to be (see comment on deserialize_bytes).

Suggested change
/// Reads the file at `path` and deserializes a plan from the bytes.
pub async fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {
/// Reads the file at `path` and deserializes a plan from the bytes.
pub fn deserialize(path: impl AsRef<Path>) -> Result<Box<Plan>> {

Copy link
Contributor Author

@niebayes niebayes Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to contribute more to the datafusion-substrait crate. Personally, I would prefer to defer the breaking change to one of the follow-up PRs.

What do you think? @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed -- let's discuss API changes in follow-on PRs / issues.

let mut protobuf_in = Vec::<u8>::new();

let mut file = OpenOptions::new().read(true).open(path)?;
let mut file = OpenOptions::new().read(true).open(path).await?;
file.read_to_end(&mut protobuf_in).await?;

file.read_to_end(&mut protobuf_in)?;
deserialize_bytes(protobuf_in).await
}

/// Deserializes a plan from the bytes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up suggestion (breaking change): this function is marked async but it doesn't have to be.

pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
DataFusionError::Substrait(format!("Failed to decode substrait plan: {e}"))
DataFusionError::Substrait(format!("Failed to decode plan: {e}"))
})?))
}
20 changes: 20 additions & 0 deletions datafusion/substrait/tests/cases/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#[cfg(test)]
mod tests {
use datafusion::common::assert_contains;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
Expand All @@ -31,6 +32,25 @@ mod tests {
use substrait::proto::rel_common::{Emit, EmitKind};
use substrait::proto::{rel, RelCommon};

/// Checks that `serializer::serialize` writes a plan to a not-yet-existing
/// file that `serializer::deserialize` can read back, and that a second
/// `serialize` call targeting the same path is rejected.
#[tokio::test]
async fn serialize_to_file() -> Result<()> {
    /// Removes the scratch file when dropped, so that a failed assertion or an
    /// early `?` return does not leave it behind and break the next run.
    struct RemoveOnDrop<'a>(&'a str);

    impl Drop for RemoveOnDrop<'_> {
        fn drop(&mut self) {
            // Best effort: the file may legitimately not exist at this point.
            let _ = std::fs::remove_file(self.0);
        }
    }

    let ctx = create_context().await?;
    let path = "tests/serialize_to_file.bin";
    let sql = "SELECT a, b FROM data";

    // A previously aborted run may have left the file behind. Test case 1
    // requires the path to be free, so clear any stale file first
    // (ignoring the error when there is nothing to remove).
    let _ = std::fs::remove_file(path);
    let _cleanup = RemoveOnDrop(path);

    // Test case 1: serializing to a non-existing file should succeed.
    serializer::serialize(sql, &ctx, path).await?;
    serializer::deserialize(path).await?;

    // Test case 2: serializing to an existing file should fail.
    let got = serializer::serialize(sql, &ctx, path).await.unwrap_err();
    // The OS words this error differently per platform ("File exists" on
    // Unix, "The file exists." on Windows), so compare case-insensitively.
    assert_contains!(got.to_string().to_lowercase(), "file exists");

    Ok(())
}

#[tokio::test]
async fn serialize_simple_select() -> Result<()> {
let ctx = create_context().await?;
Expand Down
Loading