Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions datasource/ethereum/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures::prelude::*;
use futures::sync::mpsc::{channel, Receiver, Sender};
use std;
use std::cmp;
use std::collections::HashSet;
use std::env;
Expand Down Expand Up @@ -1059,14 +1058,13 @@ where
// Create a chain head update stream whose lifetime is tied to the
// liftetime of the block stream; we do this to immediately terminate
// the chain head update listener when the block stream is shut down
let mut chain_head_update_listener = self.chain_store.chain_head_updates();
let cancel_guard = CancelGuard::new();
let chain_head_update_stream = chain_head_update_listener
.take_event_stream()
.unwrap()
.cancelable(&cancel_guard, move || {
debug!(logger_for_stream, "Terminating chain head updates");
});
let chain_head_update_stream =
self.chain_store
.chain_head_updates()
.cancelable(&cancel_guard, move || {
debug!(logger_for_stream, "Terminating chain head updates");
});

// Create the actual subgraph-specific block stream
let block_stream = BlockStream::new(
Expand All @@ -1090,13 +1088,6 @@ where
.map(|_| ()),
);

// Start listening for chain head updates
chain_head_update_listener.start();

// Leak the chain update listener; we'll terminate it by closing the
// block stream's chain head update sink
std::mem::forget(chain_head_update_listener);

block_stream
}
}
11 changes: 6 additions & 5 deletions graph/src/components/ethereum/listener.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::Stream;
use serde::de::{Deserialize, Deserializer, Error as DeserializerError};
use std::str::FromStr;
use web3::types::H256;

use crate::components::EventProducer;

/// Deserialize an H256 hash (with or without '0x' prefix).
fn deserialize_h256<'de, D>(deserializer: D) -> Result<H256, D::Error>
where
Expand All @@ -22,7 +21,9 @@ pub struct ChainHeadUpdate {
pub head_block_number: u64,
}

