Skip to content

Commit

Permalink
refactor: fix #166 by ignoring different ws message types
Browse files Browse the repository at this point in the history
  • Loading branch information
1c3t3a committed Mar 19, 2022
1 parent cfd012d commit 4ea0038
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 39 deletions.
26 changes: 16 additions & 10 deletions engineio/src/transports/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,23 @@ impl Transport for WebsocketTransport {
fn poll(&self) -> Result<Bytes> {
let mut receiver = self.receiver.lock()?;

// if this is a binary payload, we mark it as a message
let received_df = receiver.recv_dataframe()?;
match received_df.opcode {
Opcode::Binary => {
let mut message = BytesMut::with_capacity(received_df.data.len() + 1);
message.put_u8(PacketId::Message as u8);
message.put(received_df.take_payload().as_ref());

Ok(message.freeze())
loop {
// if this is a binary payload, we mark it as a message
let received_df = receiver.recv_dataframe()?;
match received_df.opcode {
Opcode::Binary => {
let mut message = BytesMut::with_capacity(received_df.data.len() + 1);
message.put_u8(PacketId::Message as u8);
message.put(received_df.take_payload().as_ref());

return Ok(message.freeze());
}
Opcode::Text => {
return Ok(Bytes::from(received_df.take_payload()));
}
// ignore ping/pong messages
_ => (),
}
_ => Ok(Bytes::from(received_df.take_payload())),
}
}

Expand Down
62 changes: 33 additions & 29 deletions engineio/src/transports/websocket_secure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,41 +101,45 @@ impl Transport for WebsocketSecureTransport {
}

fn poll(&self) -> Result<Bytes> {
let received_df: DataFrame;
loop {
let mut receiver = self.client.lock()?;
receiver.set_nonblocking(true)?;

match receiver.recv_dataframe() {
Ok(payload) => {
received_df = payload;
break;
let received_df: DataFrame;
loop {
let mut receiver = self.client.lock()?;
receiver.set_nonblocking(true)?;

match receiver.recv_dataframe() {
Ok(payload) => {
received_df = payload;
break;
}
// Special case to fix https://github.com/1c3t3a/rust-socketio/issues/133
// This error occures when the websocket connection times out on the receive method.
// The error kind is platform specific, on Unix systems this errors with `ErrorKind::WouldBlock`,
// on Windows with `ErrorKind::TimedOut`.
// As a result we're going to release the lock on the client,
// so that other threads (especially emit) are able to access the client.
Err(WebSocketError::IoError(err))
if matches!(err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {}
Err(err) => return Err(err.into()),
}
// Special case to fix https://github.com/1c3t3a/rust-socketio/issues/133
// This error occures when the websocket connection times out on the receive method.
// The error kind is platform specific, on Unix systems this errors with `ErrorKind::WouldBlock`,
// on Windows with `ErrorKind::TimedOut`.
// As a result we're going to release the lock on the client,
// so that other threads (especially emit) are able to access the client.
Err(WebSocketError::IoError(err))
if matches!(err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {}
Err(err) => return Err(err.into()),
receiver.set_nonblocking(false)?;
drop(receiver);
sleep(Duration::from_millis(200));
}
receiver.set_nonblocking(false)?;
drop(receiver);
sleep(Duration::from_millis(200));
}

// if this is a binary payload, we mark it as a message
match received_df.opcode {
Opcode::Binary => {
let mut message = BytesMut::with_capacity(received_df.data.len() + 1);
message.put_u8(PacketId::Message as u8);
message.put(received_df.take_payload().as_ref());
// if this is a binary payload, we mark it as a message
match received_df.opcode {
Opcode::Binary => {
let mut message = BytesMut::with_capacity(received_df.data.len() + 1);
message.put_u8(PacketId::Message as u8);
message.put(received_df.take_payload().as_ref());

Ok(message.freeze())
return Ok(message.freeze());
}
Opcode::Text => return Ok(Bytes::from(received_df.take_payload())),
// ignore ping/pong messages
_ => (),
}
_ => Ok(Bytes::from(received_df.take_payload())),
}
}

Expand Down

0 comments on commit 4ea0038

Please sign in to comment.