Have an application-global state that's accessible from request handlers #2805
-
Hello everyone, for my new project (https://github.com/Orbitale/pagoo), I'd like my HTTP server to be run conjointedly with a "worker" system (see the app graph, I hope it helps). I run the HTTP server, this works perfectly fine, but now I'd like my single request handler to be able to append elements to a queue, and alongside that, I'd like a separate thread to run an infinite loop that'd poll/listen (whatever method suits me as long as it works 🤣) to this queue and shift elements from the top of it on each iteration, one by one, and do something with it. All I need is as simple as that, however, the not-so-simple part is the Queue itself: how to make sure Actix-based request handlers can "mutate" some data that's on the main thread while another specific thread will "retrieve" this data and do something with it? I tried using a global "Queue" object, however I saw that Actix will create different instances of the App object for every spawned worker, so this means that every data injected into it via I'm out of ideas now, and after hours of research, I'm starting to think about using an external child process and communicate via RPC or even a data source (like files), or even try to use in-memory Sqlite for that (by using your "Databases" guide), but that would be intense polling on a third-party library that I hope could be avoided somehow. Does anyone have an idea on how to implement this kind of feature? |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
Are you looking for something like this? #[derive(Debug)]
struct Message;
impl Message {
fn new() -> Self {
Self
}
}
#[actix_web::get("/")]
async fn handler(
sender: actix_web::web::Data<tokio::sync::mpsc::Sender<Message>>,
) -> actix_web::HttpResponse {
let _ = sender.send(Message::new()).await;
actix_web::HttpResponse::Ok().finish()
}
fn start_workers(mut receiver: tokio::sync::mpsc::Receiver<Message>) {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
println!("Got message: {:?}", msg);
}
});
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let (tx, rx) = tokio::sync::mpsc::channel(8);
start_workers(rx);
actix_web::HttpServer::new(move || {
actix_web::App::new()
.app_data(actix_web::web::Data::new(tx.clone()))
.service(handler)
})
.bind("0.0.0.0:8080")?
.run()
.await
} |
Beta Was this translation helpful? Give feedback.
Are you looking for something like this?