New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle ordering bin packets #35
Handle ordering bin packets #35
Conversation
eaaf422
to
ce505df
Compare
I don't now if it is worth it, If I understood this well, you are going to implement another channel/queue of message in the We are moving away from the original problem which was "how can we give back owned data to the user in case of error". |
currently user is able to break flow, if main packet successfully sent, what should do user if they don't retry sending binaries. |
Okay, nevermind, I thought it was for all failed packets. |
if it's impossible to send failed attachments again during new send call - it will return error and new not serialized message. the main idea of this PR is preventing flow breaks |
Moreover currently if user tries to emit two events containing attachments in parallel it will break flow also, even without any retry logic |
@Totodore Any suggestions about the name of GoodNameError? other suggestions? |
I would say |
dab2ba7
to
5fc9c21
Compare
done! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a good solution, however it is quite complex so it requires some doc to understand the process. And I'm afraid it will be hard to add some features over this (like for instance the Remote adapter feature and syncronisation between multiple instances).
Could you add an example in the example crate to show how to manage errors like that ?
socketioxide/src/socket.rs
Outdated
@@ -60,6 +170,7 @@ impl<A: Adapter> Socket<A> { | |||
sid, | |||
extensions: Extensions::new(), | |||
config, | |||
sender: Mutex::new(PacketSender::new(tx, VecDeque::new())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The introduction of a Mutex make impossible to send things in a multithreading way. I know it is necessary though. But could you benchmark this with a high throughput for one socket and the ping/pong delay and memory comsumption to be sure it is not an issue ?
socketioxide/src/lib.rs
Outdated
@@ -61,7 +61,8 @@ | |||
//! ``` | |||
|
|||
pub mod adapter; | |||
pub mod retryer; | |||
pub mod errors; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only export needed error like below and not the whole mod
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didnt get you. I used this mod in example to handle errors. Did you mean it's needed to split into public and private error modules?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, just like line 68 where it is already done (only AckError
and Error
are exported from root level)
23e47bf
to
fb409f2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I said before, I know that it is necessary to have a correct retry mecanism. I think it is really important for the user to abstract things. As we can see in the example it is really painful to do that each time we wan't to emit something.
For instance we could add automatic retry in case of failure configurable from the SocketIoConfig
. Or just try to reemit failed packets in the background at the next .emit() call.
Another way that we didn't considered (and it's a bit too late maybe) is to have a .on_error
which is called when there is failed packets...
@@ -0,0 +1,82 @@ | |||
//! This a end to end test server used with this [test suite](https://github.com/socketio/socket.io-protocol) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a little description about the example
.ping_interval(Duration::from_millis(300)) | ||
.ping_timeout(Duration::from_millis(200)) | ||
.max_payload(1e6 as u64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These weird configs are used only for the test suites and shouldn't be in the examples (that's why I moved e2e testing binaries from the examples)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure that it should be in examples at all, it's not mandatory to handle this error
|
||
socket.on("message", |socket, data: Value, bin, _| async move { | ||
info!("Received event: {:?} {:?}", data, bin); | ||
if let Err(BroadcastError::SendError(mut errors)) = | ||
socket.bin(bin).emit("message-back", data) | ||
{ | ||
let err = errors.pop().unwrap(); | ||
if let SendError::TransportError(TransportError::SendFailedBinPayloads(_)) = err | ||
{ | ||
while let Err(TransportError::SendFailedBinPayloads(_)) = | ||
socket.retry_failed() | ||
{ | ||
sleep(Duration::from_millis(10)).await; | ||
info!("retry"); | ||
} | ||
} | ||
} | ||
}); | ||
|
||
socket.on("message-with-ack", |_, data: Value, bin, ack| async move { | ||
info!("Received event: {:?} {:?}", data, bin); | ||
if let Err(AckSenderError::SendError { | ||
send_error: SendError::TransportError(TransportError::SendFailedBinPayloads(_)), | ||
socket, | ||
}) = ack.bin(bin).send(data) | ||
{ | ||
while let Err(TransportError::SendFailedBinPayloads(_)) = socket.retry_failed() | ||
{ | ||
sleep(Duration::from_millis(10)).await; | ||
info!("retry"); | ||
} | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The control flow is hard and the retry mecanism complex... It requires some comments so the reader easily understand what is happening
Actually any call of |
55ebfbd
to
b03e917
Compare
So I have a suggestion... Currently I'm not very satisfied about the end user experience who wan't to manage the if let Err(BroadcastError::SendError(mut errors)) =
socket.bin(bin).emit("message-back", data)
{
let err = errors.pop().unwrap();
if let SendError::TransportError(TransportError::SendFailedBinPayloads(_)) = err
{
while let Err(TransportError::SendFailedBinPayloads(_)) =
socket.retry_failed()
{
sleep(Duration::from_millis(10)).await;
info!("retry");
}
}
} Moreover, it is a saw as a bad practice to discard errors so it will be weird for the user to almost always discard What I propose is :
I know that it is a bit too late and that you did a lot about returning What do you think @biryukovmaxim |
I like the idea of using special handler more than handling every emit call, however the handler should provide Error enum as input, so user has to match this error. It will expect almost the same code from user as in the example but it will include more variants of error. I think we can keep internal logic the same as in the pr. So public functions like |
I think it's possible to use warn and return unit right now, make errors private again. In further implement the error handler logic handling errors in proper way |
Let's do this ! :) |
send packets via locked PacketSender owned by socket
b03e917
to
730f787
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So now we can merge this and we will add on_error handler in another PR.
Do you agree @biryukovmaxim ?
current mechanism of retrying/resending packets in socketioxide can brake flow in case of retry launches when another binary payload sends.
moreover, if anybody tries to send packets and one of them includes binary attachment,it can break flow also.
Suggested solution:
internal packet sender logic:
public API changes:
socket.retry_failed() - tries to resend failed queue messages
AckSender returns error containing socket and SendError to make it possible to resend ack