Skip to content

Commit

Permalink
JSON nl format fix and error handling updates (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Jul 27, 2023
1 parent 23fb59d commit ed6ef6c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
5 changes: 4 additions & 1 deletion src/csv_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ where

fn http_response_trailers(&self) -> Option<HeaderMap> {
let mut header_map = HeaderMap::new();
header_map.insert(http::header::CONTENT_TYPE, "text/csv".parse().unwrap());
header_map.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("text/csv"),
);
Some(header_map)
}
}
Expand Down
43 changes: 27 additions & 16 deletions src/json_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,22 @@ where
let stream_bytes: BoxStream<Result<axum::body::Bytes, axum::Error>> = Box::pin({
stream.enumerate().map(|(index, obj)| {
let mut buf = BytesMut::new().writer();
if index != 0 {
buf.write_all(JSON_ARRAY_SEP_BYTES).unwrap();
}
match serde_json::to_writer(&mut buf, &obj) {
Ok(_) => Ok(buf.into_inner().freeze()),
Err(e) => Err(axum::Error::new(e)),

let sep_write_res = if index != 0 {
buf.write_all(JSON_ARRAY_SEP_BYTES)
.map_err(axum::Error::new)
} else {
Ok(())
};

match sep_write_res {
Ok(_) => {
match serde_json::to_writer(&mut buf, &obj).map_err(axum::Error::new) {
Ok(_) => Ok(buf.into_inner().freeze()),
Err(e) => Err(e),
}
}
Err(e) => Err(e),
}
})
});
Expand All @@ -53,7 +63,7 @@ where
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
"application/json".parse().unwrap(),
http::header::HeaderValue::from_static("application/json"),
);
Some(header_map)
}
Expand All @@ -76,14 +86,14 @@ where
stream: BoxStream<'b, T>,
) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
let stream_bytes: BoxStream<Result<axum::body::Bytes, axum::Error>> = Box::pin({
stream.enumerate().map(|(index, obj)| {
stream.map(|obj| {
let mut buf = BytesMut::new().writer();
if index != 0 {
buf.write_all(JSON_NL_SEP_BYTES).unwrap();
}
match serde_json::to_writer(&mut buf, &obj) {
Ok(_) => Ok(buf.into_inner().freeze()),
Err(e) => Err(axum::Error::new(e)),
match serde_json::to_writer(&mut buf, &obj).map_err(axum::Error::new) {
Ok(_) => match buf.write_all(JSON_NL_SEP_BYTES).map_err(axum::Error::new) {
Ok(_) => Ok(buf.into_inner().freeze()),
Err(e) => Err(e),
},
Err(e) => Err(e),
}
})
});
Expand All @@ -95,7 +105,7 @@ where
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
"application/jsonstream".parse().unwrap(),
http::header::HeaderValue::from_static("application/jsonstream"),
);
Some(header_map)
}
Expand Down Expand Up @@ -197,7 +207,8 @@ mod tests {
.iter()
.map(|item| serde_json::to_string(item).unwrap())
.collect::<Vec<String>>()
.join("\n");
.join("\n")
+ "\n";

let res = client.get("/").send().await.unwrap();
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion src/protobuf_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
"application/x-protobuf-stream".parse().unwrap(),
http::header::HeaderValue::from_static("application/x-protobuf-stream"),
);
Some(header_map)
}
Expand Down
2 changes: 1 addition & 1 deletion src/text_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl StreamingFormat<String> for TextStreamFormat {
let mut header_map = HeaderMap::new();
header_map.insert(
http::header::CONTENT_TYPE,
"text/plain; charset=utf-8".parse().unwrap(),
http::header::HeaderValue::from_static("text/plain; charset=utf-8"),
);
Some(header_map)
}
Expand Down

0 comments on commit ed6ef6c

Please sign in to comment.