federation: parallel sending per instance #4623

Open

phiresky wants to merge 29 commits into main from federation-send-parallel

Changes from 9 commits

Commits (29)
539f06a
federation: parallel sending
phiresky Apr 13, 2024
491daab
federation: some comments
phiresky Apr 13, 2024
987174a
lint and set force_write true when a request fails
phiresky Apr 15, 2024
a66aec6
inbox_urls return vec
phiresky Apr 15, 2024
a3d705f
split inbox functions into separate file
phiresky Apr 15, 2024
7eedcb7
cleanup
phiresky Apr 15, 2024
e719baf
extract sending task code to separate file
phiresky Apr 15, 2024
5e986ef
move federation concurrent config to config file
phiresky Apr 15, 2024
c1932f9
off by one issue
phiresky Apr 15, 2024
a7c7abd
improve msg
phiresky Apr 30, 2024
13ff059
fix both permanent stopping of federation queues and multiple creatio…
phiresky May 29, 2024
7cb4e82
Merge branch 'fix-dupe-activity-sending' into federation-send-parallel
phiresky May 29, 2024
10d3b7d
fix after merge
phiresky May 30, 2024
ffb99cd
Merge remote-tracking branch 'origin/main' into federation-send-parallel
phiresky May 30, 2024
a0b0a7a
lint fix
phiresky May 30, 2024
cdff275
Update crates/federate/src/send.rs
phiresky May 30, 2024
175133f
comment about reverse ordering
phiresky May 30, 2024
2acdc78
remove crashable, comment
phiresky May 30, 2024
9d87921
comment
phiresky May 30, 2024
5538794
move comment
phiresky May 30, 2024
7ee63f4
run federation tests twice
phiresky May 31, 2024
3784b7f
fix test run
phiresky May 31, 2024
c2d18d3
prettier
phiresky May 31, 2024
5a418ac
fix config default
phiresky May 31, 2024
c66bf26
upgrade rust to 1.78 to fix diesel cli
phiresky May 31, 2024
1c1018b
Merge remote-tracking branch 'origin/upgrade-rust' into federation-se…
phiresky May 31, 2024
101901b
fix clippy
phiresky May 31, 2024
dfccf3e
delay
phiresky Jun 3, 2024
2dd7b71
Merge branch 'main' into federation-send-parallel
phiresky Jun 4, 2024
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -139,7 +139,7 @@ anyhow = { version = "1.0.81", features = [
diesel_ltree = "0.3.1"
typed-builder = "0.18.1"
serial_test = "2.0.0"
tokio = { version = "1.36.0", features = ["full"] }
tokio = { version = "1.37.0", features = ["full"] }
regex = "1.10.3"
once_cell = "1.19.0"
diesel-derive-newtype = "2.1.0"
9 changes: 5 additions & 4 deletions config/defaults.hjson
@@ -106,10 +106,11 @@
port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently
worker_count: 0
# The number of activitypub federation retry workers that can be in-flight concurrently
retry_count: 0
federation: {
# Limit to the number of concurrent outgoing federation requests per target instance.
# Set this to a value higher than 1 (e.g. 6) only if you have a huge instance (>10 activities per second) and a receiving instance is not keeping up.
concurrent_sends_per_instance: 1
}
prometheus: {
bind: "127.0.0.1"
port: 10002
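As a rough sketch of how an operator would opt in to parallel sending, the new block can be overridden in the local lemmy.hjson; the value 6 below is purely illustrative of the "higher than 1" case from the comment above, and 1 remains the default:

{
  # hypothetical operator override, not part of this diff
  federation: {
    # allow up to 6 in-flight sends per receiving instance
    concurrent_sends_per_instance: 6
  }
}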
149 changes: 149 additions & 0 deletions crates/federate/src/inboxes.rs
@@ -0,0 +1,149 @@
use crate::util::LEMMY_TEST_FAST_FEDERATION;
phiresky (Collaborator, Author) commented: as a note, I've not made any changes to this code, just moved it into a separate struct.

use anyhow::Result;
use chrono::{DateTime, TimeZone, Utc};
use lemmy_db_schema::{
newtypes::{CommunityId, InstanceId},
source::{activity::SentActivity, site::Site},
utils::{ActualDbPool, DbPool},
};
use lemmy_db_views_actor::structs::CommunityFollowerView;
use once_cell::sync::Lazy;
use reqwest::Url;
use std::collections::{HashMap, HashSet};

/// interval with which new additions to community_followers are queried.
///
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently must not be too small, because DB load is fairly high: inboxes are stored per person (there is no separate list of shared_inboxes), and every instance queue is fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
} else {
chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
}
});
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance unfollows a specific remote community.
/// This is expected to happen pretty rarely and updating it in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));

pub(crate) struct CommunityInboxCollector {
// load site lazily because if an instance is first seen due to being on allowlist,
// the corresponding row in `site` may not exist yet since that is only added once
// `fetch_instance_actor_for_object` is called.
// (this should be unlikely to be relevant outside of the federation tests)
site_loaded: bool,
site: Option<Site>,
followed_communities: HashMap<CommunityId, HashSet<Url>>,
last_full_communities_fetch: DateTime<Utc>,
last_incremental_communities_fetch: DateTime<Utc>,
instance_id: InstanceId,
domain: String,
pool: ActualDbPool,
}
impl CommunityInboxCollector {
pub fn new(
pool: ActualDbPool,
instance_id: InstanceId,
domain: String,
) -> CommunityInboxCollector {
CommunityInboxCollector {
pool,
site_loaded: false,
site: None,
followed_communities: HashMap::new(),
last_full_communities_fetch: Utc.timestamp_nanos(0),
last_incremental_communities_fetch: Utc.timestamp_nanos(0),
instance_id,
domain,
}
}
/// get inbox urls for sending the given activity to the given instance
/// most often this will return 0 values (if the instance doesn't care about the activity)
/// or 1 value (the shared inbox)
/// more than 1 value only happens for non-lemmy software
pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> {
let mut inbox_urls: HashSet<Url> = HashSet::new();

if activity.send_all_instances {
if !self.site_loaded {
self.site = Site::read_from_instance_id(&mut self.pool(), self.instance_id).await?;
self.site_loaded = true;
}
if let Some(site) = &self.site {
// Nutomic: Most non-lemmy software won't have a site row. That means it can't handle these activities. So handling it like this is fine.
inbox_urls.insert(site.inbox_url.inner().clone());
}
}
if let Some(t) = &activity.send_community_followers_of {
if let Some(urls) = self.followed_communities.get(t) {
inbox_urls.extend(urls.iter().cloned());
}
}
inbox_urls.extend(
activity
.send_inboxes
.iter()
.filter_map(std::option::Option::as_ref)
.filter(|&u| (u.domain() == Some(&self.domain)))
.map(|u| u.inner().clone()),
);
Ok(inbox_urls.into_iter().collect())
}

pub async fn update_communities(&mut self) -> Result<()> {
if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
tracing::debug!("{}: fetching full list of communities", self.domain);
// process removals every hour
(self.followed_communities, self.last_full_communities_fetch) = self
.get_communities(self.instance_id, Utc.timestamp_nanos(0))
.await?;
self.last_incremental_communities_fetch = self.last_full_communities_fetch;
}
if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
// process additions every minute
let (news, time) = self
.get_communities(self.instance_id, self.last_incremental_communities_fetch)
.await?;
if !news.is_empty() {
tracing::debug!(
"{}: fetched {} incremental new followed communities",
self.domain,
news.len()
);
}
self.followed_communities.extend(news);
self.last_incremental_communities_fetch = time;
}
Ok(())
}

/// get a list of local communities with the remote inboxes on the given instance that care about them
async fn get_communities(
&mut self,
instance_id: InstanceId,
last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
let new_last_fetch =
Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
Ok((
CommunityFollowerView::get_instance_followed_community_inboxes(
&mut self.pool(),
instance_id,
last_fetch,
)
.await?
.into_iter()
.fold(HashMap::new(), |mut map, (c, u)| {
map.entry(c).or_default().insert(u.into());
map
}),
new_last_fetch,
))
}
fn pool(&self) -> DbPool<'_> {
DbPool::Pool(&self.pool)
}
}
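For orientation, a minimal usage sketch of the collector from a per-instance sending loop; the real call sites are in the worker/send code, which is outside this hunk, so `pool`, `instance` and `activity` are assumed to be in scope:

// hypothetical sketch: `pool` is an ActualDbPool, `instance` an Instance row, `activity` a SentActivity
let mut inboxes = CommunityInboxCollector::new(pool.clone(), instance.id, instance.domain.clone());
// refresh the followed-community -> inbox map; cheap unless the recheck delays have elapsed
inboxes.update_communities().await?;
// 0 urls: instance doesn't care; 1 url: its shared inbox; >1 urls: non-lemmy software
let inbox_urls = inboxes.get_inbox_urls(&activity).await?;
for url in inbox_urls {
  // send the activity to `url`, keeping at most
  // federation.concurrent_sends_per_instance requests in flight
}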
52 changes: 29 additions & 23 deletions crates/federate/src/lib.rs
@@ -1,7 +1,11 @@
use crate::{util::CancellableTask, worker::InstanceWorker};
use activitypub_federation::config::FederationConfig;
use chrono::{Local, Timelike};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_api_common::{
context::LemmyContext,
federate_retry_sleep_duration,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{
newtypes::InstanceId,
source::{federation_queue_state::FederationQueueState, instance::Instance},
@@ -14,6 +18,8 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;

mod inboxes;
mod send;
mod util;
mod worker;

@@ -34,7 +40,8 @@ pub struct Opts {
async fn start_stop_federation_workers(
opts: Opts,
pool: ActualDbPool,
federation_config: FederationConfig<LemmyContext>,
federation_lib_config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut workers = HashMap::<InstanceId, CancellableTask>::new();
@@ -43,7 +50,9 @@ async fn start_stop_federation_workers(
let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver));
let pool2 = &mut DbPool::Pool(&pool);
let process_index = opts.process_index - 1;
let local_domain = federation_config.settings().get_hostname_without_port()?;
let local_domain = federation_lib_config
.settings()
.get_hostname_without_port()?;
loop {
let mut total_count = 0;
let mut dead_count = 0;
@@ -71,26 +80,19 @@
continue;
}
// create new worker
let config = federation_config.clone();
let config = federation_lib_config.clone();
let stats_sender = stats_sender.clone();
let pool = pool.clone();
let federation_worker_config = federation_worker_config.clone();
workers.insert(
instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let instance = instance.clone();
let req_data = config.clone().to_request_data();
let stats_sender = stats_sender.clone();
let pool = pool.clone();
async move {
InstanceWorker::init_and_loop(
instance,
req_data,
&mut DbPool::Pool(&pool),
stop,
stats_sender,
)
.await
}
InstanceWorker::init_and_loop(
instance.clone(),
config.clone(),
federation_worker_config.clone(),
stop,
stats_sender.clone(),
)
}),
);
} else if !should_federate {
@@ -126,12 +128,16 @@
opts: Opts,
pool: ActualDbPool,
config: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
let opts = opts.clone();
let pool = pool.clone();
let config = config.clone();
async move { start_stop_federation_workers(opts, pool, config, stop).await }
start_stop_federation_workers(
opts.clone(),
pool.clone(),
config.clone(),
federation_worker_config.clone(),
stop,
)
})
}

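The FederationWorkerConfig imported from lemmy_utils::settings::structs is referenced but not shown in this diff; judging by the defaults.hjson change above, its shape is roughly the following (the derives and the integer type are assumptions):

// assumed sketch of the settings struct behind the new `federation` config block
#[derive(Debug, Clone, serde::Deserialize)]
pub struct FederationWorkerConfig {
  /// Limit to the number of concurrent outgoing federation requests per target instance.
  pub concurrent_sends_per_instance: i64,
}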