diff --git a/store/postgres/src/chain_head_listener.rs b/store/postgres/src/chain_head_listener.rs index bdb249d1944..d9691cdf0d3 100644 --- a/store/postgres/src/chain_head_listener.rs +++ b/store/postgres/src/chain_head_listener.rs @@ -7,9 +7,9 @@ use graph::{ prometheus::{CounterVec, GaugeVec}, util::timed_rw_lock::TimedRwLock, }; -use std::str::FromStr; -use std::sync::Arc; +use std::sync::{atomic::AtomicBool, Arc}; use std::{collections::BTreeMap, time::Duration}; +use std::{str::FromStr, sync::atomic}; use lazy_static::lazy_static; @@ -38,16 +38,20 @@ lazy_static! { } struct Watcher { - sender: watch::Sender<()>, + sender: Arc>, receiver: watch::Receiver<()>, } impl Watcher { fn new() -> Self { let (sender, receiver) = watch::channel(()); - Watcher { sender, receiver } + Watcher { + sender: Arc::new(sender), + receiver, + } } + #[allow(dead_code)] fn send(&self) { // Unwrap: `self` holds a receiver. self.sender.send(()).unwrap() @@ -147,6 +151,7 @@ impl ChainHeadUpdateListener { ) { // Process chain head updates in a dedicated task graph::spawn(async move { + let sending_to_watcher = Arc::new(AtomicBool::new(false)); while let Some(notification) = receiver.recv().await { // Create ChainHeadUpdate from JSON let update: ChainHeadUpdate = @@ -173,9 +178,19 @@ impl ChainHeadUpdateListener { // If there are subscriptions for this network, notify them. if let Some(watcher) = watchers.read(&logger).get(&update.network_name) { - debug!(logger, "sending chain head update"; "network" => &update.network_name); - watcher.send(); - debug!(logger, "chain head update sent"; "network" => &update.network_name); + // Due to a tokio bug, we must assume that the watcher can deadlock, see + // https://github.com/tokio-rs/tokio/issues/4246. + if !sending_to_watcher.load(atomic::Ordering::SeqCst) { + let sending_to_watcher = sending_to_watcher.cheap_clone(); + let sender = watcher.sender.cheap_clone(); + tokio::task::spawn_blocking(move || { + sending_to_watcher.store(true, atomic::Ordering::SeqCst); + sender.send(()).unwrap(); + sending_to_watcher.store(false, atomic::Ordering::SeqCst); + }); + } else { + debug!(logger, "skipping chain head update, watcher is deadlocked"; "network" => &update.network_name); + } } } });