Skip to content

Commit

Permalink
Merge pull request #297 from losynix/fix_http_timeout
Browse files Browse the repository at this point in the history
Fix potential deadlock in ResponseStream
  • Loading branch information
losynix committed Sep 14, 2023
2 parents 752364a + 4789842 commit d225b3c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
33 changes: 21 additions & 12 deletions usbsas-server/src/appstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,35 +1154,44 @@ impl ResponseStream {
}
}

fn add_serialized_message(&mut self, message: &mut Vec<u8>) -> Result<(), ServiceError> {
fn add_serialized_message(
&mut self,
message: &mut Vec<u8>,
done: bool,
) -> Result<(), ServiceError> {
let mut messages = self.messages.lock()?;
messages.append(message);
// Also append "\r\n" in case multiple json messages are added between 2 polls
messages.append(&mut vec![13, 10]);
drop(messages);
if done {
self.done.store(true, Ordering::Relaxed);
}
if let Some(waker) = self.waker.lock()?.take() {
waker.wake();
}
Ok(())
}

fn add_message<T: Serialize>(&mut self, message: T) -> Result<(), ServiceError> {
self.add_serialized_message(&mut serde_json::to_vec(&message)?)
self.add_serialized_message(&mut serde_json::to_vec(&message)?, false)
}

fn report_progress(&mut self, status: &str, progress: f32) -> Result<(), ServiceError> {
self.add_message(ReportProgress { status, progress })
}

fn report_error(&mut self, msg: &str) -> Result<(), ServiceError> {
self.add_message(ReportError {
status: "fatal_error",
msg,
})?;
self.done()
self.add_serialized_message(
&mut serde_json::to_vec(&ReportError {
status: "fatal_error",
msg,
})?,
true,
)
}

pub fn done(&mut self) -> Result<(), ServiceError> {
let _messages = self.messages.lock().unwrap();
self.done.store(true, Ordering::Relaxed);
if let Some(waker) = self.waker.lock()?.take() {
waker.wake();
Expand All @@ -1200,16 +1209,16 @@ impl Drop for ResponseStream {
impl futures::Stream for ResponseStream {
type Item = Result<web::Bytes, actix_web::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.messages.lock().unwrap().len() == 0 {
let mut messages = self.messages.lock().unwrap();
if messages.len() == 0 {
if self.done.load(Ordering::Relaxed) {
return Poll::Ready(None);
} else {
*self.waker.lock().unwrap() = Some(cx.waker().clone());
return Poll::Pending;
}
}
Poll::Ready(Some(Ok(web::Bytes::copy_from_slice(
self.messages.lock().unwrap().drain(0..).as_slice(),
))))
let mess = messages.drain(0..);
Poll::Ready(Some(Ok(web::Bytes::copy_from_slice(mess.as_slice()))))
}
}
1 change: 0 additions & 1 deletion usbsas-server/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl IntegrationTester {
.expect("Couldn't run analyzer server");

let client = Client::builder()
.timeout(None)
.connect_timeout(Duration::from_secs(30))
.build()
.expect("couldn't build reqwest client");
Expand Down

0 comments on commit d225b3c

Please sign in to comment.