Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix import blocked objects #4712

Merged
merged 9 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
232 changes: 111 additions & 121 deletions crates/apub/src/api/user_settings_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::objects::{
person::ApubPerson,
post::ApubPost,
};
use activitypub_federation::{config::Data, fetch::object_id::ObjectId};
use activitypub_federation::{config::Data, fetch::object_id::ObjectId, traits::Object};
use actix_web::web::Json;
use futures::{future::try_join_all, StreamExt};
use itertools::Itertools;
use lemmy_api_common::{context::LemmyContext, SuccessResponse};
use lemmy_db_schema::{
newtypes::DbUrl,
Expand All @@ -30,8 +31,11 @@ use lemmy_utils::{
spawn_try_task,
};
use serde::{Deserialize, Serialize};
use std::future::Future;
use tracing::info;

const PARALLELISM: usize = 10;

/// Backup of user data. This struct should never be changed so that the data can be used as a
/// long-term backup in case the instance goes down unexpectedly. All fields are optional to allow
/// importing partial backups.
Expand Down Expand Up @@ -167,141 +171,91 @@ pub async fn import_settings(
}

spawn_try_task(async move {
const PARALLELISM: usize = 10;
let person_id = local_user_view.person.id;

// These tasks fetch objects from remote instances which might be down.
// TODO: Would be nice if we could send a list of failed items with api response, but then
// the request would likely timeout.
let mut failed_items = vec![];

info!(
"Starting settings backup for {}",
"Starting settings import for {}",
local_user_view.person.name
);

futures::stream::iter(
data
.followed_communities
.clone()
.into_iter()
// reset_request_count works like clone, and is necessary to avoid running into request limit
.map(|f| (f, context.reset_request_count()))
.map(|(followed, context)| async move {
// need to reset outgoing request count to avoid running into limit
let community = followed.dereference(&context).await?;
let form = CommunityFollowerForm {
person_id,
community_id: community.id,
pending: true,
};
CommunityFollower::follow(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
}),
)
.buffer_unordered(PARALLELISM)
.collect::<Vec<_>>()
.await
.into_iter()
.enumerate()
.for_each(|(i, r)| {
if let Err(e) = r {
failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
info!("Failed to import followed community: {e}");
}
});

futures::stream::iter(
data
.saved_posts
.clone()
.into_iter()
.map(|s| (s, context.reset_request_count()))
.map(|(saved, context)| async move {
let post = saved.dereference(&context).await?;
let form = PostSavedForm {
person_id,
post_id: post.id,
};
PostSaved::save(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
}),
let failed_followed_communities = fetch_and_import(
data.followed_communities.clone(),
&context,
|(followed, context)| async move {
let community = followed.dereference(&context).await?;
let form = CommunityFollowerForm {
person_id,
community_id: community.id,
pending: true,
};
CommunityFollower::follow(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
},
)
.buffer_unordered(PARALLELISM)
.collect::<Vec<_>>()
.await
.into_iter()
.enumerate()
.for_each(|(i, r)| {
if let Err(e) = r {
failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
info!("Failed to import saved post community: {e}");
}
});

futures::stream::iter(
data
.saved_comments
.clone()
.into_iter()
.map(|s| (s, context.reset_request_count()))
.map(|(saved, context)| async move {
let comment = saved.dereference(&context).await?;
let form = CommentSavedForm {
person_id,
comment_id: comment.id,
};
CommentSaved::save(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
}),
.await?;

let failed_saved_posts = fetch_and_import(
data.saved_posts.clone(),
&context,
|(saved, context)| async move {
let post = saved.dereference(&context).await?;
let form = PostSavedForm {
person_id,
post_id: post.id,
};
PostSaved::save(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
},
)
.buffer_unordered(PARALLELISM)
.collect::<Vec<_>>()
.await
.into_iter()
.enumerate()
.for_each(|(i, r)| {
if let Err(e) = r {
failed_items.push(data.followed_communities.get(i).map(|u| u.inner().clone()));
info!("Failed to import saved comment community: {e}");
}
});
.await?;

let failed_items: Vec<_> = failed_items.into_iter().flatten().collect();
info!(
"Finished settings backup for {}, failed items: {:#?}",
local_user_view.person.name, failed_items
);
let failed_saved_comments = fetch_and_import(
data.saved_comments.clone(),
&context,
|(saved, context)| async move {
let comment = saved.dereference(&context).await?;
let form = CommentSavedForm {
person_id,
comment_id: comment.id,
};
CommentSaved::save(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
},
)
.await?;

// These tasks don't connect to any remote instances but only insert directly in the database.
// That means the only error condition are db connection failures, so no extra error handling is
// needed.
try_join_all(data.blocked_communities.iter().map(|blocked| async {
// dont fetch unknown blocked objects from home server
let community = blocked.dereference_local(&context).await?;
let form = CommunityBlockForm {
person_id,
community_id: community.id,
};
CommunityBlock::block(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
}))
let failed_community_blocks = fetch_and_import(
data.blocked_communities.clone(),
&context,
|(blocked, context)| async move {
let community = blocked.dereference(&context).await?;
let form = CommunityBlockForm {
person_id,
community_id: community.id,
};
CommunityBlock::block(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
},
)
.await?;

try_join_all(data.blocked_users.iter().map(|blocked| async {
// dont fetch unknown blocked objects from home server
let target = blocked.dereference_local(&context).await?;
let form = PersonBlockForm {
person_id,
target_id: target.id,
};
PersonBlock::block(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
}))
let failed_user_blocks = fetch_and_import(
data.blocked_users.clone(),
&context,
|(blocked, context)| async move {
let context = context.reset_request_count();
let target = blocked.dereference(&context).await?;
let form = PersonBlockForm {
person_id,
target_id: target.id,
};
PersonBlock::block(&mut context.pool(), &form).await?;
LemmyResult::Ok(())
},
)
.await?;

try_join_all(data.blocked_instances.iter().map(|domain| async {
// dont fetch unknown blocked objects from home server
let instance = Instance::read_or_create(&mut context.pool(), domain.clone()).await?;
let form = InstanceBlockForm {
person_id,
Expand All @@ -312,12 +266,48 @@ pub async fn import_settings(
}))
.await?;

info!("Settings import completed for {}, the following items failed: {failed_followed_communities}, {failed_saved_posts}, {failed_saved_comments}, {failed_community_blocks}, {failed_user_blocks}",
local_user_view.person.name);

Ok(())
});

Ok(Json(Default::default()))
}

async fn fetch_and_import<Kind, Fut>(
objects: Vec<ObjectId<Kind>>,
context: &Data<LemmyContext>,
import_fn: impl FnMut((ObjectId<Kind>, Data<LemmyContext>)) -> Fut,
) -> LemmyResult<String>
where
Kind: Object + Send + 'static,
for<'de2> <Kind as Object>::Kind: Deserialize<'de2>,
Fut: Future<Output = LemmyResult<()>>,
{
let mut failed_items = vec![];
futures::stream::iter(
objects
.clone()
.into_iter()
// need to reset outgoing request count to avoid running into limit
.map(|s| (s, context.reset_request_count()))
.map(import_fn),
)
.buffer_unordered(PARALLELISM)
.collect::<Vec<_>>()
.await
.into_iter()
.enumerate()
.for_each(|(i, r): (usize, LemmyResult<()>)| {
if r.is_err() {
if let Some(object) = objects.get(i) {
failed_items.push(object.inner().clone());
}
}
});
Ok(failed_items.into_iter().join(","))
}
#[cfg(test)]
#[allow(clippy::indexing_slicing)]
mod tests {
Expand Down
4 changes: 3 additions & 1 deletion crates/apub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ pub(crate) mod mentions;
pub mod objects;
pub mod protocol;

pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 50;
/// Maximum number of outgoing HTTP requests to fetch a single object. Needs to be high enough
/// to fetch a new community with posts, moderators and featured posts.
pub const FEDERATION_HTTP_FETCH_LIMIT: u32 = 100;

/// Only include a basic context to save space and bandwidth. The main context is hosted statically
/// on join-lemmy.org. Include activitystreams explicitly for better compat, but this could
Expand Down