Skip to content

Commit

Permalink
ARROW-7684: [Rust] Example Flight client and server for DataFusion
Browse files Browse the repository at this point in the history
This PR adds DataFusion examples for a Flight client and server where the client can send a SQL query to the server and then receive the results.

I have manually tested with a Java client as well to confirm that it works.

Closes #6308 from andygrove/datafusion-flight-example and squashes the following commits:

788feef <Andy Grove> code cleanup
9c47338 <Neville Dipale> Complete flight client's record batch reader
1337b98 <Andy Grove> parse recordbatch
459bef3 <Andy Grove> client parses schema from ipc batches
31c894b <Andy Grove> update release test script
efe05ae <Andy Grove> update release test script
5ecea83 <Andy Grove> formatting
8b419da <Andy Grove> update release test script
03d2c84 <Andy Grove> client streams results
0a39a51 <Andy Grove> client can stream batches
e72c605 <Andy Grove> add starting point for flight-client example
ab28da8 <Andy Grove> get schema from query plan instead of from first batch
0901a3f <Neville Dipale> Merge branch 'datafusion-flight-example' of https://github.com/andygrove/arrow into datafusion-flight-example
ad2e3b0 <Neville Dipale> send schema before batches
996f2a4 <Andy Grove> Use PARQUET_TEST_DATA env var
260f9ca <Neville Dipale> fix license violation
516b66d <Neville Dipale> add helpers to convert record batch to flight data proto message
6beb4ea <Andy Grove> WIP example Flight server for DataFusion

Lead-authored-by: Andy Grove <andygrove73@gmail.com>
Co-authored-by: Neville Dipale <nevilledips@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
  • Loading branch information
2 people authored and kszucs committed Feb 7, 2020
1 parent c477183 commit fee1209
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 18 deletions.
8 changes: 6 additions & 2 deletions dev/release/00-prepare-test.rb
Expand Up @@ -276,7 +276,9 @@ def test_version_pre_tag
["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }",
"-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }",
"+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"]
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"],
["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }",
"+arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }"]
],
},
{
Expand Down Expand Up @@ -458,7 +460,9 @@ def test_version_post_tag
["-arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
"-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
"+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"]
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"],
["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }",
"+arrow-flight = { path = \"../arrow-flight\", version = \"#{@next_snapshot_version}\" }"]
],
},
{
Expand Down
4 changes: 3 additions & 1 deletion rust/arrow/Cargo.toml
Expand Up @@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true }
chrono = "0.4"
flatbuffers = "0.6.0"
hex = "0.4"
arrow-flight = { path = "../arrow-flight", optional = true }

[features]
simd = ["packed_simd"]
default = ["simd"]
flight = ["arrow-flight"]
default = ["simd", "flight"]

[dev-dependencies]
criterion = "0.2"
Expand Down
83 changes: 83 additions & 0 deletions rust/arrow/src/flight/mod.rs
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities to assist with reading and writing Arrow data as Flight messages

use std::convert::TryFrom;
use std::sync::Arc;

use flight::FlightData;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::ipc::{convert, reader, writer};
use crate::record_batch::RecordBatch;

/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let (header, body) = writer::record_batch_to_bytes(batch);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: header,
data_body: body,
}
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let schema = writer::schema_to_bytes(schema);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema,
data_body: vec![],
}
}
}

/// Try convert `FlightData` into an Arrow Schema
///
/// Returns an error if the `FlightData` header is not a valid IPC schema
impl TryFrom<&FlightData> for Schema {
type Error = ArrowError;
fn try_from(data: &FlightData) -> Result<Self> {
convert::schema_from_bytes(&data.data_header[..]).ok_or(ArrowError::ParseError(
"Unable to convert flight data to Arrow schema".to_string(),
))
}
}

/// Convert a FlightData message to a RecordBatch
pub fn flight_data_to_batch(
data: &FlightData,
schema: Arc<Schema>,
) -> Result<Option<RecordBatch>> {
// check that the data_header is a record batch message
let message = crate::ipc::get_root_as_message(&data.data_header[..]);
let batch_header = message
.header_as_record_batch()
.ok_or(ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
))?;
reader::read_record_batch(&data.data_body, batch_header, schema)
}

// TODO: add more explicit conversion that expoess flight descriptor and metadata options
6 changes: 6 additions & 0 deletions rust/arrow/src/ipc/convert.rs
Expand Up @@ -152,6 +152,12 @@ pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema {
Schema::new_with_metadata(fields, metadata)
}

/// Deserialize an IPC message into a schema
pub(crate) fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
let ipc = ipc::get_root_as_message(bytes);
ipc.header_as_schema().map(|schema| fb_to_schema(schema))
}