pub trait ChainHeadUpdateListener: EventProducer<ChainHeadUpdate> {
/// Begin processing notifications coming in from Postgres.
fn start(&mut self);
pub type ChainHeadUpdateStream = Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>;

pub trait ChainHeadUpdateListener {
// Subscribe to chain head updates.
fn subscribe(&self) -> ChainHeadUpdateStream;
}
2 changes: 1 addition & 1 deletion graph/src/components/ethereum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use self::adapter::{
EthereumContractState, EthereumContractStateError, EthereumContractStateRequest,
EthereumLogFilter, EthereumNetworkIdentifier,
};
pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener};
pub use self::listener::{ChainHeadUpdate, ChainHeadUpdateListener, ChainHeadUpdateStream};
pub use self::stream::{BlockStream, BlockStreamBuilder};
pub use self::types::{
EthereumBlock, EthereumBlockData, EthereumBlockPointer, EthereumEventData,
Expand Down
6 changes: 2 additions & 4 deletions graph/src/components/store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use failure::Error;
use futures::stream::poll_fn;
use futures::Future;
use futures::Stream;
use futures::{Async, Poll};
use futures::{Async, Future, Poll, Stream};
use std::collections::HashSet;
use std::env;
use std::fmt;
Expand Down Expand Up @@ -1017,7 +1015,7 @@ pub trait ChainStore: Send + Sync + 'static {
fn attempt_chain_head_update(&self, ancestor_count: u64) -> Result<Vec<H256>, Error>;

/// Subscribe to chain head updates.
fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener;
fn chain_head_updates(&self) -> ChainHeadUpdateStream;

/// Get the current head block pointer for this chain.
/// Any changes to the head block pointer will be to a block with a larger block number, never
Expand Down
8 changes: 4 additions & 4 deletions graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ pub mod prelude {
pub use tokio::prelude::*;

pub use crate::components::ethereum::{
BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener, EthereumAdapter,
EthereumAdapterError, EthereumBlock, EthereumBlockData, EthereumBlockPointer,
EthereumContractCall, EthereumContractCallError, EthereumEventData, EthereumLogFilter,
EthereumNetworkIdentifier, EthereumTransactionData,
BlockStream, BlockStreamBuilder, ChainHeadUpdate, ChainHeadUpdateListener,
ChainHeadUpdateStream, EthereumAdapter, EthereumAdapterError, EthereumBlock,
EthereumBlockData, EthereumBlockPointer, EthereumContractCall, EthereumContractCallError,
EthereumEventData, EthereumLogFilter, EthereumNetworkIdentifier, EthereumTransactionData,
};
pub use crate::components::graphql::{
GraphQlRunner, QueryResultFuture, SubscriptionResultFuture,
Expand Down
12 changes: 3 additions & 9 deletions mock/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ use graph_graphql::prelude::api_schema;
pub struct MockChainHeadUpdateListener {}

impl ChainHeadUpdateListener for MockChainHeadUpdateListener {
fn start(&mut self) {}
}

impl EventProducer<ChainHeadUpdate> for MockChainHeadUpdateListener {
fn take_event_stream(
&mut self,
) -> Option<Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>> {
fn subscribe(&self) -> ChainHeadUpdateStream {
unimplemented!();
}
}
Expand Down Expand Up @@ -385,7 +379,7 @@ impl ChainStore for MockStore {
unimplemented!();
}

fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
unimplemented!();
}

Expand Down Expand Up @@ -504,7 +498,7 @@ impl ChainStore for FakeStore {
unimplemented!();
}

fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
unimplemented!();
}

Expand Down
127 changes: 99 additions & 28 deletions store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,60 @@
use crate::notification_listener::{NotificationListener, SafeChannelName};
use futures::sync::mpsc::{channel, Sender};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use uuid::Uuid;

use graph::prelude::{ChainHeadUpdateListener as ChainHeadUpdateListenerTrait, *};
use graph::serde_json;

use crate::notification_listener::{NotificationListener, SafeChannelName};

type ChainHeadUpdateSubscribers = Arc<RwLock<HashMap<String, Sender<ChainHeadUpdate>>>>;

pub struct ChainHeadUpdateListener {
notification_listener: NotificationListener,
network_name: String,
logger: Logger,
subscribers: ChainHeadUpdateSubscribers,
_listener: NotificationListener,
}

impl ChainHeadUpdateListener {
pub fn new(logger: &Logger, postgres_url: String, network_name: String) -> Self {
let logger = logger.new(o!("component" => "ChainHeadUpdateListener"));
let subscribers = Arc::new(RwLock::new(HashMap::new()));

// Create a Postgres notification listener for chain head updates
let mut listener = NotificationListener::new(
&logger,
postgres_url,
SafeChannelName::i_promise_this_is_safe("chain_head_updates"),
);

Self::listen(&logger, &mut listener, network_name, subscribers.clone());

ChainHeadUpdateListener {
notification_listener: NotificationListener::new(
logger,
postgres_url,
SafeChannelName::i_promise_this_is_safe("chain_head_updates"),
),
network_name,
}
}
}
logger,
subscribers,

impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
fn start(&mut self) {
self.notification_listener.start()
// We keep the listener around to tie its stream's lifetime to
// that of the chain head update listener and prevent it from
// terminating early
_listener: listener,
}
}
}

