Skip to content

Commit

Permalink
[feat] add text stream support (#17)
Browse files Browse the repository at this point in the history
* [feat] add text stream support

* [misc] update document and format
  • Loading branch information
cxgreat2014 committed May 21, 2023
1 parent 8e07888 commit 48efc56
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 1 deletion.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ default = []
json = ["dep:serde", "dep:serde_json"]
csv = ["dep:csv", "dep:serde"]
protobuf = ["dep:prost"]
text = []

[dev-dependencies]
futures = "0.3"
Expand Down Expand Up @@ -67,5 +68,10 @@ name = "protobuf-example"
path = "examples/protobuf-example.rs"
required-features = ["protobuf"]

[[example]]
name = "text-example"
path = "examples/text-example.rs"
required-features = ["text"]

[build-dependencies]
cargo-husky = { version = "1.5", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] }
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Library provides HTTP response streaming support for [axum web framework](https:
- JSON lines stream format
- CSV stream
- Protobuf len-prefixed stream format
- Text stream

This type of responses are useful when you are reading huge stream of objects from some source (such as database, file, etc)
and want to avoid huge memory allocation.
Expand All @@ -18,7 +19,7 @@ and want to avoid huge memory allocation.
Cargo.toml:
```toml
[dependencies]
axum-streams = { version = "0.8", features=["json", "csv", "protobuf"] }
axum-streams = { version = "0.8", features=["json", "csv", "protobuf", "text"] }
```

## Compatibility matrix
Expand Down Expand Up @@ -59,6 +60,10 @@ async fn test_csv_stream() -> impl IntoResponse {
StreamBodyAs::csv(source_test_stream())
}

async fn test_text_stream() -> impl IntoResponse {
StreamBodyAs::text(source_test_stream())
}

```

All examples available at [examples](examples) directory.
Expand Down
37 changes: 37 additions & 0 deletions examples/text-example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use axum::response::IntoResponse;
use axum::routing::*;
use axum::Router;
use std::net::SocketAddr;

use futures::prelude::*;
use tokio_stream::StreamExt;

use axum_streams::*;

fn source_test_stream() -> impl Stream<Item = String> {
// Simulating a stream with a plain vector and throttling to show how it works
stream::iter(vec![
"苟利国家生死以,岂因祸福避趋之?".to_string();
1000
])
.throttle(std::time::Duration::from_millis(50))
}

async fn test_text_stream() -> impl IntoResponse {
StreamBodyAs::text(source_test_stream())
}

#[tokio::main]
async fn main() {
// build our application with a route
let app = Router::new()
// `GET /` goes to `root`
.route("/text-stream", get(test_text_stream));

let addr = SocketAddr::from(([127, 0, 0, 1], 8080));

axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ mod csv_format;
#[cfg(feature = "csv")]
pub use csv_format::CsvStreamFormat;

#[cfg(feature = "text")]
mod text_format;
#[cfg(feature = "text")]
pub use text_format::TextStreamFormat;

#[cfg(feature = "protobuf")]
mod protobuf_format;
#[cfg(feature = "protobuf")]
Expand Down
106 changes: 106 additions & 0 deletions src/text_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use crate::stream_format::StreamingFormat;
use futures::Stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use http::HeaderMap;

pub struct TextStreamFormat;

impl TextStreamFormat {
pub fn new() -> Self {
Self {}
}
}

impl StreamingFormat<String> for TextStreamFormat {
fn to_bytes_stream<'a, 'b>(
&'a self,
stream: BoxStream<'b, String>,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
fn write_text_record(obj: String) -> Result<Vec<u8>, axum::Error> {
let obj_vec = obj.as_bytes().to_vec();
Ok(obj_vec)
}

let stream_bytes: BoxStream<Result<axum::body::Bytes, axum::Error>> = Box::pin({
stream.map(move |obj| {
let write_text_res = write_text_record(obj);
write_text_res.map(axum::body::Bytes::from)
})
});

Box::pin(stream_bytes)
}

fn http_response_trailers(&self) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
"text/plain; charset=utf-8".parse().unwrap(),
);
Some(header_map)
}
}

impl<'a> crate::StreamBodyAs<'a> {
pub fn text<S>(stream: S) -> Self
where
S: Stream<Item = String> + 'a + Send,
{
Self::new(TextStreamFormat::new(), stream)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_client::*;
use crate::StreamBodyAs;
use axum::{routing::*, Router};
use futures_util::stream;

#[tokio::test]
async fn serialize_text_stream_format() {
#[derive(Clone, prost::Message)]
struct TestOutputStructure {
#[prost(string, tag = "1")]
foo1: String,
#[prost(string, tag = "2")]
foo2: String,
}

let test_stream_vec = vec![
String::from("bar1"),
String::from("bar2"),
String::from("bar3"),
String::from("bar4"),
String::from("bar5"),
String::from("bar6"),
String::from("bar7"),
String::from("bar8"),
String::from("bar9"),
];

let test_stream = Box::pin(stream::iter(test_stream_vec.clone()));

let app = Router::new().route(
"/",
get(|| async { StreamBodyAs::new(TextStreamFormat::new(), test_stream) }),
);

let client = TestClient::new(app);

let expected_text_buf: Vec<u8> = test_stream_vec
.iter()
.flat_map(|obj| {
let obj_vec = obj.as_bytes().to_vec();
obj_vec
})
.collect();

let res = client.get("/").send().await.unwrap();
let body = res.bytes().await.unwrap().to_vec();

assert_eq!(body, expected_text_buf);
}
}

0 comments on commit 48efc56

Please sign in to comment.