diff --git a/Cargo.lock b/Cargo.lock index e3fc9a8..a905286 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -705,6 +705,7 @@ dependencies = [ "lru", "mlua", "once_cell", + "pin-project", "prometheus", "proptest", "rand 0.8.4", diff --git a/Cargo.toml b/Cargo.toml index 17bba0b..398bfb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ toml = "0.5" arc-swap = "1.4" mlua = { version = "0.6", features = ["luajit", "vendored"] } awc-uds = "0.1.0" +pin-project = "1.0" solana-account-decoder = { version = "=1.8.2", optional = true } solana-sdk = { version = "=1.8.2", optional = true } diff --git a/src/main.rs b/src/main.rs index 9ba88a2..273680a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -96,7 +96,7 @@ async fn config_read_loop(path: PathBuf, rpc: Arc>) { async fn run(options: cli::Options) -> Result<()> { let accounts = AccountsDb::new(); - let program_accounts = ProgramAccountsDb::new(); + let program_accounts = ProgramAccountsDb::default(); let rpc_slot = Arc::new(AtomicU64::new(0)); let _rpc_monitor = cache_rpc::rpc_monitor::RpcMonitor::init( diff --git a/src/metrics.rs b/src/metrics.rs index f89e337..757da1e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -77,6 +77,7 @@ pub struct PubSubMetrics { pub pubsub_program_slot: IntGaugeVec, pub pubsub_account_slot: IntGaugeVec, pub websocket_reconnects: IntCounterVec, + pub subscriptions_skipped: IntCounter, } pub fn pubsub_metrics() -> &'static PubSubMetrics { @@ -246,6 +247,11 @@ pub fn pubsub_metrics() -> &'static PubSubMetrics { &["connection_id"] ) .unwrap(), + subscriptions_skipped: register_int_counter!( + "subscriptions_skipped", + "Number of account subscriptions skipped, due to presence of owner-program subscription" + ) + .unwrap(), }); &METRICS } diff --git a/src/pubsub.rs b/src/pubsub.rs index 1971132..469b886 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -1,8 +1,11 @@ use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; use std::sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }; +use std::task::Poll; use std::time::Duration; use actix::io::SinkWrite; @@ -13,7 +16,11 @@ use actix::prelude::{ use actix::Arbiter; use actix_http::ws; use bytes::BytesMut; -use futures_util::stream::StreamExt; +use futures_util::{ + future::{join, Join}, + stream::StreamExt, +}; +use pin_project::pin_project; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use tokio::sync::mpsc; @@ -87,6 +94,54 @@ pub struct PubSubManager { subscriptions_allowed: Arc, } +impl Actor for PubSubManager { + type Context = Context; +} + +impl Handler for PubSubManager { + type Result = (); + fn handle(&mut self, msg: PubSubSubscribe, _: &mut Self::Context) -> Self::Result { + let sub = Subscription::Account(msg.key); + self.subscribe(sub, msg.commitment, None, Some(msg.owner)); + } +} + +type IsSubActiveRequest = actix::prelude::Request; +#[allow(clippy::large_enum_variant)] +#[pin_project(project = SubscriptionActiveProject)] +pub enum SubscriptionActive { + Ready(bool), + RequestWithOwner(#[pin] Join), + Request(#[pin] IsSubActiveRequest), +} + +impl Future for SubscriptionActive { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + match self.project() { + SubscriptionActiveProject::Ready(res) => Poll::Ready(*res), + SubscriptionActiveProject::RequestWithOwner(req) => { + match req.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready((Ok(true), _)) => { + metrics().subscriptions_skipped.inc(); // owner subscription exists + Poll::Ready(true) + } + Poll::Ready((_, Ok(true))) => Poll::Ready(true), + Poll::Ready(_) => Poll::Ready(false), + } + } + + SubscriptionActiveProject::Request(req) => match req.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(value)) => Poll::Ready(value), + Poll::Ready(Err(_)) => Poll::Ready(false), + }, + } + } +} + impl PubSubManager { pub fn init( connections: u32, @@ -99,21 +154,28 @@ impl PubSubManager { let mut workers = Vec::new(); let config = Arc::new(worker_config); for id in 0..connections { - let active = Arc::new(AtomicBool::new(false)); + let connected = Arc::new(AtomicBool::new(false)); let addr = AccountUpdateManager::init( id, accounts.clone(), program_accounts.clone(), - Arc::clone(&active), + Arc::clone(&connected), rpc_slot.clone(), Arc::clone(&config), ); - workers.push((addr, active)) + workers.push((addr, connected)) } - PubSubManager { + let manager = PubSubManager { workers, subscriptions_allowed, + }; + let actor = manager.clone().start(); + // make sure all the workers, have an address of pubsub manager + for (w, _) in &manager.workers { + w.do_send(InitManager(Addr::clone(&actor))); } + + manager } fn get_idx_by_key(&self, key: (Pubkey, Commitment)) -> usize { @@ -131,14 +193,48 @@ impl PubSubManager { self.workers[idx].0.clone() } - pub fn subscription_active(&self, key: (Pubkey, Commitment)) -> bool { + pub fn websocket_connected(&self, key: (Pubkey, Commitment)) -> bool { let idx = self.get_idx_by_key(key); self.workers[idx].1.load(Ordering::Relaxed) } - pub fn reset(&self, sub: Subscription, commitment: Commitment, filters: Option) { + pub fn subscription_active( + &self, + sub: Subscription, + commitment: Commitment, + owner: Option, + ) -> SubscriptionActive { + if self.websocket_connected((sub.key(), commitment)) { + let addr = self.get_addr_by_key((sub.key(), commitment)); + + owner + .map(|key| (self.get_addr_by_key((key, commitment)), key)) + .map(|(owner_addr, key)| { + SubscriptionActive::RequestWithOwner(join( + owner_addr.send(IsSubActive { + sub: Subscription::Program(key), + commitment, + }), + addr.send(IsSubActive { sub, commitment }), + )) + }) + .unwrap_or_else(|| { + SubscriptionActive::Request(addr.send(IsSubActive { sub, commitment })) + }) + } else { + SubscriptionActive::Ready(false) + } + } + + pub fn reset( + &self, + sub: Subscription, + commitment: Commitment, + filters: Option, + owner: Option, + ) { let addr = self.get_addr_by_key((sub.key(), commitment)); - addr.do_send(AccountCommand::Reset(sub, commitment, filters)) + addr.do_send(AccountCommand::Reset(sub, commitment, filters, owner)) } #[inline] @@ -146,9 +242,20 @@ impl PubSubManager { self.subscriptions_allowed.load(Ordering::Relaxed) } - pub fn subscribe(&self, sub: Subscription, commitment: Commitment, filters: Option) { + pub fn subscribe( + &self, + sub: Subscription, + commitment: Commitment, + filters: Option, + owner: Option, + ) { let addr = self.get_addr_by_key((sub.key(), commitment)); - addr.do_send(AccountCommand::Subscribe(sub, commitment, filters)) + addr.do_send(AccountCommand::Subscribe(sub, commitment, filters, owner)) + } + + pub fn unsubscribe(&self, key: Pubkey, commitment: Commitment) { + let addr = self.get_addr_by_key((key, commitment)); + addr.do_send(AccountCommand::Unsubscribe(key, commitment)) } } @@ -171,7 +278,60 @@ impl Connection { } } -type ProgramFilters = HashMap<(Pubkey, Commitment), FilterTree>; +#[derive(Default)] +struct ProgramFilters { + filtered: HashMap<(Pubkey, Commitment), FilterTree>, + nofilters: HashMap<(Pubkey, Commitment), SpawnHandle>, +} + +impl ProgramFilters { + fn len(&self) -> usize { + self.filtered.len() + self.nofilters.len() + } + + fn get(&self, key: &(Pubkey, Commitment)) -> Option<&FilterTree> { + self.filtered.get(key) + } + + fn insert( + &mut self, + key: (Pubkey, Commitment), + filters: Option, + handle: SpawnHandle, + ) -> Option { + match filters { + Some(filters) => self + .filtered + .entry(key) + .or_insert_with(FilterTree::new) + .insert(filters, handle), + None => self.nofilters.insert(key, handle), + } + } + + fn remove( + &mut self, + key: &(Pubkey, Commitment), + filters: &Option, + ) -> Option { + if let Some(filters) = filters { + self.filtered + .get_mut(key) + .map(|entry| entry.remove(filters)) + .flatten() + } else { + self.nofilters.remove(key) + } + } + fn remove_all(&mut self, key: &(Pubkey, Commitment)) -> impl Iterator { + self.filtered + .remove(key) + .into_iter() + .flatten() + .map(|(_, h)| h) + .chain(self.nofilters.remove(key).into_iter()) + } +} pub struct AccountUpdateManager { actor_id: u32, @@ -188,10 +348,11 @@ pub struct AccountUpdateManager { purge_queue: Option>, additional_keys: ProgramFilters, last_received_at: Instant, - active: Arc, + connected: Arc, rpc_slot: Arc, buffer: BytesMut, config: Arc, + manager: Option>, } impl std::fmt::Debug for AccountUpdateManager { @@ -205,7 +366,7 @@ impl AccountUpdateManager { actor_id: u32, accounts: AccountsDb, program_accounts: ProgramAccountsDb, - active: Arc, + connected: Arc, rpc_slot: Arc, config: Arc, ) -> Addr { @@ -222,14 +383,15 @@ impl AccountUpdateManager { active_accounts: HashMap::default(), request_id: 1, accounts: accounts.clone(), - program_accounts: program_accounts.clone(), + program_accounts, purge_queue: None, - additional_keys: HashMap::default(), - active, + additional_keys: ProgramFilters::default(), + connected, rpc_slot, last_received_at: Instant::now(), buffer: BytesMut::new(), config, + manager: None, }) } @@ -539,18 +701,17 @@ impl AccountUpdateManager { ctx: &mut Context, sub: Subscription, commitment: Commitment, - filters: Filters, + filters: Option, ) { let key = sub.key(); + let to_purge = filters.clone(); + let spawn_handle = ctx.run_later(self.config.ttl, move |actor, ctx| { + actor.purge_filter(ctx, sub, commitment, to_purge); + }); + let handle = self .additional_keys - .entry((key, commitment)) - .or_insert_with(FilterTree::new) - .insert(filters.clone(), { - ctx.run_later(self.config.ttl, move |actor, ctx| { - actor.purge_filter(ctx, sub, commitment, filters); - }) - }); + .insert((key, commitment), filters, spawn_handle); metrics() .additional_keys_entries .with_label_values(&[&self.actor_name]) @@ -579,13 +740,10 @@ impl AccountUpdateManager { ctx: &mut Context, sub: Subscription, commitment: Commitment, - filters: Filters, + filters: Option, ) { let key = sub.key(); - let handle = self - .additional_keys - .get_mut(&(key, commitment)) - .and_then(|map| map.remove(&filters)); + let handle = self.additional_keys.remove(&(key, commitment), &filters); if let Some(handle) = handle { info!(key = %sub.key(), commitment = ?commitment, filter = ?filters, "purging filters"); @@ -594,44 +752,43 @@ impl AccountUpdateManager { .with_label_values(&[&self.actor_name]) .dec(); - for key in self - .program_accounts - .remove_all(&key, commitment, Some(filters)) - { - self.accounts.remove(&key, commitment) - } - ctx.cancel_future(handle); } + let accounts_for_filter = self + .program_accounts + .remove_keys_for_filter(&(key, commitment), filters); + for arc in accounts_for_filter { + let key = *arc; + drop(arc); + self.accounts.remove(&key, commitment) + } } fn purge_key(&mut self, ctx: &mut Context, sub: &Subscription, commitment: Commitment) { info!(self.actor_id, message = "purge", key = %sub.key(), commitment = ?commitment); match sub { Subscription::Program(program_key) => { - let keys = self + let filters_count = self .additional_keys - .remove(&(*program_key, commitment)) - .into_iter() - .flatten() - .map(|(filter, handle)| { + .remove_all(&(*program_key, commitment)) + .map(|handle| { ctx.cancel_future(handle); - (program_key, Some(filter)) }) - .chain(Some((program_key, None))); - for (key, filter) in keys { - if filter.is_some() { - metrics() - .filters - .with_label_values(&[&self.actor_name]) - .dec(); - } - for key in self - .program_accounts - .remove_all(key, commitment, filter.clone()) - { - self.accounts.remove(&key, commitment) - } + .count(); + metrics() + .filters + .with_label_values(&[&self.actor_name]) + .sub(filters_count as i64); + + let program_key = &(*program_key, commitment); + let accounts_to_remove = self.program_accounts.remove_all(program_key); + // cleanup all child accounts, that were kept in AccountsDb, + // while program subscription was active + for arc in accounts_to_remove { + let acc = *arc; + // now ref in AccountsDb should be the only one left + drop(arc); + self.accounts.remove(&acc, commitment); } metrics() .additional_keys_entries @@ -639,24 +796,43 @@ impl AccountUpdateManager { .set(self.additional_keys.len() as i64); } Subscription::Account(key) => { + let data = self + .accounts + .get(key) + .and_then(|state| state.get_ref(commitment)) + .map(|acc_ref| (acc_ref, self.accounts.get_owner(key, commitment))); + match data { + Some((acc_ref, Some(pubkey))) => { + let program_key = (pubkey, commitment); + self.program_accounts + .untrack_account_key(&program_key, acc_ref); + } + Some((acc_ref, None)) => { + warn!("empty account, reference exists without actual data"); + drop(acc_ref); + } + None => (), + } self.accounts.remove(key, commitment); } } } fn update_status(&self) { - let is_active = self.id_to_sub.len() == self.subs.len() && self.connection.is_connected(); - self.active.store(is_active, Ordering::Relaxed); + let connected = self.connection.is_connected(); + let active = self.id_to_sub.len() == self.subs.len() && connected; + + self.connected.store(connected, Ordering::Relaxed); metrics() .websocket_connected .with_label_values(&[&self.actor_name]) - .set(if self.connection.is_connected() { 1 } else { 0 }); + .set(if connected { 1 } else { 0 }); metrics() .websocket_active .with_label_values(&[&self.actor_name]) - .set(if is_active { 1 } else { 0 }); + .set(if active { 1 } else { 0 }); } fn process_ws_message( @@ -713,7 +889,8 @@ impl AccountUpdateManager { if sub.is_account() { self.active_accounts.remove(&(sub.key(), commitment)); } - self.purge_key(ctx, &sub, commitment); + // no need to call `purge_key` as unsubscription request can only be + // called from `Purge` command, which calls it for us } InflightRequest::SlotSub(_) => { warn!(self.actor_id, request_id = id, error = ?error, "slot subscribe failed"); @@ -796,7 +973,8 @@ impl AccountUpdateManager { .subscription_lifetime .observe(times.since_creation().as_secs_f64()); } - self.purge_key(ctx, &sub, commitment); + // no need to call `purge_key` as unsubscription request can only be + // called from `Purge` command, which calls it for us } else { warn!(self.actor_id, sub = %sub, commitment = ?commitment, "unsubscribe for unknown subscription"); } @@ -919,17 +1097,15 @@ impl AccountUpdateManager { *commitment, ); - self.program_accounts.update_account( - &program_key, - key_ref.clone(), + let should_remove_account = self.program_accounts.update_account( + &(program_key, *commitment), + key_ref, filter_groups, - *commitment, slot, ); - - // important for proper removal - drop(key_ref); - self.accounts.remove(&key, *commitment); + if should_remove_account { + self.accounts.remove(&key, *commitment); + } } else { warn!( self.actor_id, @@ -1036,19 +1212,61 @@ impl StreamHandler for AccountUpdateManager { } } +impl Handler for AccountUpdateManager { + type Result = bool; + + fn handle(&mut self, item: IsSubActive, _: &mut Context) -> bool { + self.sub_to_id.contains_key(&(item.sub, item.commitment)) + } +} + +impl Handler for AccountUpdateManager { + type Result = (); + + fn handle(&mut self, item: InitManager, _: &mut Context) { + self.manager.replace(item.0); + } +} + impl Handler for AccountUpdateManager { type Result = (); fn handle(&mut self, item: AccountCommand, ctx: &mut Context) { let _ = (|| -> Result<(), serde_json::Error> { match item { - AccountCommand::Subscribe(sub, commitment, filters) => { + AccountCommand::Subscribe(sub, commitment, filters, owner) => { + // if account owner exists + if let Some(pubkey) = owner { + let program_key = (pubkey, commitment); + // then try to add it to tracked keys in program accounts cache + let success = self + .accounts + .get(&sub.key()) + .and_then(|state| state.get_ref(commitment)) + .map(|acc_ref| { + self.program_accounts + .track_account_key(program_key, acc_ref) + }) + .unwrap_or_default(); + // if real program entry existed, `success` will be set to true, + // indicating that owner program is tracking subscription for + // the account, so there's no need create another one + if success { + info!("account subscription skipped"); + metrics().subscriptions_skipped.inc(); + self.purge_queue + .as_ref() + .unwrap() + .insert((sub, commitment), self.config.ttl); + return Ok(()); + } + } metrics() .commands .with_label_values(&[&self.actor_name, "subscribe"]) .inc(); self.subscribe(sub, commitment)?; - if let Some(filters) = filters { + if !sub.is_account() { self.reset_filter(ctx, sub, commitment, filters); } } @@ -1057,6 +1275,29 @@ impl Handler for AccountUpdateManager { .commands .with_label_values(&[&self.actor_name, "purge"]) .inc(); + // for program subscription, before we purge everything from + // cache, we have to resubscribe for all of its accounts, + // which had recent activity, and are stored in tracked_keys + if !sub.is_account() { + let program_key = (sub.key(), commitment); + // we save the tracked keys, before purging the program's cache entries + let tracked_keys = self.program_accounts.get_tracked_keys(&program_key); + // for a program, we have to remove data from cache first, so that new + // account subscriptions can create a new entry for their owner, and add + // themselves to tracked accounts list + self.purge_key(ctx, &sub, commitment); + let manager = self + .manager + .as_ref() + .expect("PubSub manager is not setup in worker"); + for key in tracked_keys { + manager.do_send(PubSubSubscribe { + key, + commitment, + owner: sub.key(), + }); + } + } if self.connection.is_connected() { self.unsubscribe(sub, commitment)?; @@ -1065,10 +1306,24 @@ impl Handler for AccountUpdateManager { } if sub.is_account() { self.active_accounts.remove(&(sub.key(), commitment)); + self.purge_key(ctx, &sub, commitment); } - self.purge_key(ctx, &sub, commitment); } - AccountCommand::Reset(sub, commitment, filters) => { + AccountCommand::Unsubscribe(pubkey, commitment) => { + metrics() + .commands + .with_label_values(&[&self.actor_name, "unsubsribe"]) + .inc(); + self.active_accounts.remove(&(pubkey, commitment)); + if self.connection.is_connected() { + self.unsubscribe(Subscription::Account(pubkey), commitment)?; + } else { + self.subs + .remove(&(Subscription::Account(pubkey), commitment)); + } + } + + AccountCommand::Reset(sub, commitment, filters, owner) => { metrics() .commands .with_label_values(&[&self.actor_name, "reset"]) @@ -1078,11 +1333,28 @@ impl Handler for AccountUpdateManager { .time_until_reset .observe(time.update().as_secs_f64()); } + + if let Some(owner_pubkey) = owner { + let acc_ref = self + .accounts + .get(&sub.key()) + .and_then(|state| state.get_ref(commitment)); + // if owner was provided for reset, it means, that this account + // subscription is being tracked by owner program, by tracking this + // account's key in program's cache entry we make sure, that when the + // program's subscription ends, it will resubscribe for all tracked + // accounts + if let Some(acc_ref) = acc_ref { + self.program_accounts + .track_account_key((owner_pubkey, commitment), acc_ref); + } + } + self.purge_queue .as_ref() .unwrap() .reset((sub, commitment), self.config.ttl); - if let Some(filters) = filters { + if !sub.is_account() { self.reset_filter(ctx, sub, commitment, filters); } } @@ -1292,9 +1564,40 @@ impl std::fmt::Display for Subscription { #[derive(Message, Debug)] #[rtype(result = "()")] pub enum AccountCommand { - Subscribe(Subscription, Commitment, Option), - Reset(Subscription, Commitment, Option), + Subscribe( + Subscription, + Commitment, + Option, + Option, /*optional account owner*/ + ), + Reset( + Subscription, + Commitment, + Option, + Option, /*optional account owner*/ + ), Purge(Subscription, Commitment), + // same as purge, but doesn't remove data from cache, used only for account subscriptions + Unsubscribe(Pubkey, Commitment), +} + +#[derive(Message, Debug)] +#[rtype(result = "bool")] +pub struct IsSubActive { + sub: Subscription, + commitment: Commitment, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct InitManager(Addr); + +#[derive(Message)] +#[rtype(result = "()")] +pub struct PubSubSubscribe { + key: Pubkey, + commitment: Commitment, + owner: Pubkey, } enum DelayQueueCommand { diff --git a/src/rpc.rs b/src/rpc.rs index c561e1c..230359c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -28,7 +28,7 @@ use tracing::{debug, error, info, warn}; use crate::filter::{Filter, Filters}; use crate::metrics::rpc_metrics as metrics; -use crate::pubsub::{PubSubManager, Subscription}; +use crate::pubsub::{PubSubManager, Subscription, SubscriptionActive}; use crate::types::{ AccountContext, AccountData, AccountInfo, AccountState, AccountsDb, BytesChain, Commitment, Encoding, ProgramAccountsDb, Pubkey, SemaphoreQueue, Slot, SolanaContext, @@ -323,24 +323,42 @@ impl Waf { } impl State { - fn reset(&self, sub: SubDescriptor) { - self.pubsub.reset(sub.kind, sub.commitment, sub.filters); + fn reset(&self, sub: SubDescriptor, owner: Option) { + self.pubsub + .reset(sub.kind, sub.commitment, sub.filters, owner); } fn insert(&self, key: Pubkey, data: AccountContext, commitment: Commitment) -> Arc { self.accounts.insert(key, data, commitment) } - fn subscription_active(&self, key: (Pubkey, Commitment)) -> bool { - self.pubsub.subscription_active(key) + fn websocket_connected(&self, key: (Pubkey, Commitment)) -> bool { + self.pubsub.websocket_connected(key) + } + + // owner is the subscription for program, if given account belongs to one + fn subscription_active( + &self, + sub: Subscription, + commitment: Commitment, + owner: Option, + ) -> SubscriptionActive { + self.pubsub.subscription_active(sub, commitment, owner) } fn is_caching_allowed(&self) -> bool { self.pubsub.can_subscribe() } - fn subscribe(&self, sub: SubDescriptor) { - self.pubsub.subscribe(sub.kind, sub.commitment, sub.filters); + fn subscribe(&self, sub: SubDescriptor, owner: Option) { + self.pubsub + .subscribe(sub.kind, sub.commitment, sub.filters, owner); + } + + // used to unsubscribe from accounts of program, after owner + // is inserted into cache + fn unsubscribe(&self, key: Pubkey, commitment: Commitment) { + self.pubsub.unsubscribe(key, commitment); } // clippy incorrectly assumes that lifetimes can be elided @@ -416,8 +434,13 @@ impl State { { Ok(Some(data)) => { T::cache_hit_counter().inc(); - self.reset(request.sub_descriptor()); - return data; + let owner = data.owner(); + self.reset(request.sub_descriptor(), owner); + if request.has_active_subscription(&self, owner).await { + return data.map(|data| data.response); + } else { + (true, false) + } } Ok(None) => (true, true), Err(reason) => { @@ -428,8 +451,8 @@ impl State { if let Some(data) = data { T::cache_hit_counter().inc(); - self.reset(request.sub_descriptor()); - return data; + self.reset(request.sub_descriptor(), data.owner()); + return data.map(|data| data.response); } metrics() @@ -453,8 +476,8 @@ impl State { if let Some(data) = request.get_from_cache(&raw_request.id, &self) { T::cache_hit_counter().inc(); T::cache_filled_counter().inc(); - self.reset(request.sub_descriptor()); - return data; + self.reset(request.sub_descriptor(), data.owner()); + return data.map(|data| data.response); } continue; } @@ -491,10 +514,11 @@ impl State { match resp { Ok(Response::Result(data)) => { + let owner = data.owner(); if this.is_caching_allowed() && request.put_into_cache(&this, data) { debug!(%request, "cached for key"); this.map_updated.notify_waiters(); - this.subscribe(request.sub_descriptor()); + this.subscribe(request.sub_descriptor(), owner); } } Ok(Response::Error(error)) => { @@ -516,17 +540,50 @@ impl State { } } +struct CachedResponse { + owner: Option, + response: HttpResponse, +} + type CacheResult<'a> = Result>; +trait HasOwner { + fn owner(&self) -> Option { + None + } +} + +impl<'a> HasOwner for Result> { + fn owner(&self) -> Option { + self.as_ref().ok().map(|data| data.owner).flatten() + } +} + +impl HasOwner for AccountContext { + fn owner(&self) -> Option { + self.value.as_ref().map(|value| value.owner) + } +} + +impl HasOwner for MaybeContext> {} + trait Cacheable: Sized + 'static { const REQUEST_TYPE: &'static str; - type ResponseData: DeserializeOwned; + type ResponseData: DeserializeOwned + HasOwner; fn parse<'a>(request: &Request<'a, RawValue>) -> Result>; fn get_limit(state: &State) -> &SemaphoreQueue; fn is_cacheable(&self, state: &State) -> Result<(), UncacheableReason>; - fn get_from_cache<'a>(&self, id: &Id<'a>, state: &State) -> Option>; + // method to check whether cached entry has corresponding websocket subscription + fn has_active_subscription(&self, state: &State, owner: Option) -> SubscriptionActive; + + fn get_from_cache<'a>( + &self, + id: &Id<'a>, + state: &State, + ) -> Option>>; + fn put_into_cache(&self, state: &State, data: Self::ResponseData) -> bool; fn sub_descriptor(&self) -> SubDescriptor; @@ -590,33 +647,41 @@ impl Cacheable for GetAccountInfo { state.account_info_request_limit.as_ref() } + // for getAccountInfo requests, we don't need to subscribe in case if the owner program exists, + // and there's already an active subscription present for it + fn has_active_subscription(&self, state: &State, owner: Option) -> SubscriptionActive { + state.subscription_active(Subscription::Account(self.pubkey), self.commitment(), owner) + } + fn is_cacheable(&self, state: &State) -> Result<(), UncacheableReason> { if self.config.encoding == Encoding::JsonParsed { Err(UncacheableReason::Encoding) } else if self.config.data_slice.is_some() { Err(UncacheableReason::DataSlice) - } else if !state.subscription_active((self.pubkey, self.commitment())) { - Err(UncacheableReason::Inactive) + } else if !state.websocket_connected((self.pubkey, self.commitment())) { + Err(UncacheableReason::Disconnected) } else { Ok(()) } } - fn get_from_cache<'a>(&self, id: &Id<'a>, state: &State) -> Option> { + fn get_from_cache<'a>( + &self, + id: &Id<'a>, + state: &State, + ) -> Option>> { state.accounts.get(&self.pubkey).and_then(|data| { let mut account = data.value().get(self.commitment()); - account = account.map(|(info, mut slot)| { - if slot == 0 { - if let Some(info) = info { - if let Some(owner) = state.program_accounts.get(&info.owner, None) { - if let Some(s) = owner.value().get_slot(self.commitment()) { - slot = *s; - } - } - } - } - (info, slot) - }); + let owner = account.and_then(|(info, _)| info).map(|info| info.owner); + + account = match account { + Some((Some(info), slot)) if slot == 0 => state + .program_accounts + .get_slot(&(info.owner, self.commitment())) + .map(|slot| (Some(info), slot)), + acc => acc, + }; + match account.filter(|(_, slot)| *slot != 0) { Some(data) => { let resp = account_response( @@ -628,9 +693,12 @@ impl Cacheable for GetAccountInfo { self.pubkey, ); match resp { - Ok(res) => Some(Ok(res)), + Ok(res) => Some(Ok(CachedResponse { + response: res, + owner, + })), Err(Error::Parsing(_)) => None, - e => Some(e), + Err(e) => Some(Err(e)), } } _ => None, @@ -712,6 +780,10 @@ impl Cacheable for GetProgramAccounts { state.program_accounts_request_limit.as_ref() } + fn has_active_subscription(&self, state: &State, _owner: Option) -> SubscriptionActive { + state.subscription_active(Subscription::Program(self.pubkey), self.commitment(), None) + } + fn is_cacheable(&self, state: &State) -> Result<(), UncacheableReason> { if self.config.encoding == Encoding::JsonParsed { Err(UncacheableReason::Encoding) @@ -719,14 +791,18 @@ impl Cacheable for GetProgramAccounts { Err(UncacheableReason::DataSlice) } else if !self.valid_filters { Err(UncacheableReason::Filters) - } else if !state.subscription_active((self.pubkey, self.commitment())) { - Err(UncacheableReason::Inactive) + } else if !state.websocket_connected((self.pubkey, self.commitment())) { + Err(UncacheableReason::Disconnected) } else { Ok(()) } } - fn get_from_cache<'a>(&self, id: &Id<'a>, state: &State) -> Option> { + fn get_from_cache<'a>( + &self, + id: &Id<'a>, + state: &State, + ) -> Option>> { let with_context = self.config.with_context.unwrap_or(false); let commitment = self.commitment(); let filters = self.filters.as_ref(); @@ -734,15 +810,22 @@ impl Cacheable for GetProgramAccounts { state .program_accounts - .get(&self.pubkey, filters.cloned()) + .get_state((self.pubkey, commitment)) .and_then(|data| { - let accounts = data.value().get(commitment)?; - - let id_ = id.clone(); - let res = - program_accounts_response(id_, accounts, config, filters, state, with_context); + let accounts = data.value().get_account_keys(&self.filters)?; + let res = program_accounts_response( + id.clone(), + accounts, + config, + filters, + state, + with_context, + ); match res { - Ok(res) => Some(Ok(res)), + Ok(res) => Some(Ok(CachedResponse { + owner: None, + response: res, + })), Err(ProgramAccountsResponseError::Base58) => { Some(Err(base58_error(id.clone()))) } @@ -755,11 +838,10 @@ impl Cacheable for GetProgramAccounts { if !self.valid_filters { return false; } - let filters = self.filters.as_ref(); let commitment = self.commitment(); let (slot, accounts) = data.into_slot_and_value(); - let mut keys = HashSet::with_capacity(accounts.len()); + let mut account_key_refs = HashSet::with_capacity(accounts.len()); for acc in accounts { let AccountAndPubkey { account, pubkey } = acc; let key_ref = state.insert( @@ -772,11 +854,27 @@ impl Cacheable for GetProgramAccounts { }, commitment, ); - keys.insert(key_ref); + account_key_refs.insert(Arc::clone(&key_ref)); } - state - .program_accounts - .insert(self.pubkey, keys, commitment, filters.cloned()); + + let program_key = (self.pubkey, commitment); + let should_unsubscribe = !state.program_accounts.has_active_entry(&program_key); + + let program_state = + state + .program_accounts + .insert(program_key, account_key_refs, self.filters.clone()); + + // if the cache insertion for the given program key + // happened for the first time, then we have to unsubscribe from all accounts, which are + // owned by given program, otherwise, we have already unsubscribed from them, on previous + // insert calls + if should_unsubscribe { + for key in program_state.tracked_keys() { + state.unsubscribe(**key, commitment); + } + } + true } @@ -810,18 +908,18 @@ impl fmt::Display for GetProgramAccounts { enum UncacheableReason { Encoding, - Inactive, DataSlice, Filters, + Disconnected, } impl UncacheableReason { fn as_str(&self) -> &'static str { match self { Self::Encoding => "encoding", - Self::Inactive => "inactive_sub", Self::DataSlice => "data_slice", Self::Filters => "bad_filters", + Self::Disconnected => "websocket_disconnected", } } @@ -829,7 +927,7 @@ impl UncacheableReason { fn can_use_cache(&self) -> bool { match self { Self::Encoding | Self::DataSlice => true, - Self::Inactive | Self::Filters => false, + Self::Filters | Self::Disconnected => false, } } } diff --git a/src/types.rs b/src/types.rs index 2d1e4e9..1fb0c9d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,10 +1,13 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use bytes::{Buf, Bytes}; use dashmap::mapref::entry::Entry; -use dashmap::{mapref::one::Ref, DashMap}; +use dashmap::{ + mapref::one::{Ref, RefMut}, + DashMap, +}; use either::Either; use serde::{Deserialize, Serialize}; @@ -13,193 +16,251 @@ use crate::metrics::db_metrics as metrics; use tokio::sync::{Semaphore, SemaphorePermit}; +type AccountSet = HashSet>; +type ProgramAccountsKey = (Pubkey, Commitment); + +#[derive(Default)] pub struct ProgramState { - account_keys: [Option>>; 3], - slots: [u64; 3], + pub slot: u64, + // account keys, that were requested using various filteres + filtered_keys: HashMap, + // account keys, that were requested without any filters + unfiltered_keys: Option, + observed_filters: HashMap>, + tracked_keys: AccountSet, + // set to true after first insert, should be set to false, + // before program removal from cache, so that resubscription + // for accounts of the owner program can be made + active: bool, +} +#[derive(Default, Clone)] +pub struct ProgramAccountsDb { + map: Arc>, } impl ProgramState { - pub fn get(&self, commitment: Commitment) -> Option<&HashSet>> { - self.account_keys[commitment.as_idx()].as_ref() + pub fn get_account_keys(&self, filters: &Option) -> Option<&HashSet>> { + if let Some(filters) = filters { + self.filtered_keys.get(filters) + } else { + self.unfiltered_keys.as_ref() + } } - pub fn get_slot(&self, commitment: Commitment) -> Option<&u64> { - self.slots.get(commitment.as_idx()) + fn insert_account_keys(&mut self, filters: Option, keys: AccountSet) { + let old = if let Some(filters) = filters { + self.filtered_keys.insert(filters, keys) + } else { + self.unfiltered_keys.replace(keys) + }; + if old.is_none() { + metrics().program_account_entries.inc(); + } } - fn insert(&mut self, commitment: Commitment, data: HashSet>) { - self.account_keys[commitment.as_idx()] = Some(data) + fn has_unfiltered(&self) -> bool { + self.unfiltered_keys.is_some() } - fn add(&mut self, commitment: Commitment, data: Arc, slot: Slot) { - if let Some(ref mut keys) = self.account_keys[commitment.as_idx()] { - keys.insert(data); - } else { - let mut set = HashSet::new(); - set.insert(data); - self.insert(commitment, set); + fn insert_single_key(&mut self, filters: Option<&Filters>, key: Arc) { + if let Some(filters) = filters { + self.filtered_keys + .entry(filters.clone()) + .or_insert_with(|| { + metrics().program_account_entries.inc(); + AccountSet::new() + }) + .insert(key); + } else if let Some(set) = self.unfiltered_keys.as_mut() { + set.insert(key); } - self.slots[commitment.as_idx()] = slot; + // else we shouldn't insert single key, as there's no record for unfiltered request } - fn remove(&mut self, commitment: Commitment, data: &Pubkey) { - if let Some(ref mut keys) = self.account_keys[commitment.as_idx()] { - keys.remove(data); + // used to remove keys for given filter, or for no filters + fn remove_account_keys(&mut self, filters: Option) -> Option { + let mut res = if let Some(filters) = filters { + self.filtered_keys.remove(&filters) + } else { + self.unfiltered_keys.take() + }; + if let Some(keys) = res.as_mut() { + metrics().program_account_entries.dec(); + for key in &self.tracked_keys { + keys.remove(key); + } } + res } - fn take_commitment(&mut self, commitment: Commitment) -> Option>> { - self.account_keys[commitment.as_idx()].take() - } - - fn is_empty(&self) -> bool { - self.account_keys.iter().all(Option::is_none) + fn remove_single_key(&mut self, filters: &Filters, key: &Arc) { + let keys = self.filtered_keys.get_mut(filters); + if let Some(keys) = keys { + keys.remove(key); + if keys.is_empty() { + self.filtered_keys.remove(filters); + metrics().program_account_entries.dec(); + } + } } -} -impl Default for ProgramState { - fn default() -> Self { - ProgramState { - account_keys: [None, None, None], - slots: [0; 3], - } + pub fn tracked_keys(&self) -> &AccountSet { + &self.tracked_keys } } -type ProgramAccountsKey = (Pubkey, Option); +impl ProgramAccountsDb { + pub fn track_account_key(&self, program_key: ProgramAccountsKey, key_ref: Arc) -> bool { + let mut program_state = self.map.entry(program_key).or_default(); + // track refrerence counts, in case owner program is ever requested + // and put in cache, it will just unsubscribe from all those tracked + // account keys, to avoid subscription duplication + program_state.tracked_keys.insert(key_ref); -#[derive(Clone)] -pub struct ProgramAccountsDb { - map: Arc>, - observed_filters: Arc>>, -} + // if state is not active, then this state is actually dummy state, + // and no real program subscription exists, as such we can keep creating account + // subscriptions, as no owner program is tracking them for the owned account + program_state.active + } -impl Default for ProgramAccountsDb { - fn default() -> Self { - Self::new() + pub fn untrack_account_key(&self, program_key: &ProgramAccountsKey, key_ref: Arc) { + self.map + .get_mut(program_key) + .map(|mut state| state.tracked_keys.remove(&key_ref)); } -} -impl ProgramAccountsDb { - pub fn new() -> Self { - ProgramAccountsDb { - map: Arc::new(DashMap::new()), - observed_filters: Arc::new(DashMap::new()), - } + pub fn get_slot(&self, key: &ProgramAccountsKey) -> Option { + self.map.get(key).map(|state| state.slot) } - pub fn get( + pub fn get_state( &self, - key: &Pubkey, - filters: Option, + key: ProgramAccountsKey, ) -> Option> { - if let Some(found) = self.map.get(&(*key, filters)) { - Some(found) - } else { - self.map.get(&(*key, None)) - } + self.map.get(&key) } pub fn insert( &self, - key: Pubkey, - data: HashSet>, - commitment: Commitment, + key: ProgramAccountsKey, + account_key_refs: AccountSet, filters: Option, - ) { + ) -> RefMut<'_, ProgramAccountsKey, ProgramState> { + let mut state = self.map.entry(key).or_default(); if let Some(filters) = filters.as_ref() { - data.iter().for_each(|key| { - self.observed_filters - .entry((**key, commitment)) + account_key_refs.iter().for_each(|key| { + state + .observed_filters + .entry(**key) .or_default() .insert(filters.clone()); }) } + state.insert_account_keys(filters, account_key_refs); + state.active = true; - let mut entry = self.map.entry((key, filters)).or_default(); - entry.insert(commitment, data); - drop(entry); - metrics().program_account_entries.set(self.map.len() as i64); + state + } + + pub fn remove_keys_for_filter( + &self, + key: &ProgramAccountsKey, + filters: Option, + ) -> AccountSet { + self.map + .get_mut(key) + .map(|mut state| state.remove_account_keys(filters)) + .flatten() + .unwrap_or_default() + } + + // method is used to check whether we are inserting program data for the first time + pub fn has_active_entry(&self, key: &ProgramAccountsKey) -> bool { + self.map + .get(key) + .map(|state| state.active) + .unwrap_or_default() + } + + pub fn get_tracked_keys(&self, key: &ProgramAccountsKey) -> Vec { + self.map + .get(key) + .map(|state| state.tracked_keys.iter().map(|k| **k).collect()) + .unwrap_or_default() + } + + pub fn remove_all(&self, key: &ProgramAccountsKey) -> impl Iterator> { + let mut state = match self.map.remove(key) { + Some(state) => state.1, + None => return Either::Left(std::iter::empty()), + }; + + metrics() + .program_account_entries + .sub((state.filtered_keys.len() + state.unfiltered_keys.is_some() as usize) as i64); + + let mut keys = state + .filtered_keys + .into_values() + .flatten() + .collect::(); + keys.extend(state.unfiltered_keys.into_iter().flatten()); + let tracked_keys = std::mem::take(&mut state.tracked_keys); + Either::Right( + keys.into_iter() + .filter(move |key| !tracked_keys.contains(key)), + ) } pub fn update_account( &self, - key: &Pubkey, - data: Arc, + key: &ProgramAccountsKey, + acc_ref_key: Arc, filter_groups: HashSet, - commitment: Commitment, slot: Slot, - ) { - // add to global - if let Some(mut entry) = self.map.get_mut(&(*key, None)) { - entry.add(commitment, data.clone(), slot); + ) -> bool { + let mut can_be_removed = true; + let mut state = match self.map.get_mut(key) { + Some(state) if state.active => state, + _ => return can_be_removed, + }; + + // register it within unfiltered keys + if state.has_unfiltered() { + state.insert_single_key(None, Arc::clone(&acc_ref_key)); + can_be_removed = false; } + // update slot for this program + state.slot = slot; + let has_new_or_old_filters = !filter_groups.is_empty() - || self + || state .observed_filters - .get(&(*data, commitment)) + .get(&*acc_ref_key) .map_or(false /* no set == empty */, |set| !set.is_empty()); if has_new_or_old_filters { - let mut old_groups = self + let old_groups = state .observed_filters - .entry((*data, commitment)) - .or_default(); + .remove(&*acc_ref_key) + .unwrap_or_default(); let diff = old_groups.symmetric_difference(&filter_groups); for filter in diff { - let state = self.map.get_mut(&(*key, Some(filter.clone()))); - match state { + if old_groups.contains(filter) { // Account no longer matches filter - Some(mut state) if old_groups.contains(filter) => { - state.remove(commitment, &data); - } + state.remove_single_key(filter, &acc_ref_key); + } else { // Account is new to the filter - Some(mut state) /* !old_groups.contains(&filter) */ => { - state.add(commitment, Arc::clone(&data), slot); - } - None => (), // State not found + state.insert_single_key(Some(filter), Arc::clone(&acc_ref_key)); } } - *old_groups = filter_groups; + can_be_removed = can_be_removed && filter_groups.is_empty(); + // Insert new filters for this account + state.observed_filters.insert(*acc_ref_key, filter_groups); } - } - - pub fn remove_all( - &self, - key: &Pubkey, - commitment: Commitment, - filters: Option, - ) -> impl Iterator { - let iter = if let Entry::Occupied(mut entry) = self.map.entry((*key, filters.clone())) { - let state = entry.get_mut(); - let keys = state.take_commitment(commitment); - if state.is_empty() { - entry.remove(); - } else { - drop(entry); - } - if let Some(filters) = filters { - keys.as_ref().into_iter().flatten().for_each(|key| { - if let Some(mut set) = self.observed_filters.get_mut(&(**key, commitment)) { - set.remove(&filters); - } - }) - } - - let iter = keys.into_iter().flatten().map(|arc| { - let key = *arc; - drop(arc); - key - }); - - Either::Left(iter) - } else { - Either::Right(std::iter::empty()) - }; - metrics().program_account_entries.set(self.map.len() as i64); - iter + can_be_removed && !state.tracked_keys.contains(&acc_ref_key) } } @@ -256,7 +317,7 @@ impl AccountState { .account_bytes .sub(old.data.as_ref().map(|info| info.data.len()).unwrap_or(0) as i64); old.data = data.value; - old.slot = data.context.slot; + old.slot = old.slot.max(data.context.slot); old.refcount.clone() } else { let refcount = Arc::new(self.key); @@ -341,6 +402,14 @@ impl AccountsDb { self.slot[commitment.as_idx()].fetch_max(val, Ordering::AcqRel); } + pub fn get_owner(&self, key: &Pubkey, commitment: Commitment) -> Option { + let state = self.map.get(key)?; + state + .get(commitment) + .and_then(|(info, _)| info) + .map(|info| info.owner) + } + #[allow(unused)] pub fn get_slot(&self, commitment: Commitment) -> u64 { self.slot[commitment.as_idx()].load(Ordering::Acquire) @@ -463,8 +532,8 @@ impl From for [u8; 32] { } } +#[cfg(test)] impl Pubkey { - #[cfg(test)] fn zero() -> Self { Pubkey([0; 32]) } @@ -750,7 +819,7 @@ fn pooq() { #[test] fn db_refs() { - let programs = ProgramAccountsDb::new(); + let programs = ProgramAccountsDb::default(); let accounts = AccountsDb::new(); let acc_ref = accounts.insert( @@ -772,18 +841,21 @@ fn db_refs() { let mut set = HashSet::new(); set.insert(acc_ref); - programs.insert(Pubkey::zero(), set, Commitment::Confirmed, None); + + programs.insert((Pubkey::zero(), Commitment::Confirmed), set, None); assert_eq!(accounts.map.len(), 1); assert_eq!(programs.map.len(), 1); accounts.remove(&Pubkey::zero(), Commitment::Confirmed); assert_eq!(accounts.map.len(), 1); - let program_accounts = programs.remove_all(&Pubkey::zero(), Commitment::Confirmed, None); + let program_accounts = programs.remove_all(&(Pubkey::zero(), Commitment::Confirmed)); assert_eq!(programs.map.len(), 0); assert_eq!(accounts.map.len(), 1); - for key in program_accounts { + for arc in program_accounts { + let key = *arc; + drop(arc); accounts.remove(&key, Commitment::Confirmed); } assert_eq!(accounts.map.len(), 0);