-
Notifications
You must be signed in to change notification settings - Fork 916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chain head listener: workaround watcher deadlock #2982
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,9 @@ use graph::{ | |
prometheus::{CounterVec, GaugeVec}, | ||
util::timed_rw_lock::TimedRwLock, | ||
}; | ||
use std::str::FromStr; | ||
use std::sync::Arc; | ||
use std::sync::{atomic::AtomicBool, Arc}; | ||
use std::{collections::BTreeMap, time::Duration}; | ||
use std::{str::FromStr, sync::atomic}; | ||
|
||
use lazy_static::lazy_static; | ||
|
||
|
@@ -38,16 +38,20 @@ lazy_static! { | |
} | ||
|
||
struct Watcher { | ||
sender: watch::Sender<()>, | ||
sender: Arc<watch::Sender<()>>, | ||
receiver: watch::Receiver<()>, | ||
} | ||
|
||
impl Watcher { | ||
fn new() -> Self { | ||
let (sender, receiver) = watch::channel(()); | ||
Watcher { sender, receiver } | ||
Watcher { | ||
sender: Arc::new(sender), | ||
receiver, | ||
} | ||
} | ||
|
||
#[allow(dead_code)] | ||
fn send(&self) { | ||
// Unwrap: `self` holds a receiver. | ||
self.sender.send(()).unwrap() | ||
|
@@ -147,6 +151,7 @@ impl ChainHeadUpdateListener { | |
) { | ||
// Process chain head updates in a dedicated task | ||
graph::spawn(async move { | ||
let sending_to_watcher = Arc::new(AtomicBool::new(false)); | ||
while let Some(notification) = receiver.recv().await { | ||
// Create ChainHeadUpdate from JSON | ||
let update: ChainHeadUpdate = | ||
|
@@ -173,9 +178,19 @@ impl ChainHeadUpdateListener { | |
|
||
// If there are subscriptions for this network, notify them. | ||
if let Some(watcher) = watchers.read(&logger).get(&update.network_name) { | ||
debug!(logger, "sending chain head update"; "network" => &update.network_name); | ||
watcher.send(); | ||
debug!(logger, "chain head update sent"; "network" => &update.network_name); | ||
// Due to a tokio bug, we must assume that the watcher can deadlock, see | ||
// https://github.com/tokio-rs/tokio/issues/4246. | ||
if !sending_to_watcher.load(atomic::Ordering::SeqCst) { | ||
let sending_to_watcher = sending_to_watcher.cheap_clone(); | ||
let sender = watcher.sender.cheap_clone(); | ||
tokio::task::spawn_blocking(move || { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can that lead to a large number of dead threads hanging around? Maybe we need another […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, of course, you are right. |
||
sending_to_watcher.store(true, atomic::Ordering::SeqCst); | ||
sender.send(()).unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see that […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Before we were using the […] |
||
sending_to_watcher.store(false, atomic::Ordering::SeqCst); | ||
}); | ||
} else { | ||
debug!(logger, "skipping chain head update, watcher is deadlocked"; "network" => &update.network_name); | ||
} | ||
} | ||
} | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We didn't `unwrap` this call before?