/// Get the Arrow data type from the flatbuffer Field table
pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
match field.type_type() {
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/ipc/reader.rs
Expand Up @@ -348,7 +348,7 @@ fn create_list_array(
}

/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
fn read_record_batch(
pub(crate) fn read_record_batch(
buf: &Vec<u8>,
batch: ipc::RecordBatch,
schema: Arc<Schema>,
Expand Down
37 changes: 23 additions & 14 deletions rust/arrow/src/ipc/writer.rs
Expand Up @@ -209,8 +209,7 @@ impl<W: Write> Drop for StreamWriter<W> {
}
}

/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let schema = {
let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
Expand All @@ -227,9 +226,13 @@ fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<
fbb.finish(data, None);

let data = fbb.finished_data();
let written = write_padded_data(writer, data, WriteDataType::Header);
data.to_vec()
}

written
/// Convert the schema to its IPC representation, and write it to the `writer`
fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
let data = schema_to_bytes(schema);
write_padded_data(writer, &data[..], WriteDataType::Header)
}

/// The message type being written. This determines whether to write the data length or not.
Expand Down Expand Up @@ -266,13 +269,8 @@ fn write_padded_data<R: Write>(
Ok(total_len as usize)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_record_batch<R: Write>(
writer: &mut BufWriter<R>,
batch: &RecordBatch,
is_stream: bool,
) -> Result<(usize, usize)> {
/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data
pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
let mut fbb = FlatBufferBuilder::new();

let mut nodes: Vec<ipc::FieldNode> = vec![];
Expand Down Expand Up @@ -313,13 +311,24 @@ fn write_record_batch<R: Write>(
let root = message.finish();
fbb.finish(root, None);
let finished_data = fbb.finished_data();

(finished_data.to_vec(), arrow_data)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
fn write_record_batch<R: Write>(
writer: &mut BufWriter<R>,
batch: &RecordBatch,
is_stream: bool,
) -> Result<(usize, usize)> {
let (meta_data, arrow_data) = record_batch_to_bytes(batch);
// write the length of data if writing to stream
if is_stream {
let total_len: u32 = finished_data.len() as u32;
let total_len: u32 = meta_data.len() as u32;
writer.write(&total_len.to_le_bytes()[..])?;
}
let meta_written =
write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?;
let arrow_data_written =
write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
Ok((meta_written, arrow_data_written))
Expand Down
2 changes: 2 additions & 0 deletions rust/arrow/src/lib.rs
Expand Up @@ -33,6 +33,8 @@ pub mod compute;
pub mod csv;
pub mod datatypes;
pub mod error;
#[cfg(feature = "flight")]
pub mod flight;
pub mod ipc;
pub mod json;
pub mod memory;
Expand Down
6 changes: 6 additions & 0 deletions rust/datafusion/Cargo.toml
Expand Up @@ -56,6 +56,12 @@ crossbeam = "0.7.1"
[dev-dependencies]
criterion = "0.2.0"
tempdir = "0.3.7"
futures = "0.3"
prost = "0.6"
tokio = { version = "0.2", features = ["macros"] }
tonic = "0.1"
flatbuffers = "0.6.0"
arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" }

[[bench]]
name = "aggregate_query_sql"
Expand Down
28 changes: 28 additions & 0 deletions rust/datafusion/examples/README.md
@@ -0,0 +1,28 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Examples

## Single Process

The examples `csv_sql.rs` and `parquet_sql.rs` demonstrate building a query plan from a SQL statement and then executing the query plan against local CSV and Parquet files, respectively.

## Distributed

The `flight-client.rs` and `flight-server.rs` examples demonstrate how to run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
62 changes: 62 additions & 0 deletions rust/datafusion/examples/flight-client.rs
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::Schema;
use arrow::flight::flight_data_to_batch;
use flight::flight_service_client::FlightServiceClient;
use flight::Ticket;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;

let request = tonic::Request::new(Ticket {
ticket: "SELECT id FROM alltypes_plain".into(),
});

let mut stream = client.do_get(request).await?.into_inner();

// the schema should be the first message returned, else client should error
let flight_data = stream.message().await?.unwrap();
// convert FlightData to a stream
let schema = Arc::new(Schema::try_from(&flight_data)?);
println!("Schema: {:?}", schema);

// all the remaining stream messages should be dictionary and record batches
while let Some(flight_data) = stream.message().await? {
// the unwrap is infallible and thus safe
let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap();

println!(
"record_batch has {} columns and {} rows",
record_batch.num_columns(),
record_batch.num_rows()
);
let column = record_batch.column(0);
let column = column
.as_any()
.downcast_ref::<Int32Array>()
.expect("Unable to get column");
println!("Column 1: {:?}", column);
}

Ok(())
}

0 comments on commit fee1209

Please sign in to comment.