Skip to content

Commit

Permalink
client parses schema from ipc batches
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Feb 1, 2020
1 parent 31c894b commit 459bef3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
6 changes: 6 additions & 0 deletions rust/arrow/src/ipc/reader.rs
Expand Up @@ -29,6 +29,7 @@ use crate::compute::cast;
use crate::datatypes::{DataType, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::ipc::convert::fb_to_schema;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;

Expand Down Expand Up @@ -663,6 +664,11 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
}
}

pub fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
let ipc = ipc::get_root_as_message(&bytes[..]);
ipc.header_as_schema().map(|schema| fb_to_schema(schema))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 5 additions & 1 deletion rust/datafusion/examples/flight-client.rs
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::ipc::reader;
use flight::flight_service_client::FlightServiceClient;

use flight::Ticket;

#[tokio::main]
Expand All @@ -31,6 +31,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

while let Some(batch) = stream.message().await? {
println!("BATCH = {:?}", batch);

let schema = reader::schema_from_bytes(&batch.data_header);

println!("SCHEMA = {:?}", schema);
}

Ok(())
Expand Down

0 comments on commit 459bef3

Please sign in to comment.