diff --git a/src/handlers.rs b/src/handlers.rs index 67188ef..3b56f8b 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -55,8 +55,7 @@ impl WasmRouteHandler { let response = compose_response(stream_writer).await?; // TODO: handle errors - // TODO: c'mon man - tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + tokio::task::yield_now().await; Ok(response) } diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 9634cd3..7b6e23a 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -7,21 +7,22 @@ pub struct StreamWriter { pending: Arc>>, done: Arc>, // A way for the write side to signal new data to the stream side - write_index: Arc>, - write_index_sender: Arc>, - write_index_receiver: tokio::sync::watch::Receiver, + // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY + // write_index: Arc>, + // write_index_sender: Arc>, + // write_index_receiver: tokio::sync::watch::Receiver, } impl StreamWriter { pub fn new() -> Self { - let write_index = 0; - let (tx, rx) = tokio::sync::watch::channel(write_index); + // let write_index = 0; + // let (tx, rx) = tokio::sync::watch::channel(write_index); Self { pending: Arc::new(RwLock::new(vec![])), done: Arc::new(RwLock::new(false)), - write_index: Arc::new(RwLock::new(write_index)), - write_index_sender: Arc::new(tx), - write_index_receiver: rx, + // write_index: Arc::new(RwLock::new(write_index)), + // write_index_sender: Arc::new(tx), + // write_index_receiver: rx, } } @@ -34,11 +35,14 @@ impl StreamWriter { Err(e) => Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; - { - let mut write_index = self.write_index.write().unwrap(); - *write_index = *write_index + 1; - self.write_index_sender.send(*write_index).unwrap(); - } + // This was meant to wake up listener threads when there was new data but it ended up + // just stalling until input was complete. TODO: investigate so we can get rid of the + // duration-based polling. + // { + // let mut write_index = self.write_index.write().unwrap(); + // *write_index = *write_index + 1; + // self.write_index_sender.send(*write_index).unwrap(); + // } result } @@ -68,6 +72,9 @@ impl StreamWriter { return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e)); }, } + // See comments on the as_stream loop, though using the change signal + // blocked this *completely* until end of writing! (And everything else + // waits on this.) tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; } } @@ -82,21 +89,26 @@ impl StreamWriter { if self.is_done() { return; } else { - // Not sure this is the smoothest way to do it. The oldest way was: - // tokio::time::sleep(tokio::time::Duration::from_micros(20)).await; - // which is a hideous kludge but subjectively felt quicker (but the - // number say not, so what is truth anyway) - match self.write_index_receiver.changed().await { - Ok(_) => continue, - Err(e) => { - // If this ever happens (which it, cough, shouldn't), it means all senders have - // closed, which _should_ mean we are done. Log the error - // but don't return it to the stream: the response as streamed so far - // _should_ be okay! - tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); - return; - } - } + // Not sure how to do this better. I tried using a signal that data + // had changed (via tokio::sync::watch::channel()), but that effectively + // blocked - we got the first chunk quickly but then it stalled waiting + // for the change notification. Polling is awful (and this interval is + // probably too aggressive) but I don't know how to get signalling + // to work! + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + + // For the record: this is what I tried: + // match self.write_index_receiver.changed().await { + // Ok(_) => continue, + // Err(e) => { + // // If this ever happens (which it, cough, shouldn't), it means all senders have + // // closed, which _should_ mean we are done. Log the error + // // but don't return it to the stream: the response as streamed so far + // // _should_ be okay! + // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); + // return; + // } + // } } } else { yield Ok(v);