Skip to content

Commit

Permalink
use filter on insert
Browse files Browse the repository at this point in the history
  • Loading branch information
Nutomic committed Apr 3, 2024
1 parent 240294f commit 41bac92
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 99 deletions.
20 changes: 10 additions & 10 deletions crates/apub/src/objects/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
activities::{verify_is_public, verify_person_in_community},
check_apub_id_valid_with_strictness,
mentions::collect_non_local_mentions,
objects::{read_from_string_or_source, verify_is_remote_object, verify_object_timestamp},
objects::{read_from_string_or_source, verify_is_remote_object},
protocol::{
objects::{note::Note, LanguageTag},
InCommunity,
Expand All @@ -29,6 +29,7 @@ use lemmy_db_schema::{
post::Post,
},
traits::Crud,
utils::naive_now,
};
use lemmy_utils::{
error::{LemmyError, LemmyErrorType},
Expand Down Expand Up @@ -142,14 +143,6 @@ impl Object for ApubComment {
verify_is_remote_object(note.id.inner(), context.settings())?;
verify_person_in_community(&note.attributed_to, &community, context).await?;

let old_comment = note.id.dereference_local(context).await;
let old_timestamp = old_comment
.as_ref()
.map(|c| c.updated.unwrap_or(c.published))
.clone()
.ok();
verify_object_timestamp(old_timestamp, note.updated.or(note.published))?;

let (post, _) = note.get_parents(context).await?;
let creator = note.attributed_to.dereference(context).await?;
let is_mod_or_admin = is_mod_or_admin(&mut context.pool(), &creator, community.id)
Expand Down Expand Up @@ -193,7 +186,14 @@ impl Object for ApubComment {
language_id,
};
let parent_comment_path = parent_comment.map(|t| t.0.path);
let comment = Comment::create(&mut context.pool(), &form, parent_comment_path.as_ref()).await?;
let timestamp = note.updated.or(note.published).unwrap_or_else(naive_now);
let comment = Comment::insert_apub(
&mut context.pool(),
timestamp,
&form,
parent_comment_path.as_ref(),
)
.await?;
Ok(comment.into())
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/apub/src/objects/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ impl Object for ApubCommunity {
let languages =
LanguageTag::to_language_id_multiple(group.language, &mut context.pool()).await?;

let community = Community::create(&mut context.pool(), &form).await?;
let timestamp = group.updated.or(group.published).unwrap_or_else(naive_now);
let community = Community::insert_apub(&mut context.pool(), timestamp, &form).await?;
CommunityLanguage::update(&mut context.pool(), languages, community.id).await?;

let community: ApubCommunity = community.into();
Expand Down
32 changes: 0 additions & 32 deletions crates/apub/src/objects/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::protocol::Source;
use activitypub_federation::protocol::values::MediaTypeMarkdownOrHtml;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use html2md::parse_html;
use lemmy_utils::{error::LemmyError, settings::structs::Settings};
use url::Url;
Expand Down Expand Up @@ -52,34 +51,3 @@ pub(crate) fn verify_is_remote_object(id: &Url, settings: &Settings) -> Result<(
Ok(())
}
}

/// When receiving a federated object, check that the timestamp is newer than the latest version stored
/// locally. Necessary to reject edits which are received in wrong order.
pub(crate) fn verify_object_timestamp(
old_timestamp: Option<DateTime<Utc>>,
new_timestamp: Option<DateTime<Utc>>,
) -> Result<(), LemmyError> {
if let (Some(old_timestamp), Some(new_timestamp)) = (old_timestamp, new_timestamp) {
if new_timestamp < old_timestamp {
return Err(anyhow!("Ignoring old object edit").into());
}
}
Ok(())
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use chrono::TimeZone;

#[test]
fn test_verify_object_timestamp() {
let old = Some(Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap());
let new = Some(Utc.with_ymd_and_hms(2024, 2, 1, 0, 0, 0).unwrap());

assert!(verify_object_timestamp(old, new).is_ok());
assert!(verify_object_timestamp(None, new).is_ok());
assert!(verify_object_timestamp(old, None).is_ok());
assert!(verify_object_timestamp(new, old).is_err());
}
}
13 changes: 4 additions & 9 deletions crates/apub/src/objects/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
activities::{verify_is_public, verify_person_in_community},
check_apub_id_valid_with_strictness,
local_site_data_cached,
objects::{read_from_string_or_source_opt, verify_is_remote_object, verify_object_timestamp},
objects::{read_from_string_or_source_opt, verify_is_remote_object},
protocol::{
objects::{
page::{Attachment, AttributedTo, Hashtag, HashtagType, Page, PageType},
Expand Down Expand Up @@ -41,6 +41,7 @@ use lemmy_db_schema::{
post::{Post, PostInsertForm, PostUpdateForm},
},
traits::Crud,
utils::naive_now,
};
use lemmy_db_views_actor::structs::CommunityModeratorView;
use lemmy_utils::{
Expand Down Expand Up @@ -216,13 +217,6 @@ impl Object for ApubPost {
// read existing, local post if any (for generating mod log)
let old_post = page.id.dereference_local(context).await;

let old_timestamp = old_post
.as_ref()
.map(|p| p.updated.unwrap_or(p.published))
.clone()
.ok();
verify_object_timestamp(old_timestamp, page.updated.or(page.published))?;

let first_attachment = page.attachment.first();
let local_site = LocalSite::read(&mut context.pool()).await.ok();

Expand Down Expand Up @@ -277,7 +271,8 @@ impl Object for ApubPost {
.build()
};

let post = Post::create(&mut context.pool(), &form).await?;
let timestamp = page.updated.or(page.published).unwrap_or_else(naive_now);
let post = Post::insert_apub(&mut context.pool(), timestamp, &form).await?;

generate_post_link_metadata(
post.clone(),
Expand Down
14 changes: 4 additions & 10 deletions crates/apub/src/objects/private_message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
check_apub_id_valid_with_strictness,
objects::{read_from_string_or_source, verify_object_timestamp},
objects::read_from_string_or_source,
protocol::{
objects::chat_message::{ChatMessage, ChatMessageType},
Source,
Expand All @@ -23,6 +23,7 @@ use lemmy_db_schema::{
private_message::{PrivateMessage, PrivateMessageInsertForm},
},
traits::Crud,
utils::naive_now,
};
use lemmy_utils::{
error::{LemmyError, LemmyErrorType},
Expand Down Expand Up @@ -105,14 +106,6 @@ impl Object for ApubPrivateMessage {
verify_domains_match(note.id.inner(), expected_domain)?;
verify_domains_match(note.attributed_to.inner(), note.id.inner())?;

let old_pm = note.id.dereference_local(context).await;
let old_timestamp = old_pm
.as_ref()
.map(|c| c.updated.unwrap_or(c.published))
.clone()
.ok();
verify_object_timestamp(old_timestamp, note.updated.or(note.published))?;

check_apub_id_valid_with_strictness(note.id.inner(), false, context).await?;
let person = note.attributed_to.dereference(context).await?;
if person.banned {
Expand Down Expand Up @@ -150,7 +143,8 @@ impl Object for ApubPrivateMessage {
ap_id: Some(note.id.into()),
local: Some(false),
};
let pm = PrivateMessage::create(&mut context.pool(), &form).await?;
let timestamp = note.updated.or(note.published).unwrap_or_else(naive_now);
let pm = PrivateMessage::insert_apub(&mut context.pool(), timestamp, &form).await?;
Ok(pm.into())
}
}
Expand Down
10 changes: 1 addition & 9 deletions crates/apub/src/protocol/objects/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
community_outbox::ApubCommunityOutbox,
},
local_site_data_cached,
objects::{community::ApubCommunity, read_from_string_or_source_opt, verify_object_timestamp},
objects::{community::ApubCommunity, read_from_string_or_source_opt},
protocol::{
objects::{Endpoints, LanguageTag},
ImageObject,
Expand Down Expand Up @@ -89,14 +89,6 @@ impl Group {
let description = read_from_string_or_source_opt(&self.summary, &None, &self.source);
check_slurs_opt(&description, slur_regex)?;

let old_communmity = self.id.dereference_local(context).await;
let old_timestamp = old_communmity
.as_ref()
.map(|c| c.updated.unwrap_or(c.published))
.clone()
.ok();
verify_object_timestamp(old_timestamp, self.updated.or(self.published))?;

Ok(())
}
}
14 changes: 14 additions & 0 deletions crates/db_schema/src/impls/comment.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
diesel::DecoratableTarget,
newtypes::{CommentId, DbUrl, PersonId},
schema::comment,
source::comment::{
Expand All @@ -13,6 +14,7 @@ use crate::{
traits::{Crud, Likeable, Saveable},
utils::{get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT},
};
use chrono::{DateTime, Utc};
use diesel::{
dsl::{insert_into, sql_query},
result::Error,
Expand Down Expand Up @@ -59,6 +61,16 @@ impl Comment {
pool: &mut DbPool<'_>,
comment_form: &CommentInsertForm,
parent_path: Option<&Ltree>,
) -> Result<Comment, Error> {
// TODO: shouldnt have on_conflict clause when called from here
Self::insert_apub(pool, naive_now(), comment_form, parent_path).await
}

pub async fn insert_apub(
pool: &mut DbPool<'_>,
timestamp: DateTime<Utc>,
comment_form: &CommentInsertForm,
parent_path: Option<&Ltree>,
) -> Result<Comment, Error> {
let conn = &mut get_conn(pool).await?;

Expand All @@ -70,6 +82,7 @@ impl Comment {
let inserted_comment = insert_into(comment::table)
.values(comment_form)
.on_conflict(comment::ap_id)
.filter_target(comment::published.lt(timestamp))
.do_update()
.set(comment_form)
.get_result::<Self>(conn)
Expand Down Expand Up @@ -129,6 +142,7 @@ where ca.comment_id = c.id"
})
.await
}

pub async fn read_from_apub_id(
pool: &mut DbPool<'_>,
object_id: Url,
Expand Down
43 changes: 32 additions & 11 deletions crates/db_schema/src/impls/community.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
diesel::DecoratableTarget,
newtypes::{CommunityId, DbUrl, PersonId},
schema::{community, community_follower, instance},
source::{
Expand All @@ -20,6 +21,7 @@ use crate::{
utils::{functions::lower, get_conn, DbPool},
SubscribedType,
};
use chrono::{DateTime, Utc};
use diesel::{
deserialize,
dsl,
Expand All @@ -43,25 +45,15 @@ impl Crud for Community {
type IdType = CommunityId;

async fn create(pool: &mut DbPool<'_>, form: &Self::InsertForm) -> Result<Self, Error> {
let is_new_community = match &form.actor_id {
Some(id) => Community::read_from_apub_id(pool, id).await?.is_none(),
None => true,
};
let conn = &mut get_conn(pool).await?;

// Can't do separate insert/update commands because InsertForm/UpdateForm aren't convertible
let community_ = insert_into(community::table)
.values(form)
.on_conflict(community::actor_id)
.do_update()
.set(form)
.get_result::<Self>(conn)
.await?;

// Initialize languages for new community
if is_new_community {
CommunityLanguage::update(pool, vec![], community_.id).await?;
}
CommunityLanguage::update(pool, vec![], community_.id).await?;

Ok(community_)
}
Expand Down Expand Up @@ -115,6 +107,35 @@ pub enum CollectionType {
}

impl Community {
pub async fn insert_apub(
pool: &mut DbPool<'_>,
timestamp: DateTime<Utc>,
form: &CommunityInsertForm,
) -> Result<Self, Error> {
let is_new_community = match &form.actor_id {
Some(id) => Community::read_from_apub_id(pool, id).await?.is_none(),
None => true,
};
let conn = &mut get_conn(pool).await?;

// Can't do separate insert/update commands because InsertForm/UpdateForm aren't convertible
let community_ = insert_into(community::table)
.values(form)
.on_conflict(community::actor_id)
.filter_target(community::published.lt(timestamp))
.do_update()
.set(form)
.get_result::<Self>(conn)
.await?;

// Initialize languages for new community
if is_new_community {
CommunityLanguage::update(pool, vec![], community_.id).await?;
}

Ok(community_)
}

/// Get the community which has a given moderators or featured url, also return the collection type
pub async fn get_by_collection_url(
pool: &mut DbPool<'_>,
Expand Down
30 changes: 25 additions & 5 deletions crates/db_schema/src/impls/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ use crate::{
},
};
use ::url::Url;
use chrono::Utc;
use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl, TextExpressionMethods};
use chrono::{DateTime, Utc};
use diesel::{
dsl::insert_into,
result::Error,
DecoratableTarget,
ExpressionMethods,
QueryDsl,
TextExpressionMethods,
};
use diesel_async::RunQueryDsl;
use std::collections::HashSet;

Expand All @@ -42,9 +49,6 @@ impl Crud for Post {
let conn = &mut get_conn(pool).await?;
insert_into(post::table)
.values(form)
.on_conflict(post::ap_id)
.do_update()
.set(form)
.get_result::<Self>(conn)
.await
}
Expand All @@ -63,6 +67,22 @@ impl Crud for Post {
}

impl Post {
pub async fn insert_apub(
pool: &mut DbPool<'_>,
timestamp: DateTime<Utc>,
form: &PostInsertForm,
) -> Result<Self, Error> {
let conn = &mut get_conn(pool).await?;
insert_into(post::table)
.values(form)
.on_conflict(post::ap_id)
.filter_target(post::published.lt(timestamp))
.do_update()
.set(form)
.get_result::<Self>(conn)
.await
}

pub async fn list_for_community(
pool: &mut DbPool<'_>,
the_community_id: CommunityId,
Expand Down
Loading

0 comments on commit 41bac92

Please sign in to comment.