
Arrow stream #36

Closed
happysalada opened this issue Feb 12, 2024 · 11 comments · Fixed by #39
Labels
enhancement New feature or request

Comments

@happysalada

Hey, thanks again for this library!
Would you consider adding Apache Arrow streams as well?
It's a format used for dataframes, so it's well suited for data-intensive applications.
Thank you!

@abdolence
Owner

Hey, this is interesting, yes.
I'm familiar with the Apache Arrow project, though I've only used it once and never from Rust.

Do you have a particular use case in mind? Like sharing streams of record batches? What about client support?

@abdolence abdolence added the enhancement New feature or request label Feb 13, 2024
@happysalada
Author

So the polars dataframe library (the fastest dataframe library I know of) uses the Arrow format under the hood. Any axum server returning a stream of RecordBatch could therefore be consumed by a Python polars dataframe client.
Did I answer your question?
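To illustrate the consumer side of this use case, here is a hedged sketch (not from this thread) of a Rust client decoding such a response with `arrow_ipc`'s `StreamReader`. The URL is made up for illustration, and the `reqwest`, `arrow-ipc`, and `arrow-array` crates are assumed as dependencies:

```rust
// Hypothetical client for an axum endpoint that streams Arrow IPC data.
use std::io::Cursor;

use arrow_ipc::reader::StreamReader;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fetch the whole body at once; a real client could decode incrementally.
    let bytes = reqwest::get("http://127.0.0.1:8080/arrow-stream")
        .await?
        .bytes()
        .await?;

    // StreamReader understands the Arrow IPC stream format:
    // schema first, then record batches, then an end-of-stream marker.
    let reader = StreamReader::try_new(Cursor::new(bytes.as_ref()), None)?;
    for batch in reader {
        let batch = batch?;
        println!("got batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```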

@abdolence
Owner

Yes, thanks for the details. I'll explore this further myself and need to think about it some more.
Of course, if you're already working on this, I'll gladly accept PRs :)

@happysalada
Author

It might take me a bit of time, but I want to have a look at it. Thanks for being open to it!

@fcenedes

fcenedes commented Mar 29, 2024

Heya, first of all thanks for this small but handy crate.
I was looking to support Arrow RecordBatch in our project too. Here is what it might look like (Arrow IPC format):

use crate::stream_format::StreamingFormat;
use futures::Stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use http::HeaderMap;
use http_body::Frame;
use arrow_array::RecordBatch;
use arrow_ipc::CompressionType;
use arrow_ipc::writer::{IpcWriteOptions, StreamWriter};


pub struct ArrowIpcStreamFormat;

impl StreamingFormat<RecordBatch> for ArrowIpcStreamFormat {
    fn to_bytes_stream<'a, 'b>(
        &'a self,
        stream: BoxStream<'b, RecordBatch>,
    ) -> BoxStream<'b, Result<Frame<axum::body::Bytes>, axum::Error>> {
        Box::pin(stream.then(|batch| async move {
            let schema = batch.schema();
            // Each batch is written as its own Arrow IPC stream, ZSTD-compressed.
            let options = IpcWriteOptions::default()
                .try_with_compression(Some(CompressionType::ZSTD))
                .map_err(axum::Error::new)?;
            let mut writer: StreamWriter<Vec<u8>> =
                StreamWriter::try_new_with_options(Vec::new(), &schema, options)
                    .map_err(axum::Error::new)?;
            writer.write(&batch).map_err(axum::Error::new)?;
            writer.finish().map_err(axum::Error::new)?;

            // Unwrap the underlying buffer and hand it to axum as a data frame.
            writer
                .into_inner()
                .map_err(axum::Error::new)
                .map(axum::body::Bytes::from)
                .map(Frame::data)
        }))
    }

    fn http_response_trailers(&self) -> Option<HeaderMap> {
        let mut header_map = HeaderMap::new();
        header_map.insert(
            http::header::CONTENT_TYPE,
            http::header::HeaderValue::from_static("application/octet-stream"),
        );
        Some(header_map)
    }
}

impl<'a> crate::StreamBodyAs<'a> {
    pub fn arrow_ipc_stream<S>(stream: S) -> Self
    where
        S: Stream<Item = RecordBatch> + 'a + Send,
    {
        Self::new(ArrowIpcStreamFormat, stream)
    }
}

and it can be used this way:

use std::sync::Arc;
use std::time::Duration;
use arrow_array::{array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use axum::response::IntoResponse;
use axum::routing::*;
use axum::Router;

use futures::prelude::*;
use tokio::net::TcpListener;
use tokio_stream::StreamExt;

use axum_streams::*;



fn source_test_stream() -> impl stream::Stream<Item = RecordBatch> {
    // Simulate a stream of RecordBatch values, throttled to show how it works
    stream::iter((0..10).map(|_| {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Utf8, false),
            Field::new("field2", DataType::Utf8, false),
        ]));

        let array1 = Arc::new(array::StringArray::from(vec!["hello", "arrow"]));
        let array2 = Arc::new(array::StringArray::from(vec!["goodbye", "arrow"]));
        RecordBatch::try_new(schema, vec![array1, array2]).unwrap()
    }))
    .throttle(Duration::from_millis(500))
}

async fn test_arrow_stream() -> impl IntoResponse {
    StreamBodyAs::arrow_ipc_stream(source_test_stream())
}

#[tokio::main]
async fn main() {
    // build our application with a route
    let app = Router::new()
        // `GET /` goes to `root`
        .route("/arrow-stream", get(test_arrow_stream));

    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();

    axum::serve(listener, app).await.unwrap();
}

Of course, one may want to make the compression logic optional, but anyway, I believe it gives the idea.

@abdolence
Owner

Thanks for the example. I can add something similar in one of the next releases if this is something people generally want.

@abdolence abdolence linked a pull request Mar 30, 2024 that will close this issue
@abdolence
Owner

This is the implementation and API I've prepared for now:
#39

Let me know if this is something you would expect.

@fcenedes

fcenedes commented Mar 30, 2024

This is good stuff, thank you. I see you updated a couple of deps at the same time (goodbye futures_util).

@abdolence
Owner

abdolence commented Apr 13, 2024

While working on the client implementation with the official stream decoders, I realized the previous implementation was neither correct nor efficient.
So I rewrote it in v0.14.1 to follow the expected stream format: the schema is prepended only once, and the stream has the correct ending. The size of the stream also decreased dramatically.

v0.14.0...master
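For context, a minimal sketch of why this matters (a hypothetical `encode_stream` helper, not the crate's actual internals): the Arrow IPC stream format emits the schema once up front and ends with an end-of-stream marker, so a single `StreamWriter` should span all batches rather than one writer per batch:

```rust
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{ArrowError, Schema};

// Expected layout: one schema header, N batch messages, one end marker.
fn encode_stream(
    schema: &Schema,
    batches: &[RecordBatch],
) -> Result<Vec<u8>, ArrowError> {
    // The schema is written once, when the writer is created.
    let mut writer = StreamWriter::try_new(Vec::new(), schema)?;
    for batch in batches {
        // Each write appends only the batch data, not another schema copy.
        writer.write(batch)?;
    }
    // finish() appends the end-of-stream marker that decoders expect.
    writer.finish()?;
    writer.into_inner()
}
```

Creating a fresh writer per batch, as in the earlier example, duplicates the schema in every chunk and never emits a single well-placed end marker, which is what tripped up the official decoders.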

@abdolence
Owner

Released in https://github.com/abdolence/axum-streams-rs/releases/tag/v0.14.1

@fcenedes

Brilliant, and good catch.
