Skip to content

Commit

Permalink
Example update
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Mar 30, 2024
1 parent dd2b7ac commit 2d08cd4
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
6 changes: 4 additions & 2 deletions examples/arrow-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use axum_streams::*;

fn source_test_stream(schema: Arc<Schema>) -> impl Stream<Item = RecordBatch> {
// Simulating a stream with a plain vector and throttling to show how it works
stream::iter((0..10).map(move |_| {
stream::iter((0i64..10i64).map(move |idx| {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(vec![idx, idx * 2, idx * 3])),
Arc::new(StringArray::from(vec!["New York", "London", "Gothenburg"])),
Arc::new(Float64Array::from(vec![40.7128, 51.5074, 57.7089])),
Arc::new(Float64Array::from(vec![-74.0060, -0.1278, 11.9746])),
Expand All @@ -29,11 +30,12 @@ fn source_test_stream(schema: Arc<Schema>) -> impl Stream<Item = RecordBatch> {

async fn test_text_stream() -> impl IntoResponse {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
]));
StreamBodyAs::arrow(schema.clone(), source_test_stream(schema.clone()))
StreamBodyAs::arrow_ipc(schema.clone(), source_test_stream(schema.clone()))
}

#[tokio::main]
Expand Down
16 changes: 8 additions & 8 deletions src/arrow_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use http::HeaderMap;
use http_body::Frame;
use std::sync::Arc;

pub struct ArrowRecordBatchStreamFormat {
pub struct ArrowRecordBatchStreamIpcFormat {
schema: SchemaRef,
options: IpcWriteOptions,
}

impl ArrowRecordBatchStreamFormat {
impl ArrowRecordBatchStreamIpcFormat {
pub fn new(schema: Arc<Schema>) -> Self {
Self::with_options(schema, IpcWriteOptions::default())
}
Expand All @@ -28,7 +28,7 @@ impl ArrowRecordBatchStreamFormat {
}
}

impl StreamingFormat<RecordBatch> for ArrowRecordBatchStreamFormat {
impl StreamingFormat<RecordBatch> for ArrowRecordBatchStreamIpcFormat {
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, RecordBatch>,
Expand Down Expand Up @@ -66,19 +66,19 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchStreamFormat {
}

impl<'a> crate::StreamBodyAs<'a> {
pub fn arrow<S>(schema: SchemaRef, stream: S) -> Self
pub fn arrow_ipc<S>(schema: SchemaRef, stream: S) -> Self
where
S: Stream<Item = RecordBatch> + 'a + Send,
{
Self::new(ArrowRecordBatchStreamFormat::new(schema), stream)
Self::new(ArrowRecordBatchStreamIpcFormat::new(schema), stream)
}

pub fn arrow_with_options<S>(schema: SchemaRef, stream: S, options: IpcWriteOptions) -> Self
pub fn arrow_ipc_with_options<S>(schema: SchemaRef, stream: S, options: IpcWriteOptions) -> Self
where
S: Stream<Item = RecordBatch> + 'a + Send,
{
Self::new(
ArrowRecordBatchStreamFormat::with_options(schema, options),
ArrowRecordBatchStreamIpcFormat::with_options(schema, options),
stream,
)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ mod tests {
"/",
get(|| async move {
StreamBodyAs::new(
ArrowRecordBatchStreamFormat::new(app_schema.clone()),
ArrowRecordBatchStreamIpcFormat::new(app_schema.clone()),
test_stream,
)
}),
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub use protobuf_format::ProtobufStreamFormat;
#[cfg(feature = "arrow")]
mod arrow_format;
#[cfg(feature = "arrow")]
pub use arrow_format::ArrowRecordBatchStreamFormat;
pub use arrow_format::ArrowRecordBatchStreamIpcFormat;

#[cfg(test)]
mod test_client;

0 comments on commit 2d08cd4

Please sign in to comment.