feat: wait for executing gPA requests instead of making new ones #261

Merged · 1 commit · May 25, 2022
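In short, this PR coalesces identical getProgramAccounts requests: each request is hashed to a `u64` key and tracked in a shared `DashMap<u64, Arc<Notify>>` while it is executing; a later identical request waits on that `Notify` (with a bounded timeout) and then re-checks the cache instead of issuing another upstream request. A minimal standalone sketch of that pattern follows; the `Inflight` type and function names are illustrative, not the crate's actual API.

```rust
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::Notify;

/// Shared map of in-flight requests, keyed by the request hash.
#[derive(Default)]
struct Inflight {
    executing: DashMap<u64, Arc<Notify>>,
}

impl Inflight {
    /// If an identical request is already executing, return its Notify so the
    /// caller can wait instead of issuing a new upstream request.
    fn in_progress(&self, key: u64) -> Option<Arc<Notify>> {
        self.executing.get(&key).map(|entry| Arc::clone(entry.value()))
    }

    /// Register the request so later identical requests wait on it.
    fn track(&self, key: u64) {
        self.executing.insert(key, Arc::new(Notify::new()));
    }

    /// Remove the entry and wake all waiters so they can re-check the cache.
    fn finish(&self, key: u64) {
        if let Some((_, notify)) = self.executing.remove(&key) {
            notify.notify_waiters();
        }
    }
}

async fn get_or_wait(inflight: &Inflight, key: u64) {
    if let Some(notify) = inflight.in_progress(key) {
        // Bounded wait: if the original request never finishes, give up after 60s.
        let _ = tokio::time::timeout(Duration::from_secs(60), notify.notified()).await;
        // ...re-check the cache here and return on a hit...
        return;
    }
    inflight.track(key);
    // ...perform the actual upstream gPA request and populate the cache...
    inflight.finish(key);
}
```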
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "cache-rpc"
version = "0.2.16"
version = "0.2.17"
authors = ["Alexander Polakov <a.polakov@iconic.vc>"]
edition = "2018"
license = "MIT"
1 change: 1 addition & 0 deletions src/lib.rs
@@ -10,6 +10,7 @@ const DEFAULT_GAI_QUEUE_SIZE: usize = 2 << 19;
const DEFAULT_GPA_QUEUE_SIZE: usize = 2 << 18;
const DEFAULT_GAI_TIMEOUT: u64 = 30;
const DEFAULT_GPA_TIMEOUT: u64 = 60;
const IN_PROGRESS_REQUEST_WAIT_TIMEOUT: u64 = 60;
const DEFAULT_GAI_BACKOFF: u64 = 30;
const DEFAULT_GPA_BACKOFF: u64 = 60;
const PASSTHROUGH_BACKOFF: u64 = 30;
2 changes: 2 additions & 0 deletions src/main.rs
@@ -116,6 +116,7 @@ async fn config_read_loop(path: PathBuf, rpc: Arc<watch::Sender<rpc::Config>>) {
async fn run(options: cli::Options) -> Result<()> {
let accounts = AccountsDb::new();
let program_accounts = ProgramAccountsDb::default();
let executing_gpa = Arc::default();

let rpc_slot = Arc::new(AtomicU64::new(0));
let _rpc_monitor = cache_rpc::rpc::monitor::RpcMonitor::init(
@@ -221,6 +222,7 @@ async fn run(options: cli::Options) -> Result<()> {
let state = rpc::state::State {
accounts: accounts.clone(),
program_accounts: program_accounts.clone(),
executing_gpa: Arc::clone(&executing_gpa),
client,
pubsub: pubsub.clone(),
rpc_url: rpc_url.clone(),
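Note: `let executing_gpa = Arc::default();` above relies on type inference from the new `State` field shown further down in this diff. Spelled out, it is roughly the following (a sketch, assuming the field type from src/rpc/state.rs):

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::Notify;

fn main() {
    // Inferred type of `executing_gpa` in run(), per the State field below.
    let executing_gpa: Arc<DashMap<u64, Arc<Notify>>> = Arc::default();
    // One clone per State, so every request handler shares the same in-flight map.
    let _shared = Arc::clone(&executing_gpa);
}
```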
46 changes: 46 additions & 0 deletions src/rpc/cacheable.rs
@@ -1,10 +1,13 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;

use prometheus::IntCounter;
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use tokio::sync::Notify;

use crate::metrics::rpc_metrics as metrics;
use crate::pubsub::subscription::{Subscription, SubscriptionActive};
@@ -53,6 +56,14 @@ pub(super) trait Cacheable: Sized + 'static {
// method to check whether cached entry has corresponding websocket subscription
fn has_active_subscription(&self, state: &State, owner: Option<Pubkey>) -> SubscriptionActive;

fn in_progress(&self, _: &State) -> Option<Arc<Notify>> {
None
}

fn track_execution(&self, _: &State) {}

fn finish_execution(&self, _: &State, _: bool) {}

fn get_from_cache<'a>(
&mut self, // gPA may modify internal state of the request object
id: &Id<'a>,
@@ -258,6 +269,41 @@ impl Cacheable for GetProgramAccounts {
})
}

fn in_progress(&self, state: &State) -> Option<Arc<Notify>> {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
state
.executing_gpa
.get(&hash)
.map(|r| Arc::clone(r.value()))
}

fn track_execution(&self, state: &State) {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
let progress = Arc::default();
if state.executing_gpa.insert(hash, progress).is_some() {
tracing::warn!(
"new execution tracking for gPA is being initialized while old one still exists"
);
}
}

fn finish_execution(&self, state: &State, notify: bool) {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
match state.executing_gpa.remove(&hash) {
Some((_, progress)) if notify => progress.notify_waiters(),
None => {
tracing::warn!(request=%self, "finished gPA execution that hasn't been tracked")
}
_ => (),
}
}

fn get_limit(state: &State) -> &SemaphoreQueue {
state.program_accounts_request_limit.as_ref()
}
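The three `GetProgramAccounts` hooks above repeat the same `DefaultHasher` key derivation. A small hypothetical helper (not part of this PR) shows the shared computation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: derive the DashMap key for any hashable request.
fn request_key<T: Hash>(request: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    request.hash(&mut hasher);
    hasher.finish()
}
```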
2 changes: 2 additions & 0 deletions src/rpc/config.rs
@@ -30,6 +30,7 @@ pub struct Timeouts {
pub program_accounts_request: u64,
pub account_info_backoff: u64,
pub program_accounts_backoff: u64,
pub in_progress_request_wait: u64,
}

impl Default for Timeouts {
@@ -39,6 +40,7 @@ impl Default for Timeouts {
program_accounts_request: crate::DEFAULT_GPA_TIMEOUT,
account_info_backoff: crate::DEFAULT_GAI_BACKOFF,
program_accounts_backoff: crate::DEFAULT_GPA_BACKOFF,
in_progress_request_wait: crate::IN_PROGRESS_REQUEST_WAIT_TIMEOUT,
}
}
}
13 changes: 12 additions & 1 deletion src/rpc/request.rs
@@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use smallvec::SmallVec;
use std::fmt::{self, Display};
use std::hash::Hash;

use crate::filter::{Filter, Filters};
use crate::types::{AccountInfo, Commitment, Encoding, Pubkey, Slot, SolanaContext};
@@ -90,7 +91,17 @@ pub(super) struct ProgramAccountsConfig {
pub(super) with_context: Option<bool>,
}

#[derive(Debug, Deserialize, Clone)]
impl Hash for GetProgramAccounts {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.pubkey.hash(state);
self.config.filters.hash(state);
if let Some(ref c) = self.config.commitment {
c.commitment.hash(state);
}
}
}

#[derive(Debug, Deserialize, Clone, Hash)]
#[serde(from = "SmallVec<[Filter; 3]>")]
pub(super) enum MaybeFilters {
Valid(Filters),
52 changes: 49 additions & 3 deletions src/rpc/state.rs
@@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
use awc::Client;
use backoff::backoff::Backoff;
use bytes::Bytes;
use dashmap::DashMap;
use futures_util::stream::{Stream, StreamExt};
use lru::LruCache;
use mlua::{Lua, LuaOptions, StdLib};
@@ -39,6 +40,7 @@ pub struct State {
pub accounts: AccountsDb,
pub program_accounts: ProgramAccountsDb,
pub client: Client,
pub executing_gpa: Arc<DashMap<u64, Arc<Notify>>>,
pub pubsub: PubSubManager,
pub rpc_url: String,
pub map_updated: Arc<Notify>,
@@ -368,6 +370,40 @@ impl State {
}
};

if can_use_cache {
if let Some(progress) = request.in_progress(&self) {
info!(
id=?raw_request.id,
"request is already in progress, waiting for cache"
);
let timeout =
Duration::from_secs(self.config.load().timeouts.in_progress_request_wait);
match tokio::time::timeout(timeout, progress.notified()).await {
Ok(_) => {
info!(
id=?raw_request.id,
"request finished, checking cache"
);
if let Some(data) =
request.get_from_cache(&raw_request.id, Arc::clone(&self), &xrid)
{
T::cache_hit_counter().inc();
T::cache_filled_counter().inc();
self.reset(request.sub_descriptor(), data.owner());
return data.map(|data| data.response);
}
}
Err(error) => {
error!(%error, id=?raw_request.id, "timeout on wait for executing request's response");
return Err(Error::Timeout(raw_request.id, xrid));
}
}
}
}

if is_cacheable {
request.track_execution(&self);
}
let wait_for_response = self.request(
&raw_request,
T::get_limit(&self),
@@ -393,6 +429,7 @@
T::cache_hit_counter().inc();
T::cache_filled_counter().inc();
self.reset(request.sub_descriptor(), data.owner());
request.finish_execution(&self, false);
return data.map(|data| data.response);
}
continue;
@@ -429,6 +466,7 @@
"cacheable request streaming error"
);
metrics().streaming_errors.inc();
request.finish_execution(&this, false);
Error::Streaming(error)
})?;
// client can be already dead
@@ -444,7 +482,7 @@
let resp = serde_json::from_reader(bytes_chain)
.map(|wrap: Flatten<Response<T::ResponseData>>| wrap.inner);

match resp {
let notify = match resp {
Ok(Response::Result(data)) => {
let owner = data.owner();
if this.is_caching_allowed()
@@ -458,6 +496,9 @@
);
this.map_updated.notify_waiters();
this.subscribe(request.sub_descriptor(), owner);
true
} else {
false
}
}
Ok(Response::Error(error)) => {
@@ -472,9 +513,14 @@
?error,
"cannot cache request result, error during streaming"
);
false
}
Err(err) => request.handle_parse_error(err.into()),
}
Err(err) => {
request.handle_parse_error(err.into());
false
}
};
request.finish_execution(&this, notify);
Ok::<(), Error<'_>>(())
});
let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver);
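The wait path added to `State` above boils down to: wait on the in-flight request's `Notify` for at most `timeouts.in_progress_request_wait` seconds, then re-check the cache. A simplified restatement (illustrative only; the real code works with `State`, the `Cacheable` trait, and the crate's timeout error):

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Notify;

/// Wait up to `wait_secs` for an identical in-flight request, then re-check the cache.
/// Returns Some(value) on a cache hit; None if nothing was cached or the wait timed
/// out (the crate returns a timeout error to the client in that case).
async fn wait_then_check_cache<T>(
    progress: Arc<Notify>,
    wait_secs: u64,
    check_cache: impl Fn() -> Option<T>,
) -> Option<T> {
    match tokio::time::timeout(Duration::from_secs(wait_secs), progress.notified()).await {
        Ok(()) => check_cache(),
        Err(_elapsed) => None,
    }
}
```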