Skip to content

Commit

Permalink
feat: wait for exectuting gpa requests, instead of making new ones (#261
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bmuddha authored and Babur Makhmudov committed May 26, 2022
1 parent d93a799 commit ec464e3
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cache-rpc"
version = "0.2.16"
version = "0.2.17"
authors = ["Alexander Polakov <a.polakov@iconic.vc>"]
edition = "2018"
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const DEFAULT_GAI_QUEUE_SIZE: usize = 2 << 19;
const DEFAULT_GPA_QUEUE_SIZE: usize = 2 << 18;
const DEFAULT_GAI_TIMEOUT: u64 = 30;
const DEFAULT_GPA_TIMEOUT: u64 = 60;
const IN_PROGRESS_REQUEST_WAIT_TIMEOUT: u64 = 60;
const DEFAULT_GAI_BACKOFF: u64 = 30;
const DEFAULT_GPA_BACKOFF: u64 = 60;
const PASSTHROUGH_BACKOFF: u64 = 30;
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async fn config_read_loop(path: PathBuf, rpc: Arc<watch::Sender<rpc::Config>>) {
async fn run(options: cli::Options) -> Result<()> {
let accounts = AccountsDb::new();
let program_accounts = ProgramAccountsDb::default();
let executing_gpa = Arc::default();

let rpc_slot = Arc::new(AtomicU64::new(0));
let _rpc_monitor = cache_rpc::rpc::monitor::RpcMonitor::init(
Expand Down Expand Up @@ -221,6 +222,7 @@ async fn run(options: cli::Options) -> Result<()> {
let state = rpc::state::State {
accounts: accounts.clone(),
program_accounts: program_accounts.clone(),
executing_gpa: Arc::clone(&executing_gpa),
client,
pubsub: pubsub.clone(),
rpc_url: rpc_url.clone(),
Expand Down
46 changes: 46 additions & 0 deletions src/rpc/cacheable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;

use prometheus::IntCounter;
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use tokio::sync::Notify;

use crate::metrics::rpc_metrics as metrics;
use crate::pubsub::subscription::{Subscription, SubscriptionActive};
Expand Down Expand Up @@ -53,6 +56,14 @@ pub(super) trait Cacheable: Sized + 'static {
// method to check whether cached entry has corresponding websocket subscription
fn has_active_subscription(&self, state: &State, owner: Option<Pubkey>) -> SubscriptionActive;

fn in_progress(&self, _: &State) -> Option<Arc<Notify>> {
None
}

fn track_execution(&self, _: &State) {}

fn finish_execution(&self, _: &State, _: bool) {}

fn get_from_cache<'a>(
&mut self, // gPA may modify internal state of the request object
id: &Id<'a>,
Expand Down Expand Up @@ -258,6 +269,41 @@ impl Cacheable for GetProgramAccounts {
})
}

fn in_progress(&self, state: &State) -> Option<Arc<Notify>> {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
state
.executing_gpa
.get(&hash)
.map(|r| Arc::clone(r.value()))
}

fn track_execution(&self, state: &State) {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
let progress = Arc::default();
if state.executing_gpa.insert(hash, progress).is_some() {
tracing::warn!(
"new execution tracking for gPA is being initialized while old one still exists"
);
}
}

fn finish_execution(&self, state: &State, notify: bool) {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
match state.executing_gpa.remove(&hash) {
Some((_, progress)) if notify => progress.notify_waiters(),
None => {
tracing::warn!(request=%self, "finished gPA execution that hasn't been tracked")
}
_ => (),
}
}

fn get_limit(state: &State) -> &SemaphoreQueue {
state.program_accounts_request_limit.as_ref()
}
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct Timeouts {
pub program_accounts_request: u64,
pub account_info_backoff: u64,
pub program_accounts_backoff: u64,
pub in_progress_request_wait: u64,
}

impl Default for Timeouts {
Expand All @@ -39,6 +40,7 @@ impl Default for Timeouts {
program_accounts_request: crate::DEFAULT_GPA_TIMEOUT,
account_info_backoff: crate::DEFAULT_GAI_BACKOFF,
program_accounts_backoff: crate::DEFAULT_GPA_BACKOFF,
in_progress_request_wait: crate::IN_PROGRESS_REQUEST_WAIT_TIMEOUT,
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/rpc/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use smallvec::SmallVec;
use std::fmt::{self, Display};
use std::hash::Hash;

use crate::filter::{Filter, Filters};
use crate::types::{AccountInfo, Commitment, Encoding, Pubkey, Slot, SolanaContext};
Expand Down Expand Up @@ -90,7 +91,17 @@ pub(super) struct ProgramAccountsConfig {
pub(super) with_context: Option<bool>,
}

#[derive(Debug, Deserialize, Clone)]
impl Hash for GetProgramAccounts {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.pubkey.hash(state);
self.config.filters.hash(state);
if let Some(ref c) = self.config.commitment {
c.commitment.hash(state);
}
}
}

#[derive(Debug, Deserialize, Clone, Hash)]
#[serde(from = "SmallVec<[Filter; 3]>")]
pub(super) enum MaybeFilters {
Valid(Filters),
Expand Down
52 changes: 49 additions & 3 deletions src/rpc/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
use awc::Client;
use backoff::backoff::Backoff;
use bytes::Bytes;
use dashmap::DashMap;
use futures_util::stream::{Stream, StreamExt};
use lru::LruCache;
use mlua::{Lua, LuaOptions, StdLib};
Expand Down Expand Up @@ -39,6 +40,7 @@ pub struct State {
pub accounts: AccountsDb,
pub program_accounts: ProgramAccountsDb,
pub client: Client,
pub executing_gpa: Arc<DashMap<u64, Arc<Notify>>>,
pub pubsub: PubSubManager,
pub rpc_url: String,
pub map_updated: Arc<Notify>,
Expand Down Expand Up @@ -368,6 +370,40 @@ impl State {
}
};

if can_use_cache {
if let Some(progress) = request.in_progress(&self) {
info!(
id=?raw_request.id,
"request is already in progress, waiting for cache"
);
let timeout =
Duration::from_secs(self.config.load().timeouts.in_progress_request_wait);
match tokio::time::timeout(timeout, progress.notified()).await {
Ok(_) => {
info!(
id=?raw_request.id,
"request finished, checking cache"
);
if let Some(data) =
request.get_from_cache(&raw_request.id, Arc::clone(&self), &xrid)
{
T::cache_hit_counter().inc();
T::cache_filled_counter().inc();
self.reset(request.sub_descriptor(), data.owner());
return data.map(|data| data.response);
}
}
Err(error) => {
error!(%error, id=?raw_request.id, "timeout on wait for executing request's response");
return Err(Error::Timeout(raw_request.id, xrid));
}
}
}
}

if is_cacheable {
request.track_execution(&self);
}
let wait_for_response = self.request(
&raw_request,
T::get_limit(&self),
Expand All @@ -393,6 +429,7 @@ impl State {
T::cache_hit_counter().inc();
T::cache_filled_counter().inc();
self.reset(request.sub_descriptor(), data.owner());
request.finish_execution(&self, false);
return data.map(|data| data.response);
}
continue;
Expand Down Expand Up @@ -429,6 +466,7 @@ impl State {
"cacheable request streaming error"
);
metrics().streaming_errors.inc();
request.finish_execution(&this, false);
Error::Streaming(error)
})?;
// client can be already dead
Expand All @@ -444,7 +482,7 @@ impl State {
let resp = serde_json::from_reader(bytes_chain)
.map(|wrap: Flatten<Response<T::ResponseData>>| wrap.inner);

match resp {
let notify = match resp {
Ok(Response::Result(data)) => {
let owner = data.owner();
if this.is_caching_allowed()
Expand All @@ -458,6 +496,9 @@ impl State {
);
this.map_updated.notify_waiters();
this.subscribe(request.sub_descriptor(), owner);
true
} else {
false
}
}
Ok(Response::Error(error)) => {
Expand All @@ -472,9 +513,14 @@ impl State {
?error,
"cannot cache request result, error during streaming"
);
false
}
Err(err) => request.handle_parse_error(err.into()),
}
Err(err) => {
request.handle_parse_error(err.into());
false
}
};
request.finish_execution(&this, notify);
Ok::<(), Error<'_>>(())
});
let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver);
Expand Down

0 comments on commit ec464e3

Please sign in to comment.