Skip to content

Commit

Permalink
parse recordbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Feb 1, 2020
1 parent 459bef3 commit 1337b98
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
10 changes: 10 additions & 0 deletions rust/arrow/src/ipc/reader.rs
Expand Up @@ -669,6 +669,16 @@ pub fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
ipc.header_as_schema().map(|schema| fb_to_schema(schema))
}

pub fn recordbatch_from_bytes(
bytes: &[u8],
schema: Arc<Schema>,
) -> Result<Option<RecordBatch>> {
let ipc = ipc::get_root_as_message(&bytes[..]);
match ipc.header_as_record_batch() {
Some(batch) => read_record_batch(&bytes[..].to_vec(), batch, schema),
None => Ok(None),
}
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down
34 changes: 27 additions & 7 deletions rust/datafusion/examples/flight-client.rs
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::Schema;
use arrow::ipc::reader;
use flight::flight_service_client::FlightServiceClient;
use flight::Ticket;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -28,13 +30,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

let mut stream = client.do_get(request).await?.into_inner();

while let Some(batch) = stream.message().await? {
println!("BATCH = {:?}", batch);

let schema = reader::schema_from_bytes(&batch.data_header);

println!("SCHEMA = {:?}", schema);
let mut batch_schema: Option<Arc<Schema>> = None;

while let Some(flight_data) = stream.message().await? {
println!("FlightData = {:?}", flight_data);

if let Some(schema) = reader::schema_from_bytes(&flight_data.data_header) {
println!("Schema = {:?}", schema);
batch_schema = Some(Arc::new(schema.clone()));
}

match batch_schema {
Some(ref schema) => {
if let Some(record_batch) = reader::recordbatch_from_bytes(
&flight_data.data_header,
schema.clone(),
)? {
println!(
"record_batch has {} columns and {} rows",
record_batch.num_columns(),
record_batch.num_rows()
);
}
}
None => {}
}
}

Ok(())
Expand Down

0 comments on commit 1337b98

Please sign in to comment.