Use max batch size for parent sync (cope with rate limits) (#680)
Tim Bruijnzeels committed Nov 5, 2021
1 parent cc56e85 commit 7faded1
Showing 5 changed files with 237 additions and 34 deletions.
154 changes: 154 additions & 0 deletions src/commons/api/ca.rs
@@ -1262,6 +1262,59 @@ impl ParentStatuses {
self.0.iter()
}

/// Get the synchronization candidates for this batch, selected as follows:
/// - take the given ca_parents for which no current status exists first
/// - then sort by last exchange, at minute-level granularity - oldest first
/// - where failures come before successes within the same minute
/// - then take the first N parents for this batch
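/// Note that a completed exchange - success or failure - updates the
/// parent's last_exchange, so parents synced in one batch will normally
/// sort to the back of the queue, letting successive runs rotate through
/// all parents.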
pub fn sync_candidates(&self, ca_parents: Vec<&ParentHandle>, batch: usize) -> Vec<ParentHandle> {
let mut parents = vec![];

// Add any parent for which no current status is known to the candidate list first.
for parent in ca_parents {
if !self.0.contains_key(parent) {
parents.push(parent.clone());
}
}

// Then add the ones for which we do have a status, sorted by their
// last exchange.
let mut parents_by_last_exchange = self.sorted_by_last_exchange();
parents.append(&mut parents_by_last_exchange);

// But truncate to the specified batch size
parents.truncate(batch);

parents
}

// Return the parents sorted by last exchange: parents without any
// exchange come first, followed by the rest from longest ago to most
// recent. Uses minute-level granularity; where exchanges happened within
// the same minute, failures come before successful exchanges.
pub fn sorted_by_last_exchange(&self) -> Vec<ParentHandle> {
let mut sorted_parents: Vec<(&ParentHandle, &ParentStatus)> = self.iter().collect();
sorted_parents.sort_by(|a, b| {
// we can map the 'no last exchange' case to 1970 (timestamp 0)
let a_last_exchange = a.1.last_exchange.as_ref();
let b_last_exchange = b.1.last_exchange.as_ref();

let a_last_exchange_time = a_last_exchange.map(|e| i64::from(e.timestamp)).unwrap_or(0) / 60;
let b_last_exchange_time = b_last_exchange.map(|e| i64::from(e.timestamp)).unwrap_or(0) / 60;

if a_last_exchange_time == b_last_exchange_time {
// compare success / failure: false orders before true, so failures come first
let a_last_exchange_res = a_last_exchange.map(|e| e.result().was_success()).unwrap_or(false);
let b_last_exchange_res = b_last_exchange.map(|e| e.result().was_success()).unwrap_or(false);
a_last_exchange_res.cmp(&b_last_exchange_res)
} else {
a_last_exchange_time.cmp(&b_last_exchange_time)
}
});

sorted_parents.into_iter().map(|(handle, _)| handle).cloned().collect()
}
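
// Aside: the failure-first tie-break above relies on Rust's `Ord` for
// `bool`, where `false < true`. An illustrative standalone sketch of the
// same ordering (hypothetical values, not taken from this commit):
//
//     let mut v = vec![("p4", 100, true), ("p5", 100, false), ("p6", 95, true)];
//     v.sort_by_key(|&(_, minute, was_success)| (minute, was_success));
//     // order is now p6 (older minute), then p5 (failure), then p4 (success)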

pub fn set_failure(&mut self, parent: &ParentHandle, uri: &ServiceUri, error: ErrorResponse, next_seconds: i64) {
self.get_mut_status(parent)
.set_failure(uri.clone(), error, next_seconds);
@@ -1923,6 +1976,12 @@ impl From<Time> for Timestamp {
}
}

impl From<Timestamp> for i64 {
fn from(t: Timestamp) -> Self {
t.0
}
}

//--- Display

impl fmt::Display for Timestamp {
@@ -2906,4 +2965,99 @@ mod test {

assert!(old_krill_post_0_9_1.is_suspension_candidate(threshold_seconds));
}

#[test]
fn find_sync_candidates() {
let uri = ServiceUri::try_from("https://example.com/rfc6492/child/".to_string()).unwrap();

let in_five_seconds = Timestamp::now_plus_seconds(5);
let five_seconds_ago = Timestamp::now_minus_seconds(5);
let five_mins_ago = Timestamp::now_minus_seconds(300);

let p1_new_parent = ParentHandle::from_str("p1").unwrap();
let p2_new_parent = ParentHandle::from_str("p2").unwrap();
let p3_no_exchange = ParentHandle::from_str("p3").unwrap();
let p4_success = ParentHandle::from_str("p4").unwrap();
let p5_failure = ParentHandle::from_str("p5").unwrap();
let p6_success_long_ago = ParentHandle::from_str("p6").unwrap();

let p3_status_no_exchange = ParentStatus {
last_exchange: None,
last_success: None,
next_exchange_before: in_five_seconds,
all_resources: ResourceSet::default(),
entitlements: HashMap::new(),
};

let p4_status_success = ParentStatus {
last_exchange: Some(ParentExchange {
timestamp: five_seconds_ago,
uri: uri.clone(),
result: ExchangeResult::Success,
}),
last_success: None,
next_exchange_before: in_five_seconds,
all_resources: ResourceSet::default(),
entitlements: HashMap::new(),
};

let p5_status_failure = ParentStatus {
last_exchange: Some(ParentExchange {
timestamp: five_seconds_ago,
uri: uri.clone(),
result: ExchangeResult::Failure(ErrorResponse::new("err", "err!")),
}),
last_success: None,
next_exchange_before: in_five_seconds,
all_resources: ResourceSet::default(),
entitlements: HashMap::new(),
};

let p6_status_success_long_ago = ParentStatus {
last_exchange: Some(ParentExchange {
timestamp: five_mins_ago,
uri: uri.clone(),
result: ExchangeResult::Success,
}),
last_success: None,
next_exchange_before: in_five_seconds,
all_resources: ResourceSet::default(),
entitlements: HashMap::new(),
};

let mut inner_statuses = HashMap::new();
inner_statuses.insert(p3_no_exchange.clone(), p3_status_no_exchange);
inner_statuses.insert(p4_success.clone(), p4_status_success);
inner_statuses.insert(p5_failure.clone(), p5_status_failure);
inner_statuses.insert(p6_success_long_ago.clone(), p6_status_success_long_ago);

let parent_statuses = ParentStatuses(inner_statuses);

let ca_parents = vec![
&p1_new_parent,
&p2_new_parent,
&p3_no_exchange,
&p4_success,
&p5_failure,
&p6_success_long_ago,
];

let candidates = parent_statuses.sync_candidates(ca_parents.clone(), 10);

let expected = vec![
p1_new_parent.clone(),
p2_new_parent.clone(),
p3_no_exchange.clone(),
p6_success_long_ago.clone(),
p5_failure.clone(),
p4_success.clone(),
];

assert_eq!(candidates, expected);

let candidates_trimmed = parent_statuses.sync_candidates(ca_parents, 1);
let expected_trimmed = vec![p1_new_parent];

assert_eq!(candidates_trimmed, expected_trimmed);
}
}
4 changes: 2 additions & 2 deletions src/commons/util/dummysigner.rs
@@ -10,7 +10,7 @@ impl Signer for DummySigner {
type KeyId = KeyIdentifier;
type Error = SignerError;

fn create_key(&mut self, _: PublicKeyFormat) -> Result<Self::KeyId, Self::Error> {
fn create_key(&self, _: PublicKeyFormat) -> Result<Self::KeyId, Self::Error> {
unreachable!()
}

@@ -21,7 +21,7 @@ impl Signer for DummySigner {
unreachable!()
}

fn destroy_key(&mut self, _: &Self::KeyId) -> Result<(), rpki::repository::crypto::signer::KeyError<Self::Error>> {
fn destroy_key(&self, _: &Self::KeyId) -> Result<(), rpki::repository::crypto::signer::KeyError<Self::Error>> {
unreachable!()
}

4 changes: 4 additions & 0 deletions src/daemon/ca/certauth.rs
@@ -1176,6 +1176,10 @@ impl CertAuth {
self.parents.keys()
}

pub fn nr_parents(&self) -> usize {
self.parents.len()
}

pub fn parent_known(&self, parent: &ParentHandle) -> bool {
self.parents.contains_key(parent)
}
100 changes: 68 additions & 32 deletions src/daemon/ca/manager.rs
@@ -819,8 +819,12 @@ impl CaManager {

/// Refresh a single CA with its parents, and possibly suspend inactive children.
pub async fn cas_refresh_single(&self, ca_handle: Handle, started: Timestamp, actor: &Actor) {
let mut updates = vec![];
self.ca_sync_parents(&ca_handle, actor).await;
self.ca_suspend_inactive_children(&ca_handle, started, actor).await;
}

/// Suspend child CAs that have been inactive for longer than the configured threshold.
async fn ca_suspend_inactive_children(&self, ca_handle: &Handle, started: Timestamp, actor: &Actor) {
// Set threshold hours if it was configured AND this server has been started
// longer ago than the hours specified. Otherwise we risk that *all* children
// without prior recorded status are suspended on upgrade, or that *all* children
@@ -830,46 +834,78 @@
.suspend_child_after_inactive_seconds()
.filter(|secs| started < Timestamp::now_minus_seconds(*secs));

if let Ok(ca) = self.get_ca(&ca_handle).await {
for parent in ca.parents() {
updates.push(self.ca_sync_parent_infallible(ca_handle.clone(), parent.clone(), actor.clone()));
}
// suspend inactive children, if so configured
if let Some(threshold_seconds) = threshold_seconds {
if let Ok(ca_status) = self.get_ca_status(ca_handle).await {
let connections = ca_status.get_children_connection_stats();

for child in connections.suspension_candidates(threshold_seconds) {
let threshold_string = if threshold_seconds >= 3600 {
format!("{} hours", threshold_seconds / 3600)
} else {
format!("{} seconds", threshold_seconds)
};

info!(
"Child '{}' under CA '{}' was inactive for more than {}. Will suspend it.",
child, ca_handle, threshold_string
);
if let Err(e) = self
.status_store
.lock()
.await
.set_child_suspended(ca_handle, &child)
.await
{
panic!("System level error encountered while updating ca status: {}", e);
}

if let Some(threshold_seconds) = threshold_seconds {
if let Ok(ca_status) = self.get_ca_status(&ca_handle).await {
let connections = ca_status.get_children_connection_stats();
let req = UpdateChildRequest::suspend();
if let Err(e) = self.ca_child_update(ca_handle, child, req, actor).await {
error!("Could not suspend inactive child, error: {}", e);
}
}
}
}
}

for child in connections.suspension_candidates(threshold_seconds) {
let threshold_string = if threshold_seconds >= 3600 {
format!("{} hours", threshold_seconds / 3600)
} else {
format!("{} seconds", threshold_seconds)
};
/// Synchronizes a CA with its parents - up to the configured batch size.
/// Remaining parents will be synced in a future run.
async fn ca_sync_parents(&self, ca_handle: &Handle, actor: &Actor) {
let mut updates = vec![];

info!(
"Child '{}' under CA '{}' was inactive for more than {}. Will suspend it.",
child, ca_handle, threshold_string
);
if let Err(e) = self
.status_store
.lock()
.await
.set_child_suspended(&ca_handle, &child)
.await
{
if let Ok(ca) = self.get_ca(ca_handle).await {
// get updates from parents
{
if ca.nr_parents() <= self.config.ca_refresh_parents_batch_size {
// The number of parents is within the batch size, so just process all of them
for parent in ca.parents() {
updates.push(self.ca_sync_parent_infallible(ca_handle.clone(), parent.clone(), actor.clone()));
}
} else {
// more parents than the batch size exist, so get candidates based on
// the known parent statuses for this CA.
match self.status_store.lock().await.get_ca_status(ca_handle).await {
Err(e) => {
panic!("System level error encountered while updating ca status: {}", e);
}

let req = UpdateChildRequest::suspend();
if let Err(e) = self.ca_child_update(&ca_handle, child, req, actor).await {
error!("Could not suspend inactive child, error: {}", e);
Ok(status) => {
for parent in status
.parents()
.sync_candidates(ca.parents().collect(), self.config.ca_refresh_parents_batch_size)
{
updates.push(self.ca_sync_parent_infallible(
ca_handle.clone(),
parent.clone(),
actor.clone(),
));
}
}
}
};
}
}
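// Note: join_all below drives all syncs in this batch concurrently, so
// the batch size also caps how many parents are contacted at the same time.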
join_all(updates).await;
}

join_all(updates).await;
}

/// Synchronizes a CA with a parent, logging failures.
9 changes: 9 additions & 0 deletions src/daemon/config.rs
@@ -109,6 +109,10 @@ impl ConfigDefaults {
600
}

fn ca_refresh_parents_batch_size() -> usize {
25
}

fn post_limit_api() -> u64 {
256 * 1024 // 256kB
}
@@ -262,6 +266,9 @@ pub struct Config {
#[serde(default = "ConfigDefaults::ca_refresh_seconds", alias = "ca_refresh")]
pub ca_refresh_seconds: u32,

#[serde(default = "ConfigDefaults::ca_refresh_parents_batch_size")]
pub ca_refresh_parents_batch_size: usize,

#[serde(skip)]
suspend_child_after_inactive_seconds: Option<i64>,
suspend_child_after_inactive_hours: Option<i64>,
@@ -557,6 +564,7 @@ impl Config {
#[cfg(feature = "multi-user")]
let auth_openidconnect = None;
let ca_refresh_seconds = if enable_ca_refresh { 1 } else { 86400 };
let ca_refresh_parents_batch_size = 10;
let post_limit_api = ConfigDefaults::post_limit_api();
let post_limit_rfc8181 = ConfigDefaults::post_limit_rfc8181();
let rfc8181_log_dir = {
@@ -653,6 +661,7 @@ impl Config {
#[cfg(feature = "multi-user")]
auth_openidconnect,
ca_refresh_seconds,
ca_refresh_parents_batch_size,
suspend_child_after_inactive_seconds,
suspend_child_after_inactive_hours: None,
post_limit_api,
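The new `ca_refresh_parents_batch_size` option is picked up by serde with a default of 25, so existing krill.conf files keep working unchanged. A minimal standalone sketch of that default behaviour - using a hypothetical `BatchConfig` struct and the `serde` and `toml` crates, not Krill's actual config loader:

use serde::Deserialize;

fn default_batch_size() -> usize {
    25
}

#[derive(Deserialize)]
struct BatchConfig {
    // Mirrors the #[serde(default = "...")] attribute on the real field.
    #[serde(default = "default_batch_size")]
    ca_refresh_parents_batch_size: usize,
}

fn main() {
    // Key omitted from the TOML: the default of 25 applies.
    let cfg: BatchConfig = toml::from_str("").unwrap();
    assert_eq!(cfg.ca_refresh_parents_batch_size, 25);

    // Key set explicitly: the configured value wins.
    let cfg: BatchConfig = toml::from_str("ca_refresh_parents_batch_size = 5").unwrap();
    assert_eq!(cfg.ca_refresh_parents_batch_size, 5);
}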
