diff --git a/datasource/ethereum/src/block_stream.rs b/datasource/ethereum/src/block_stream.rs index ab2d7ebc513..b2d19a5079f 100644 --- a/datasource/ethereum/src/block_stream.rs +++ b/datasource/ethereum/src/block_stream.rs @@ -1,6 +1,5 @@ use futures::prelude::*; use futures::sync::mpsc::{channel, Receiver, Sender}; -use std; use std::cmp; use std::collections::HashSet; use std::env; @@ -1059,14 +1058,13 @@ where // Create a chain head update stream whose lifetime is tied to the // liftetime of the block stream; we do this to immediately terminate // the chain head update listener when the block stream is shut down - let mut chain_head_update_listener = self.chain_store.chain_head_updates(); let cancel_guard = CancelGuard::new(); - let chain_head_update_stream = chain_head_update_listener - .take_event_stream() - .unwrap() - .cancelable(&cancel_guard, move || { - debug!(logger_for_stream, "Terminating chain head updates"); - }); + let chain_head_update_stream = + self.chain_store + .chain_head_updates() + .cancelable(&cancel_guard, move || { + debug!(logger_for_stream, "Terminating chain head updates"); + }); // Create the actual subgraph-specific block stream let block_stream = BlockStream::new( @@ -1090,13 +1088,6 @@ where .map(|_| ()), ); - // Start listening for chain head updates - chain_head_update_listener.start(); - - // Leak the chain update listener; we'll terminate it by closing the - // block stream's chain head update sink - std::mem::forget(chain_head_update_listener); - block_stream } } diff --git a/graph/src/components/ethereum/listener.rs b/graph/src/components/ethereum/listener.rs index 793cf84b718..1dd2170cc3f 100644 --- a/graph/src/components/ethereum/listener.rs +++ b/graph/src/components/ethereum/listener.rs @@ -1,9 +1,8 @@ +use futures::Stream; use serde::de::{Deserialize, Deserializer, Error as DeserializerError}; use std::str::FromStr; use web3::types::H256; -use crate::components::EventProducer; - /// Deserialize an H256 hash (with or without '0x' prefix). fn deserialize_h256<'de, D>(deserializer: D) -> Result where @@ -22,7 +21,9 @@ pub struct ChainHeadUpdate { pub head_block_number: u64, } -pub trait ChainHeadUpdateListener: EventProducer { - /// Begin processing notifications coming in from Postgres. - fn start(&mut self); +pub type ChainHeadUpdateStream = Box + Send>; + +pub trait ChainHeadUpdateListener { + // Subscribe to chain head updates. + fn subscribe(&self) -> ChainHeadUpdateStream; } diff --git a/graph/src/components/ethereum/mod.rs b/graph/src/components/ethereum/mod.rs index 3771978aac5..5451b2322d9 100644 --- a/graph/src/components/ethereum/mod.rs +++ b/graph/src/components/ethereum/mod.rs @@ -8,7 +8,7 @@ pub use self::adapter::{ EthereumContractState, EthereumContractStateError, EthereumContractStateRequest, EthereumLogFilter, EthereumNetworkIdentifier, }; -pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener}; +pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener, ChainHeadUpdateStream}; pub use self::stream::{BlockStream, BlockStreamBuilder}; pub use self::types::{ EthereumBlock, EthereumBlockData, EthereumBlockPointer, EthereumEventData, diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs index ce1d83d69c6..2793da0fa0a 100644 --- a/graph/src/components/store.rs +++ b/graph/src/components/store.rs @@ -1,8 +1,6 @@ use failure::Error; use futures::stream::poll_fn; -use futures::Future; -use futures::Stream; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll, Stream}; use std::collections::HashSet; use std::env; use std::fmt; @@ -1017,7 +1015,7 @@ pub trait ChainStore: Send + Sync + 'static { fn attempt_chain_head_update(&self, ancestor_count: u64) -> Result, Error>; /// Subscribe to chain head updates. - fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener; + fn chain_head_updates(&self) -> ChainHeadUpdateStream; /// Get the current head block pointer for this chain. /// Any changes to the head block pointer will be to a block with a larger block number, never diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 32c123117fb..463c259a4da 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -60,10 +60,10 @@ pub mod prelude { pub use tokio::prelude::*; pub use crate::components::ethereum::{ - BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener, EthereumAdapter, - EthereumAdapterError, EthereumBlock, EthereumBlockData, EthereumBlockPointer, - EthereumContractCall, EthereumContractCallError, EthereumEventData, EthereumLogFilter, - EthereumNetworkIdentifier, EthereumTransactionData, + BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener, + ChainHeadUpdateStream, EthereumAdapter, EthereumAdapterError, EthereumBlock, + EthereumBlockData, EthereumBlockPointer, EthereumContractCall, EthereumContractCallError, + EthereumEventData, EthereumLogFilter, EthereumNetworkIdentifier, EthereumTransactionData, }; pub use crate::components::graphql::{ GraphQlRunner, QueryResultFuture, SubscriptionResultFuture, diff --git a/mock/src/store.rs b/mock/src/store.rs index 60da345adc8..39a441a976d 100644 --- a/mock/src/store.rs +++ b/mock/src/store.rs @@ -15,13 +15,7 @@ use graph_graphql::prelude::api_schema; pub struct MockChainHeadUpdateListener {} impl ChainHeadUpdateListener for MockChainHeadUpdateListener { - fn start(&mut self) {} -} - -impl EventProducer for MockChainHeadUpdateListener { - fn take_event_stream( - &mut self, - ) -> Option + Send>> { + fn subscribe(&self) -> ChainHeadUpdateStream { unimplemented!(); } } @@ -385,7 +379,7 @@ impl ChainStore for MockStore { unimplemented!(); } - fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener { + fn chain_head_updates(&self) -> ChainHeadUpdateStream { unimplemented!(); } @@ -504,7 +498,7 @@ impl ChainStore for FakeStore { unimplemented!(); } - fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener { + fn chain_head_updates(&self) -> ChainHeadUpdateStream { unimplemented!(); } diff --git a/store/postgres/src/chain_head_listener.rs b/store/postgres/src/chain_head_listener.rs index 49608a3afe5..6740568c72e 100644 --- a/store/postgres/src/chain_head_listener.rs +++ b/store/postgres/src/chain_head_listener.rs @@ -1,40 +1,60 @@ -use crate::notification_listener::{NotificationListener, SafeChannelName}; +use futures::sync::mpsc::{channel, Sender}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use uuid::Uuid; + use graph::prelude::{ChainHeadUpdateListener as ChainHeadUpdateListenerTrait, *}; use graph::serde_json; +use crate::notification_listener::{NotificationListener, SafeChannelName}; + +type ChainHeadUpdateSubscribers = Arc>>>; + pub struct ChainHeadUpdateListener { - notification_listener: NotificationListener, - network_name: String, + logger: Logger, + subscribers: ChainHeadUpdateSubscribers, + _listener: NotificationListener, } impl ChainHeadUpdateListener { pub fn new(logger: &Logger, postgres_url: String, network_name: String) -> Self { + let logger = logger.new(o!("component" => "ChainHeadUpdateListener")); + let subscribers = Arc::new(RwLock::new(HashMap::new())); + + // Create a Postgres notification listener for chain head updates + let mut listener = NotificationListener::new( + &logger, + postgres_url, + SafeChannelName::i_promise_this_is_safe("chain_head_updates"), + ); + + Self::listen(&logger, &mut listener, network_name, subscribers.clone()); + ChainHeadUpdateListener { - notification_listener: NotificationListener::new( - logger, - postgres_url, - SafeChannelName::i_promise_this_is_safe("chain_head_updates"), - ), - network_name, - } - } -} + logger, + subscribers, -impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener { - fn start(&mut self) { - self.notification_listener.start() + // We keep the listener around to tie its stream's lifetime to + // that of the chain head update listener and prevent it from + // terminating early + _listener: listener, + } } -} -impl EventProducer for ChainHeadUpdateListener { - fn take_event_stream( - &mut self, - ) -> Option + Send>> { - let network_name = self.network_name.clone(); + fn listen( + logger: &Logger, + listener: &mut NotificationListener, + network_name: String, + subscribers: ChainHeadUpdateSubscribers, + ) { + let logger = logger.clone(); - self.notification_listener.take_event_stream().map( - move |stream| -> Box + Send> { - Box::new(stream.filter_map(move |notification| { + // Process chain head updates in a dedicated task + tokio::spawn( + listener + .take_event_stream() + .unwrap() + .filter_map(move |notification| { // Create ChainHeadUpdate from JSON let update: ChainHeadUpdate = serde_json::from_value(notification.payload.clone()).unwrap_or_else(|_| { @@ -44,14 +64,65 @@ impl EventProducer for ChainHeadUpdateListener { ) }); - // Only include update if about the right network + // Only include update if it is for the network we're interested in if update.network_name == network_name { Some(update) } else { None } - })) - }, - ) + }) + .for_each(move |update| { + let logger = logger.clone(); + let senders = subscribers.read().unwrap().clone(); + let subscribers = subscribers.clone(); + + debug!( + logger, + "Received chain head update"; + "network" => &update.network_name, + "head_block_hash" => format!("{}", update.head_block_hash), + "head_block_number" => &update.head_block_number, + ); + + // Forward update to all susbcribers + stream::iter_ok::<_, ()>(senders).for_each(move |(id, sender)| { + let logger = logger.clone(); + let subscribers = subscribers.clone(); + + sender.send(update.clone()).then(move |result| { + if result.is_err() { + // If sending to a subscriber fails, we'll assume that + // the receiving end has been dropped. In this case we + // remove the subscriber + debug!(logger, "Unsubscribe"; "id" => &id); + subscribers.write().unwrap().remove(&id); + } + + // Move on to the next subscriber + Ok(()) + }) + }) + }), + ); + + // We're ready, start listening to chain head updaates + listener.start(); + } +} + +impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener { + fn subscribe(&self) -> ChainHeadUpdateStream { + // Generate a new (unique) UUID; we're looping just to be sure we avoid collisions + let mut id = Uuid::new_v4().to_string(); + while self.subscribers.read().unwrap().contains_key(&id) { + id = Uuid::new_v4().to_string(); + } + + debug!(self.logger, "Subscribe"; "id" => &id); + + // Create a subscriber and return the receiving end + let (sender, receiver) = channel(100); + self.subscribers.write().unwrap().insert(id, sender); + Box::new(receiver) } } diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index eb64977873f..56cfb53974a 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -12,7 +12,7 @@ use uuid::Uuid; use crate::notification_listener::JsonNotification; use graph::components::store::Store as StoreTrait; use graph::data::subgraph::schema::*; -use graph::prelude::*; +use graph::prelude::{ChainHeadUpdateListener as _, *}; use graph::serde_json; use graph::web3::types::H256; use graph::{tokio, tokio::timer::Interval}; @@ -61,7 +61,7 @@ pub struct Store { subscriptions: Arc>>>, // listen to StoreEvents emitted by emit_store_events listener: StoreEventListener, - postgres_url: String, + chain_head_update_listener: ChainHeadUpdateListener, network_name: String, genesis_block_ptr: EthereumBlockPointer, conn: Pool>, @@ -108,7 +108,11 @@ impl Store { logger: logger.clone(), subscriptions: Arc::new(RwLock::new(HashMap::new())), listener, - postgres_url: config.postgres_url.clone(), + chain_head_update_listener: ChainHeadUpdateListener::new( + &logger, + config.postgres_url, + config.network_name.clone(), + ), network_name: config.network_name.clone(), genesis_block_ptr: (net_identifiers.genesis_block_hash, 0u64).into(), conn: pool, @@ -981,12 +985,8 @@ impl ChainStore for Store { .and_then(|r| r.map_err(Error::from)) } - fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener { - Self::ChainHeadUpdateListener::new( - &self.logger, - self.postgres_url.clone(), - self.network_name.clone(), - ) + fn chain_head_updates(&self) -> ChainHeadUpdateStream { + self.chain_head_update_listener.subscribe() } fn chain_head_ptr(&self) -> Result, Error> {