How can I send messages on a websocket whenever a tokio::sync::watch changes? #2652
-
How can I send messages on a websocket whenever a `tokio::sync::watch` changes? I'm getting a lot of lifetime/ownership issues when trying to do this. I've been struggling for days trying to get these 2 components to fit together. Is it even possible to do this in "safe Rust"? My idea to accomplish this was to spawn an async function in the actor's `started` method: while rx.changed().await.is_ok() {
println!("sending = {:?}", *rx.borrow());
ctx.text(*rx.borrow());
// `ctx` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
} Full code: use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use std::sync::Arc;
use tokio::sync::watch;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Starts the HTTP server and serves the WebSocket endpoint at `/`.
///
/// The watch-channel `State` is created once, before the worker factory closure,
/// and every worker receives a clone of the same `web::Data` handle. Building the
/// state inside the closure would give each worker thread its own independent
/// channel, so a value sent by a client handled on one worker would never reach
/// clients handled on another.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info");
    env_logger::init();

    // One channel for the whole process; each worker only clones the handle.
    let state = web::Data::new(State::new(""));

    HttpServer::new(move || {
        App::new()
            .app_data(state.clone())
            .wrap(middleware::Logger::default())
            .service(web::resource("/").route(web::get().to(ws_index)))
    })
    // NOTE(review): port 808 is below 1024 and may need elevated privileges to
    // bind; possibly 8080 was intended. Confirm before changing.
    .bind("127.0.0.1:808")?
    .run()
    .await
}
/// Performs the WebSocket handshake and starts a `MyWebSocket` actor for the connection.
///
/// The actor receives its own clone of the `State` registered as application data.
/// Panics if no `web::Data<State>` was registered on the `App`.
async fn ws_index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    // Deref-coerce `&web::Data<State>` down to `&State`, then clone the state itself.
    let shared: &State = req.app_data::<web::Data<State>>().unwrap();
    let actor = MyWebSocket::new(shared.clone());
    ws::start(actor, &req, stream)
}
/// Both halves of a `tokio::sync::watch` channel carrying `String` values.
#[derive(Clone)]
struct State {
    /// Sending half of the watch channel, shared behind an `Arc`.
    sender: Arc<watch::Sender<String>>,
    /// Receiving half; cloned by consumers that want to observe changes.
    receiver: watch::Receiver<String>,
}
impl State {
    /// Creates a state whose watch channel initially holds a copy of `init`.
    fn new(init: impl AsRef<str>) -> Self {
        let initial = String::from(init.as_ref());
        let (tx, rx) = watch::channel(initial);
        Self {
            sender: Arc::new(tx),
            receiver: rx,
        }
    }
}
/// WebSocket actor that tracks client liveness and holds the watch-channel state.
struct MyWebSocket {
    /// Time of the last ping/pong received from the client.
    heartbeat: Instant,
    /// Watch-channel handles used to publish and observe values.
    state: State,
    /// Handle of the task spawned in `started`, if any, so it can be cancelled.
    spawn_handle: Option<SpawnHandle>,
}
impl MyWebSocket {
    /// Builds an actor around `state` with the heartbeat set to the current instant
    /// and no spawned task yet.
    fn new(state: State) -> Self {
        let heartbeat = Instant::now();
        Self {
            heartbeat,
            state,
            spawn_handle: None,
        }
    }

    /// Publishes `value` on the watch channel.
    ///
    /// A send failure is printed to stdout and otherwise ignored.
    fn set_value(&self, value: impl AsRef<str>) {
        let payload = String::from(value.as_ref());
        match self.state.sender.send(payload) {
            Ok(()) => {}
            Err(err) => println!("Error {}", err),
        }
    }
}
// Actor lifecycle for `MyWebSocket`: `started` schedules the heartbeat and attempts to
// spawn a task that forwards watch-channel updates; `stopped` cancels that task.
// NOTE(review): this is the version from the question and does not compile as written
// (see the lifetime error quoted inside the async block).
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
// `ctx` is only borrowed (`&mut`) for the duration of this call.
fn started(&mut self, ctx: &mut Self::Context) {
// Every HEARTBEAT_INTERVAL: stop the actor if `heartbeat` is older than
// CLIENT_TIMEOUT, otherwise send an empty ping to the client.
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.heartbeat) > CLIENT_TIMEOUT {
println!("Websocket Client heartbeat failed, disconnecting!");
ctx.stop();
return;
}
ctx.ping(b"");
});
// Clone the receiver so the async block below can own it.
let mut rx = self.state.receiver.clone();
// The `async move` block captures the borrowed `ctx` from this method; that capture
// is what the compiler rejects.
let future = async move {
// Loop until `changed()` returns an error.
while rx.changed().await.is_ok() {
println!("sending = {:?}", *rx.borrow());
ctx.text(*rx.borrow());
// `ctx` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
}
};
// Keep the handle so `stopped` can cancel the task.
self.spawn_handle = Some(ctx.spawn(future.into_actor(self)));
}
// Cancels the task spawned in `started`, if there is one, and clears the stored handle.
fn stopped(&mut self, ctx: &mut Self::Context) {
if let Some(handle) = &self.spawn_handle {
ctx.cancel_future(*handle);
self.spawn_handle = None;
}
}
}
// Handles incoming WebSocket frames for `MyWebSocket`.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
// Dispatches on the received frame; every frame (or error) is first printed to stdout.
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
println!("WS: {:?}", msg);
match msg {
// Ping: refresh the heartbeat and answer with a pong carrying the same payload.
Ok(ws::Message::Ping(msg)) => {
self.heartbeat = Instant::now();
ctx.pong(&msg);
}
// Pong: only refresh the heartbeat.
Ok(ws::Message::Pong(_)) => {
self.heartbeat = Instant::now();
}
// Text: publish the text on the watch channel.
Ok(ws::Message::Text(text)) => self.set_value(text),
// Binary: interpret the bytes as UTF-8; invalid UTF-8 is published as an empty string.
Ok(ws::Message::Binary(bin)) => {
let text = std::str::from_utf8(&bin).unwrap_or("");
self.set_value(text);
},
// Close: echo the close reason back and stop the actor.
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
}
// Any other frame, and any `Err`, stops the actor.
_ => ctx.stop(),
}
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
actix does not support futures that hold references, which means anything like this would not work: fn handle(&mut self, msg: M, ctx: &mut Context<Self>) {
async {
let _ = &self;
let _ = &msg;
let _ = &ctx;
}
} Your sync channel usage is actually a stream, and you can use `Context::add_stream` to handle it as an `ActorStream`: impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.heartbeat) > CLIENT_TIMEOUT {
println!("Websocket Client heartbeat failed, disconnecting!");
ctx.stop();
return;
}
ctx.ping(b"");
});
let mut rx = self.state.receiver.clone();
self.spawn_handle = Some(ctx.add_stream(async_stream::stream! {
while rx.changed().await.is_ok() {
yield rx.borrow().to_string()
};
}));
}
}
/// Sends every `String` item delivered to this handler to the client as a text message.
impl StreamHandler<String> for MyWebSocket {
    fn handle(&mut self, msg: String, ctx: &mut Self::Context) {
        ctx.text(msg);
    }
}
|
Beta Was this translation helpful? Give feedback.
actix does not support futures that hold references, which means anything like this would not work:
Your sync channel usage is actually a stream, and you can use `Context::add_stream` to handle it as an `ActorStream`.