Messages dropped silently when using SinkWrite #431

Closed
mavax opened this issue Oct 7, 2020 · 11 comments · Fixed by actix/actix-net#409

@mavax

mavax commented Oct 7, 2020

Expected Behavior

When 3 messages are written into SinkWrite, they should all be processed. Possibly related to #384; the problem could also be somewhere in awc.

Current Behavior

The third message is not processed.

Steps to Reproduce (for bugs)

Example:

use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;

type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct WsCmdClient {
    sink: SinkWrite<ws::Message, WsFramedSink>,
}

impl WsCmdClient {
    pub fn start(sink: WsFramedSink, _stream: WsFramedStream) -> Addr<Self> {
        WsCmdClient::create(|ctx| WsCmdClient {
            sink: SinkWrite::new(sink, ctx),
        })
    }
}
impl Actor for WsCmdClient {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Context<Self>) {
        println!("WsCmdClient started");
    }
}

impl actix::io::WriteHandler<WsProtocolError> for WsCmdClient {}

#[derive(Message, Debug)]
#[rtype(result = "()")]
struct MyMessage(u32);
impl Handler<MyMessage> for WsCmdClient {
    type Result = ();

    fn handle(&mut self, msg: MyMessage, _ctx: &mut Self::Context) {
        println!("In handler {:?}", msg);
        if let Some(error) = self.sink.write(ws::Message::Text(msg.0.to_string())) {
            println!("Error WsCmdClient {:?}", error);
        }
    }
}

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (_, framed) = Client::default()
        .ws("http://localhost:7878")
        .connect()
        .await?;
    let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
    let addr = WsCmdClient::start(sink, stream);

    for i in 0..3 {
        addr.do_send(MyMessage(i));
    }
    let _ = tokio::signal::ctrl_c().await?;
    Ok(())
}

I opened a server with wscat -l 7878, and the output of the example is:

WsCmdClient started
In handler MyMessage(0)
In handler MyMessage(1)
In handler MyMessage(2)

However, the server only sees:

< 0
> 

In my real-life code, the third message is also the first one to go missing.

Context

Messages are dropped silently. I am using SinkWrite in conjunction with awc::ws.

Your Environment

  • OS
    Arch Linux: 5.8.3-arch1-1
    Also reproducing on Ubuntu 20.04
  • Rust Version (I.e, output of rustc -V):
    rustc 1.45.2 (d3fb005a3 2020-07-31)
  • Actix Version:
    0.10
    awc 2.0.0

I am currently investigating the issue, but I would appreciate it if anybody has an idea of where the problem could be.

@mavax
Author

mavax commented Oct 7, 2020

I have updated the message above with an example reproducing the issue.

@mavax
Author

mavax commented Oct 7, 2020

Investigated a bit more.

Repeating the block:

    tokio::time::delay_for(Duration::from_secs(1)).await;
    for i in 0..3 {
        addr.do_send(MyMessage(i));
    }

in the example results in two polls for each subsequent batch of three messages, but only one poll for the first batch.

Also, adding:

ctx.run_interval(Duration::from_millis(50), |_,_| {});

To the actor context makes it poll the SinkWrite and works around the issue.
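Why would a no-op interval help? A task is only polled again after its waker fires, and an interval timer wakes the actor's context on every tick whether or not anything is ready, so the futures it drives get re-polled as a side effect. Below is a simplified, std-only model of that effect, not actix's actual code: OneItemPerPoll (hypothetical) sends one buffered message per poll and returns Pending without registering the waker, and drive (also hypothetical) behaves like an executor that re-polls only after a wake-up, with an optional number of simulated interval ticks.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how many times the task has been woken.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical writer task: sends one buffered message per poll and, while
/// messages remain, returns Pending WITHOUT registering the waker.
struct OneItemPerPoll {
    buffer: VecDeque<u32>,
    sent: Vec<u32>,
}

impl Future for OneItemPerPoll {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if let Some(item) = self.buffer.pop_front() {
            self.sent.push(item);
        }
        if self.buffer.is_empty() {
            Poll::Ready(())
        } else {
            Poll::Pending // nothing will wake us: the lost-wakeup pattern
        }
    }
}

/// Polls `fut` once, then again only after a wake-up, like an executor.
/// `ticks` simulates an interval timer waking the task that many times.
/// Returns the number of polls performed.
fn drive(fut: &mut OneItemPerPoll, mut ticks: usize) -> usize {
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let (mut polls, mut seen) = (0, 0);
    loop {
        polls += 1;
        if Pin::new(&mut *fut).poll(&mut cx).is_ready() {
            return polls;
        }
        if ticks > 0 {
            ticks -= 1;
            waker.wake_by_ref(); // the no-op interval callback fires
        }
        let wakes = counter.0.load(Ordering::SeqCst);
        if wakes == seen {
            return polls; // no wake-up: a real executor would never poll again
        }
        seen = wakes;
    }
}
```

With no ticks, the model stops after delivering only the first message, which loosely mirrors the server seeing only 0 above; with ticks, it is re-polled until all three messages are sent.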

The issue appears to be that the poll method of SinkWriteFuture is called fewer times than expected. Replacing the part that uses start_send with:

    while let Poll::Ready(res) = Pin::new(&mut inner.sink).poll_ready(cx) {
        match res {
            Ok(()) => {
                if let Some(item) = inner.buffer.pop_front() {
                    // send front of buffer to sink
                    let _ = Pin::new(&mut inner.sink).start_send(item);
                } else {
                    // buffer drained, nothing left to send in this poll
                    break;
                }
            }
            // stop instead of polling a failed sink in a tight loop
            Err(_err) => break,
        }
    }

Seems to work around the issue by processing several messages in one poll. However, I haven't given the implementation much thought and don't know what else it might break, but I hope it helps!
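The difference between sending one item per poll and draining the buffer in a loop can be shown with a small std-only sketch. MiniSink is a hypothetical, simplified stand-in for the poll_ready/start_send half of futures::Sink (no pinning, no Context), and BoundedSink, send_one, and drain are illustrative names, not actix APIs. drain returns the first error instead of continuing to poll a sink that has failed.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical, simplified stand-in for futures::Sink (no pinning, no Context).
trait MiniSink<T> {
    type Error;
    fn poll_ready(&mut self) -> Poll<Result<(), Self::Error>>;
    fn start_send(&mut self, item: T) -> Result<(), Self::Error>;
}

/// Accepts up to `capacity` items, then reports Pending.
struct BoundedSink {
    sent: Vec<u32>,
    capacity: usize,
}

impl MiniSink<u32> for BoundedSink {
    type Error = String;

    fn poll_ready(&mut self) -> Poll<Result<(), String>> {
        if self.sent.len() < self.capacity {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, item: u32) -> Result<(), String> {
        self.sent.push(item);
        Ok(())
    }
}

/// Sends at most one buffered item per call.
fn send_one<S: MiniSink<u32>>(sink: &mut S, buffer: &mut VecDeque<u32>) {
    if let Poll::Ready(Ok(())) = sink.poll_ready() {
        if let Some(item) = buffer.pop_front() {
            let _ = sink.start_send(item);
        }
    }
}

/// Sends buffered items until the buffer is empty or the sink is not ready;
/// returns the first error instead of polling a failed sink again.
fn drain<S: MiniSink<u32>>(sink: &mut S, buffer: &mut VecDeque<u32>) -> Result<(), S::Error> {
    while let Poll::Ready(res) = sink.poll_ready() {
        res?;
        match buffer.pop_front() {
            Some(item) => sink.start_send(item)?,
            None => break,
        }
    }
    Ok(())
}
```

Draining only helps while the sink keeps reporting ready. When it returns Pending with items still buffered (a capacity of 2 with three items, for instance), something must still wake the task later, which is why the waker handling in poll_ready matters.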

@johsunds

Any new info on this? Did you find the underlying issue?

@fakeshadow
Contributor

The source of the problem is likely that poll_ready should take the context's waker and wake SinkWriteFuture later when it returns Pending.
In that case this is an issue for the actix-codec crate's Framed type, which lives in the actix-net repo.
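The rule being described is the general Future/Sink contract: a method that returns Poll::Pending must first arrange for cx.waker() to be woken once progress is possible. A minimal std-only illustration follows; OneSlotSink, io_completes, and the register_waker switch are hypothetical and are not actix-codec's Framed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts wake-ups delivered to the task.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical sink with a single-frame write buffer.
struct OneSlotSink {
    slot: Option<u32>,
    waiting: Option<Waker>, // waker saved when poll_ready returned Pending
    register_waker: bool,   // false reproduces the buggy behaviour
}

impl OneSlotSink {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.slot.is_none() {
            return Poll::Ready(());
        }
        if self.register_waker {
            // Contract: before returning Pending, make sure this task gets woken.
            self.waiting = Some(cx.waker().clone());
        }
        Poll::Pending
    }

    /// Simulates the I/O layer accepting the buffered frame, freeing the slot.
    fn io_completes(&mut self) -> Option<u32> {
        let frame = self.slot.take();
        if let Some(waker) = self.waiting.take() {
            waker.wake(); // tell the blocked task to call poll_ready again
        }
        frame
    }
}

/// Returns how many wake-ups the task receives after the slot frees up.
fn wakes_after_io(register_waker: bool) -> usize {
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut sink = OneSlotSink { slot: Some(0), waiting: None, register_waker };
    assert!(sink.poll_ready(&mut cx).is_pending());
    sink.io_completes();
    counter.0.load(Ordering::SeqCst)
}
```

Without the saved waker, freeing the slot produces no wake-up, so a task parked on that Pending is never polled again.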

@elpiel

elpiel commented Oct 26, 2021

I found the mentioned code and I'm pasting a link to it here. Could you guide me through the changes needed? I'd like to work on this issue for Hacktoberfest.

https://github.com/actix/actix-net/blob/1c8fcaebbc2f385acece09f7fcc2fd565daae1c4/actix-codec/src/framed.rs#L303-L309

@thalesfragoso
Member

@elpiel I haven't looked at it in much depth, but maybe a solution could be to call poll_flush when applying backpressure in poll_ready? See also tokio's implementation:
https://github.com/tokio-rs/tokio/blob/75c07770bfbfea4e5fd914af819c741ed9c3fc36/tokio-util/src/codec/framed_impl.rs#L258-L264
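A rough std-only sketch of that approach, loosely modeled on the linked tokio-util code: poll_ready reports ready while the write buffer is under a threshold, and once the threshold is reached it calls poll_flush, so any Pending it returns comes from the flush path instead of being returned bare. FramedWriter, Transport, and the threshold value are assumptions for illustration. The mock transport writes at most per_poll bytes per call and, when bytes remain, wakes the task itself before returning Pending; a real transport would register the waker with the I/O driver instead.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Arbitrary threshold for this sketch.
const BACKPRESSURE_BOUNDARY: usize = 4;

/// Hypothetical transport that accepts at most `per_poll` bytes per call.
struct Transport {
    written: Vec<u8>,
    per_poll: usize,
}

/// Hypothetical framed writer with an internal write buffer.
struct FramedWriter {
    buffer: Vec<u8>,
    io: Transport,
}

impl FramedWriter {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
            // Backpressure: try to flush instead of returning a bare Pending.
            self.poll_flush(cx)
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn start_send(&mut self, frame: &[u8]) {
        self.buffer.extend_from_slice(frame);
    }

    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        let n = self.io.per_poll.min(self.buffer.len());
        self.io.written.extend(self.buffer.drain(..n));
        if self.buffer.is_empty() {
            Poll::Ready(Ok(()))
        } else {
            // Mock only: ask to be polled again. A real transport would
            // register cx.waker() with the I/O driver before returning Pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing, used to build a Context for the example.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```

Once the buffer drops below the threshold, poll_ready reports ready again without sending the leftover bytes; those go out on the next poll_flush, which a sink user is expected to call.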

@fakeshadow
Contributor

fakeshadow commented Nov 2, 2021

Sorry, I have notifications disabled. I would personally prefer to use the tokio codec directly instead of a homebrew Framed. That said, a fix to the current code is great too, as it's much less work for the same purpose.
As @thalesfragoso pointed out, the tokio codec code could be helpful, as it is likely the source that actix-codec copied from.

@brockelmore

brockelmore commented Nov 4, 2021

Any update here? I'm constantly hitting this via Continuation frames in a websocket actor. The run_interval workaround doesn't work for me for some reason, and the Framed type is so deeply integrated into actix that I can't fork it and move ahead quickly. If the fix is to be taken from tokio, it looks equivalent to:

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.is_write_buf_full() {
            self.flush()
        } else {
            Poll::Pending
        }
    }

If this is truly all that is needed, I can put together the PR, as it's a bit time-sensitive for us.

@robjtede
Member

robjtede commented Nov 4, 2021

If you want to give it a crack, feel free; this is not currently at the top of the priority list.

@thalesfragoso
Member

@brockelmore You need to register the waker when returning Poll::Pending, so something like this could work:

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    if self.is_write_ready() {
        Poll::Ready(Ok(()))
    } else {
        self.flush(cx)
    }
}

@robjtede
Member

robjtede commented Nov 5, 2021

actix-codec 0.4.1 has been released with this fix; it should just take a cargo update to pick it up.


7 participants