impl EventProducer<ChainHeadUpdate> for ChainHeadUpdateListener {
fn take_event_stream(
&mut self,
) -> Option<Box<Stream<Item = ChainHeadUpdate, Error = ()> + Send>> {
let network_name = self.network_name.clone();
fn listen(
logger: &Logger,
listener: &mut NotificationListener,
network_name: String,
subscribers: ChainHeadUpdateSubscribers,
) {
let logger = logger.clone();

self.notification_listener.take_event_stream().map(
move |stream| -> Box<Stream<Item = _, Error = _> + Send> {
Box::new(stream.filter_map(move |notification| {
// Process chain head updates in a dedicated task
tokio::spawn(
listener
.take_event_stream()
.unwrap()
.filter_map(move |notification| {
// Create ChainHeadUpdate from JSON
let update: ChainHeadUpdate =
serde_json::from_value(notification.payload.clone()).unwrap_or_else(|_| {
Expand All @@ -44,14 +64,65 @@ impl EventProducer<ChainHeadUpdate> for ChainHeadUpdateListener {
)
});

// Only include update if about the right network
// Only include update if it is for the network we're interested in
if update.network_name == network_name {
Some(update)
} else {
None
}
}))
},
)
})
.for_each(move |update| {
let logger = logger.clone();
let senders = subscribers.read().unwrap().clone();
let subscribers = subscribers.clone();

debug!(
logger,
"Received chain head update";
"network" => &update.network_name,
"head_block_hash" => format!("{}", update.head_block_hash),
"head_block_number" => &update.head_block_number,
);

// Forward update to all susbcribers
stream::iter_ok::<_, ()>(senders).for_each(move |(id, sender)| {
let logger = logger.clone();
let subscribers = subscribers.clone();

sender.send(update.clone()).then(move |result| {
if result.is_err() {
// If sending to a subscriber fails, we'll assume that
// the receiving end has been dropped. In this case we
// remove the subscriber
debug!(logger, "Unsubscribe"; "id" => &id);
subscribers.write().unwrap().remove(&id);
}

// Move on to the next subscriber
Ok(())
})
})
}),
);

// We're ready, start listening to chain head updaates
listener.start();
}
}

impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
fn subscribe(&self) -> ChainHeadUpdateStream {
// Generate a new (unique) UUID; we're looping just to be sure we avoid collisions
let mut id = Uuid::new_v4().to_string();
while self.subscribers.read().unwrap().contains_key(&id) {
id = Uuid::new_v4().to_string();
}

debug!(self.logger, "Subscribe"; "id" => &id);

// Create a subscriber and return the receiving end
let (sender, receiver) = channel(100);
self.subscribers.write().unwrap().insert(id, sender);
Box::new(receiver)
}
}
18 changes: 9 additions & 9 deletions store/postgres/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use uuid::Uuid;
use crate::notification_listener::JsonNotification;
use graph::components::store::Store as StoreTrait;
use graph::data::subgraph::schema::*;
use graph::prelude::*;
use graph::prelude::{ChainHeadUpdateListener as _, *};
use graph::serde_json;
use graph::web3::types::H256;
use graph::{tokio, tokio::timer::Interval};
Expand Down Expand Up @@ -61,7 +61,7 @@ pub struct Store {
subscriptions: Arc<RwLock<HashMap<String, Sender<StoreEvent>>>>,
// listen to StoreEvents emitted by emit_store_events
listener: StoreEventListener,
postgres_url: String,
chain_head_update_listener: ChainHeadUpdateListener,
network_name: String,
genesis_block_ptr: EthereumBlockPointer,
conn: Pool<ConnectionManager<PgConnection>>,
Expand Down Expand Up @@ -108,7 +108,11 @@ impl Store {
logger: logger.clone(),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
listener,
postgres_url: config.postgres_url.clone(),
chain_head_update_listener: ChainHeadUpdateListener::new(
&logger,
config.postgres_url,
config.network_name.clone(),
),
network_name: config.network_name.clone(),
genesis_block_ptr: (net_identifiers.genesis_block_hash, 0u64).into(),
conn: pool,
Expand Down Expand Up @@ -981,12 +985,8 @@ impl ChainStore for Store {
.and_then(|r| r.map_err(Error::from))
}

fn chain_head_updates(&self) -> Self::ChainHeadUpdateListener {
Self::ChainHeadUpdateListener::new(
&self.logger,
self.postgres_url.clone(),
self.network_name.clone(),
)
fn chain_head_updates(&self) -> ChainHeadUpdateStream {
self.chain_head_update_listener.subscribe()
}

fn chain_head_ptr(&self) -> Result<Option<EthereumBlockPointer>, Error> {
Expand Down