Skip to content

Commit

Permalink
Document Async decoder usage (#4043) (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 10, 2023
1 parent ff670c5 commit 250d95b
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 0 deletions.
3 changes: 3 additions & 0 deletions arrow-csv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ regex = { version = "1.7.0", default-features = false, features = ["std", "unico

[dev-dependencies]
tempfile = "3.3"
futures = "0.3"
tokio = { version = "1.27", default-features = false, features = ["io-util"] }
bytes = "1.4"
80 changes: 80 additions & 0 deletions arrow-csv/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! CSV Reader
//!
//! # Basic Usage
//!
//! This CSV reader allows CSV files to be read into the Arrow memory model. Records are
//! loaded in batches and are then converted from row-based data to columnar data.
//!
Expand All @@ -39,6 +41,84 @@
//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
//! let batch = csv.next().unwrap().unwrap();
//! ```
//!
//! # Async Usage
//!
//! The lower-level [`Decoder`] can be integrated with various forms of async data streams.
//!
//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
//!
//! ```
//! # use std::task::{Poll, ready};
//! # use bytes::{Buf, Bytes};
//! # use arrow_schema::ArrowError;
//! # use futures::stream::{Stream, StreamExt};
//! # use arrow_array::RecordBatch;
//! # use arrow_csv::reader::Decoder;
//! #
//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
//! mut decoder: Decoder,
//! mut input: S,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//! let mut buffered = Bytes::new();
//! futures::stream::poll_fn(move |cx| {
//! loop {
//! if buffered.is_empty() {
//! if let Some(b) = ready!(input.poll_next_unpin(cx)) {
//! buffered = b;
//! }
//! }
//! let decoded = match decoder.decode(buffered.as_ref()) {
//! // Note: the decoder needs to be called with an empty
//! // array to delimit the final record
//! Ok(0) => break,
//! Ok(decoded) => decoded,
//! Err(e) => return Poll::Ready(Some(Err(e))),
//! };
//! buffered.advance(decoded);
//! }
//!
//! Poll::Ready(decoder.flush().transpose())
//! })
//! }
//!
//! ```
//!
//! In a similar vein, it can also be used with tokio-based IO primitives
//!
//! ```
//! # use std::pin::Pin;
//! # use std::task::{Poll, ready};
//! # use futures::Stream;
//! # use tokio::io::AsyncBufRead;
//! # use arrow_array::RecordBatch;
//! # use arrow_csv::reader::Decoder;
//! # use arrow_schema::ArrowError;
//! fn decode_stream<R: AsyncBufRead + Unpin>(
//! mut decoder: Decoder,
//! mut reader: R,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//! futures::stream::poll_fn(move |cx| {
//! loop {
//! let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
//! Ok(b) => b,
//! Err(e) => return Poll::Ready(Some(Err(e.into()))),
//! };
//! let decoded = match decoder.decode(b) {
//! // Note: the decoder needs to be called with an empty
//! // array to delimit the final record
//! Ok(0) => break,
//! Ok(decoded) => decoded,
//! Err(e) => return Poll::Ready(Some(Err(e))),
//! };
//! Pin::new(&mut reader).consume(decoded);
//! }
//!
//! Poll::Ready(decoder.flush().transpose())
//! })
//! }
//! ```
//!

mod records;

Expand Down
3 changes: 3 additions & 0 deletions arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ lexical-core = { version = "0.8", default-features = false }
tempfile = "3.3"
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
futures = "0.3"
tokio = { version = "1.27", default-features = false, features = ["io-util"] }
bytes = "1.4"
107 changes: 107 additions & 0 deletions arrow-json/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,113 @@
//! A faster JSON reader that will eventually replace [`Reader`]
//!
//! [`Reader`]: crate::reader::Reader
//!
//! # Basic Usage
//!
//! [`RawReader`] can be used directly with synchronous data sources, such as [`std::fs::File`]
//!
//! ```
//! # use arrow_schema::*;
//! # use std::fs::File;
//! # use std::io::BufReader;
//! # use std::sync::Arc;
//!
//! let schema = Arc::new(Schema::new(vec![
//! Field::new("a", DataType::Float64, false),
//! Field::new("b", DataType::Float64, false),
//! Field::new("c", DataType::Boolean, true),
//! ]));
//!
//! let file = File::open("test/data/basic.json").unwrap();
//!
//! let mut json = arrow_json::RawReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
//! let batch = json.next().unwrap().unwrap();
//! ```
//!
//! # Async Usage
//!
//! The lower-level [`RawDecoder`] can be integrated with various forms of async data streams.
//!
//! For example, see below for how it can be used with an arbitrary `Stream` of `Bytes`
//!
//! ```
//! # use std::task::{Poll, ready};
//! # use bytes::{Buf, Bytes};
//! # use arrow_schema::ArrowError;
//! # use futures::stream::{Stream, StreamExt};
//! # use arrow_array::RecordBatch;
//! # use arrow_json::RawDecoder;
//! #
//! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
//! mut decoder: RawDecoder,
//! mut input: S,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//! let mut buffered = Bytes::new();
//! futures::stream::poll_fn(move |cx| {
//! loop {
//! if buffered.is_empty() {
//! buffered = match ready!(input.poll_next_unpin(cx)) {
//! Some(b) => b,
//! None => break,
//! };
//! }
//! let decoded = match decoder.decode(buffered.as_ref()) {
//! Ok(decoded) => decoded,
//! Err(e) => return Poll::Ready(Some(Err(e))),
//! };
//! let read = buffered.len();
//! buffered.advance(decoded);
//! if decoded != read {
//! break
//! }
//! }
//!
//! Poll::Ready(decoder.flush().transpose())
//! })
//! }
//!
//! ```
//!
//! In a similar vein, it can also be used with tokio-based IO primitives
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_schema::{DataType, Field, Schema};
//! # use std::pin::Pin;
//! # use std::task::{Poll, ready};
//! # use futures::{Stream, TryStreamExt};
//! # use tokio::io::{AsyncBufRead, BufReader};
//! # use arrow_array::RecordBatch;
//! # use arrow_json::{RawDecoder, RawReaderBuilder};
//! # use arrow_schema::ArrowError;
//! # use tokio::fs::File;
//! fn decode_stream<R: AsyncBufRead + Unpin>(
//! mut decoder: RawDecoder,
//! mut reader: R,
//! ) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
//! futures::stream::poll_fn(move |cx| {
//! loop {
//! let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
//! Ok(b) if b.is_empty() => break,
//! Ok(b) => b,
//! Err(e) => return Poll::Ready(Some(Err(e.into()))),
//! };
//! let read = b.len();
//! let decoded = match decoder.decode(b) {
//! Ok(decoded) => decoded,
//! Err(e) => return Poll::Ready(Some(Err(e))),
//! };
//! Pin::new(&mut reader).consume(decoded);
//! if decoded != read {
//! break;
//! }
//! }
//!
//! Poll::Ready(decoder.flush().transpose())
//! })
//! }
//! ```
//!

use crate::raw::boolean_array::BooleanArrayDecoder;
use crate::raw::decimal_array::DecimalArrayDecoder;
Expand Down

0 comments on commit 250d95b

Please sign in to comment.