Skip to content

Commit

Permalink
refactor: removed spawns in favor of stream chains
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Oct 13, 2023
1 parent b80b206 commit cb69160
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 54 deletions.
2 changes: 1 addition & 1 deletion crates/interfaces/wick-interface-http/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
name: http
kind: wick/types@v1
metadata:
version: 0.4.0
version: 0.5.0
package:
registry:
host: registry.candle.dev
Expand Down
103 changes: 50 additions & 53 deletions crates/wick/wick-trigger-http/src/http/component_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures::stream::StreamExt;
use hyper::header::{CONTENT_LENGTH, CONTENT_TYPE};
use hyper::http::response::Builder;
use hyper::http::{HeaderName, HeaderValue};
use hyper::{Body, Response, StatusCode};
use parking_lot::Mutex;
use serde_json::{Map, Value};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use tracing::Span;
use uuid::Uuid;
Expand All @@ -15,6 +16,7 @@ use wick_interface_http::types::{self as wick_http};
use wick_packet::{
packets,
Base64Bytes,
BoxStream,
Entity,
InherentData,
Invocation,
Expand Down Expand Up @@ -158,12 +160,11 @@ pub(super) async fn respond(
let stream = stream.unwrap();
let builder = Response::builder();

let (handle, response, mut body_stream) = split_stream(stream);
let (response, mut body_stream) = split_stream(stream);

let response = match response.await {
Ok(response) => response?,
Err(e) => {
handle.abort();
return Ok(
Builder::new()
.status(StatusCode::INTERNAL_SERVER_ERROR)
Expand All @@ -180,43 +181,31 @@ pub(super) async fn respond(
.map_or(false, |v| v == "text/event-stream");

let res = if event_stream {
let (tx, rx) = unbounded_channel();
let _output_handle = tokio::spawn(async move {
while let Some(p) = body_stream.recv().await {
if !p.has_data() {
continue;
}
match codec {
Codec::Json => {
let chunk = p
.decode::<wick_http::HttpEvent>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(|v| to_sse_string_bytes(&v));
let _ = tx.send(chunk);
}
Codec::Raw => {
let chunk = p
.decode::<Base64Bytes>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(Into::into);
let _ = tx.send(chunk);
}
Codec::Text => {
let chunk = p
.decode::<String>()
.map_err(|e| HttpError::Utf8Text(e.to_string()))
.map(Into::into);
let _ = tx.send(chunk);
}
Codec::FormData => unreachable!("FormData is not supported as a decoder for HTTP responses"),
}
let body_stream = body_stream.filter_map(move |p| async move {
if !p.has_data() {
return None;
}
Some(match codec {
Codec::Json => p
.decode::<wick_http::HttpEvent>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(|v| to_sse_string_bytes(&v)),
Codec::Raw => p
.decode::<Base64Bytes>()
.map_err(|e| HttpError::Bytes(e.to_string()))
.map(Into::into),
Codec::Text => p
.decode::<String>()
.map_err(|e| HttpError::Utf8Text(e.to_string()))
.map(Into::into),
Codec::FormData => unreachable!("FormData is not supported as a decoder for HTTP responses"),
})
});
let body = Body::wrap_stream(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
let body = Body::wrap_stream(body_stream);
builder.body(body).unwrap()
} else {
let mut body = bytes::BytesMut::new();
while let Some(p) = body_stream.recv().await {
while let Some(p) = body_stream.next().await {
if let PacketPayload::Err(e) = p.payload() {
return Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned()));
}
Expand All @@ -241,52 +230,60 @@ pub(super) async fn respond(
}

fn split_stream(
mut stream: PacketStream,
stream: PacketStream,
) -> (
tokio::task::JoinHandle<()>,
oneshot::Receiver<Result<wick_http::HttpResponse, HttpError>>,
UnboundedReceiver<Packet>,
BoxStream<Packet>,
) {
let (body_tx, body_rx) = unbounded_channel();
let (res_tx, res_rx) = oneshot::channel();
let mut res_tx = Some(res_tx);

let handle = tokio::spawn(async move {
while let Some(packet) = stream.next().await {
match packet {
let res_tx = Arc::new(Mutex::new(Some(res_tx)));

let body = stream.filter_map(move |p| {
let res_tx = Arc::clone(&res_tx);
async move {
match p {
Ok(p) => {
if p.port() == "response" {
if p.is_done() {
continue;
return None;
}
let Some(sender) = res_tx.take() else {
let Some(sender) = res_tx.lock().take() else {
// we only respect the first packet to the response port.
continue;
return None;
};
if let PacketPayload::Err(e) = p.payload() {
let _ = sender.send(Err(HttpError::OutputStream(p.port().to_owned(), e.msg().to_owned())));
break;
return None;
}
let response: Result<wick_http::HttpResponse, _> = p
.decode()
.map_err(|e| HttpError::Deserialize("response".to_owned(), e.to_string()));
let _ = sender.send(response);
return None;
} else if p.port() == "body" {
let _ = body_tx.send(p);
return Some(p);
}
if let Some(sender) = res_tx.lock().take() {
if let PacketPayload::Err(e) = p.payload {
error!(error=%e,"http:stream:error");
let _ = sender.send(Err(HttpError::OperationError(e.to_string())));
}
};
None
}
Err(e) => {
if let Some(sender) = res_tx.take() {
if let Some(sender) = res_tx.lock().take() {
let _ = sender.send(Err(HttpError::OperationError(e.to_string())));
}
warn!(?e, "http:stream:error");
break;
};
error!(error=%e,"http:stream:error");
None
}
}
}
});

(handle, res_rx, body_rx)
(res_rx, body.boxed())
}

fn to_sse_string_bytes(event: &wick_http::HttpEvent) -> Vec<u8> {
Expand Down

0 comments on commit cb69160

Please sign in to comment.