Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/chat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # Chat module.

use std::cmp;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::io::Cursor;
use std::marker::Sync;
Expand Down Expand Up @@ -3776,7 +3776,7 @@ pub(crate) async fn update_chat_contacts_table(
context: &Context,
timestamp: i64,
id: ChatId,
contacts: &HashSet<ContactId>,
contacts: &BTreeSet<ContactId>,
) -> Result<()> {
context
.sql
Expand Down Expand Up @@ -5035,15 +5035,15 @@ async fn set_contacts_by_addrs(context: &Context, id: ChatId, addrs: &[String])
chat.typ == Chattype::OutBroadcast,
"{id} is not a broadcast list",
);
let mut contacts = HashSet::new();
let mut contacts = BTreeSet::new();
for addr in addrs {
let contact_addr = ContactAddress::new(addr)?;
let contact = Contact::add_or_lookup(context, "", &contact_addr, Origin::Hidden)
.await?
.0;
contacts.insert(contact);
}
let contacts_old = HashSet::<ContactId>::from_iter(get_chat_contacts(context, id).await?);
let contacts_old = BTreeSet::<ContactId>::from_iter(get_chat_contacts(context, id).await?);
if contacts == contacts_old {
return Ok(());
}
Expand Down
5 changes: 2 additions & 3 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! # Messages and their identifiers.

use std::collections::BTreeSet;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::str;

Expand Down Expand Up @@ -1693,7 +1692,7 @@ pub(crate) async fn delete_msg_locally(context: &Context, msg: &Message) -> Resu
pub(crate) async fn delete_msgs_locally_done(
context: &Context,
msg_ids: &[MsgId],
modified_chat_ids: HashSet<ChatId>,
modified_chat_ids: BTreeSet<ChatId>,
) -> Result<()> {
for modified_chat_id in modified_chat_ids {
context.emit_msgs_changed_without_msg_id(modified_chat_id);
Expand Down Expand Up @@ -1723,7 +1722,7 @@ pub async fn delete_msgs_ex(
msg_ids: &[MsgId],
delete_for_all: bool,
) -> Result<()> {
let mut modified_chat_ids = HashSet::new();
let mut modified_chat_ids = BTreeSet::new();
let mut deleted_rfc724_mid = Vec::new();
let mut res = Ok(());

Expand Down
64 changes: 38 additions & 26 deletions src/receive_imf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Internet Message Format reception pipeline.

use std::cmp;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::sync::LazyLock;

Expand Down Expand Up @@ -773,6 +773,7 @@ pub(crate) async fn receive_imf_inner(
chat_id,
chat_id_blocked,
is_dc_message,
is_created,
)
.await
.context("add_parts error")?
Expand Down Expand Up @@ -1809,6 +1810,7 @@ async fn add_parts(
mut chat_id: ChatId,
mut chat_id_blocked: Blocked,
is_dc_message: MessengerMessage,
is_chat_created: bool,
) -> Result<ReceivedMsg> {
let to_id = if mime_parser.incoming {
ContactId::SELF
Expand Down Expand Up @@ -1872,7 +1874,16 @@ async fn add_parts(
apply_out_broadcast_changes(context, mime_parser, &mut chat, from_id).await?
}
Chattype::Group => {
apply_group_changes(context, mime_parser, &mut chat, from_id, to_ids, past_ids).await?
apply_group_changes(
context,
mime_parser,
&mut chat,
from_id,
to_ids,
past_ids,
is_chat_created,
)
.await?
}
Chattype::InBroadcast => {
apply_in_broadcast_changes(context, mime_parser, &mut chat, from_id).await?
Expand Down Expand Up @@ -1927,7 +1938,7 @@ async fn add_parts(
&& chat_id.get_ephemeral_timer(context).await? != ephemeral_timer
{
let chat_contacts =
HashSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat_id).await?);
BTreeSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat_id).await?);
let is_from_in_chat =
!chat_contacts.contains(&ContactId::SELF) || chat_contacts.contains(&from_id);

Expand Down Expand Up @@ -2462,7 +2473,7 @@ async fn handle_edit_delete(
// See `message::delete_msgs_ex()`, unlike edit requests, DC doesn't send unencrypted
// deletion requests, so there's no need to support them.
if part.param.get_bool(Param::GuaranteeE2ee).unwrap_or(false) {
let mut modified_chat_ids = HashSet::new();
let mut modified_chat_ids = BTreeSet::new();
let mut msg_ids = Vec::new();

let rfc724_mid_vec: Vec<&str> = rfc724_mid_list.split_whitespace().collect();
Expand Down Expand Up @@ -2889,8 +2900,7 @@ async fn create_group(
let mut chat_id = None;
let mut chat_id_blocked = Default::default();

if chat_id.is_none()
&& !mime_parser.is_mailinglist_message()
if !mime_parser.is_mailinglist_message()
&& !grpid.is_empty()
&& mime_parser.get_header(HeaderDef::ChatGroupName).is_some()
// otherwise, a pending "quit" message may pop up
Expand Down Expand Up @@ -3082,6 +3092,7 @@ async fn apply_group_changes(
from_id: ContactId,
to_ids: &[Option<ContactId>],
past_ids: &[Option<ContactId>],
is_chat_created: bool,
) -> Result<GroupChangesInfo> {
let from_is_key_contact = Contact::get_by_id(context, from_id).await?.is_key_contact();
ensure!(from_is_key_contact || chat.grpid.is_empty());
Expand All @@ -3094,7 +3105,7 @@ async fn apply_group_changes(
let mut better_msg = None;
let mut silent = false;
let chat_contacts =
HashSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat.id).await?);
BTreeSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat.id).await?);
let is_from_in_chat =
!chat_contacts.contains(&ContactId::SELF) || chat_contacts.contains(&from_id);

Expand Down Expand Up @@ -3172,8 +3183,8 @@ async fn apply_group_changes(
&& chat.member_list_is_stale(context).await?
{
info!(context, "Member list is stale.");
let mut new_members: HashSet<ContactId> =
HashSet::from_iter(to_ids_flat.iter().copied());
let mut new_members: BTreeSet<ContactId> =
BTreeSet::from_iter(to_ids_flat.iter().copied());
new_members.insert(ContactId::SELF);
if !from_id.is_special() {
new_members.insert(from_id);
Expand Down Expand Up @@ -3217,7 +3228,7 @@ async fn apply_group_changes(
)
.await?;
} else {
let mut new_members: HashSet<ContactId>;
let mut new_members: BTreeSet<ContactId>;
// True if a Delta Chat client has explicitly and really added our primary address to an
// already existing group.
let self_added =
Expand All @@ -3228,7 +3239,7 @@ async fn apply_group_changes(
false
};
if self_added {
new_members = HashSet::from_iter(to_ids_flat.iter().copied());
new_members = BTreeSet::from_iter(to_ids_flat.iter().copied());
new_members.insert(ContactId::SELF);
if !from_id.is_special() && from_is_key_contact != chat.grpid.is_empty() {
new_members.insert(from_id);
Expand Down Expand Up @@ -3279,35 +3290,31 @@ async fn apply_group_changes(
.await?;
}

let new_chat_contacts = HashSet::<ContactId>::from_iter(
let new_chat_contacts = BTreeSet::<ContactId>::from_iter(
chat::get_chat_contacts(context, chat.id)
.await?
.iter()
.copied(),
);

// These are for adding info messages about implicit membership changes.
let mut added_ids: HashSet<ContactId> = new_chat_contacts
let mut added_ids: BTreeSet<ContactId> = new_chat_contacts
.difference(&chat_contacts)
.copied()
.collect();
let mut removed_ids: HashSet<ContactId> = chat_contacts
let mut removed_ids: BTreeSet<ContactId> = chat_contacts
.difference(&new_chat_contacts)
.copied()
.collect();

if let Some(added_id) = added_id
&& !added_ids.remove(&added_id)
&& added_id != ContactId::SELF
{
// No-op "Member added" message. An exception is self-addition messages because they at
// least must be shown when a chat is created on our side.
info!(context, "No-op 'Member added' message (TRASH)");
better_msg = Some(String::new());
}
let id_was_already_added = if let Some(added_id) = added_id {
!added_ids.remove(&added_id)
} else {
false
};
if let Some(removed_id) = removed_id {
removed_ids.remove(&removed_id);
}

let group_changes_msgs = if !chat_contacts.contains(&ContactId::SELF)
&& new_chat_contacts.contains(&ContactId::SELF)
{
Expand All @@ -3316,6 +3323,11 @@ async fn apply_group_changes(
group_changes_msgs(context, &added_ids, &removed_ids, chat.id).await?
};

if id_was_already_added && group_changes_msgs.is_empty() && !is_chat_created {
info!(context, "No-op 'Member added' message (TRASH)");
better_msg = Some(String::new());
}

if send_event_chat_modified {
context.emit_event(EventType::ChatModified(chat.id));
chatlist_events::emit_chatlist_item_changed(context, chat.id);
Expand Down Expand Up @@ -3513,8 +3525,8 @@ async fn apply_chat_name_avatar_and_description_changes(
#[expect(clippy::arithmetic_side_effects)]
async fn group_changes_msgs(
context: &Context,
added_ids: &HashSet<ContactId>,
removed_ids: &HashSet<ContactId>,
added_ids: &BTreeSet<ContactId>,
removed_ids: &BTreeSet<ContactId>,
chat_id: ChatId,
) -> Result<Vec<(String, SystemMessage, Option<ContactId>)>> {
let mut group_changes_msgs: Vec<(String, SystemMessage, Option<ContactId>)> = Vec::new();
Expand Down
48 changes: 48 additions & 0 deletions src/securejoin/securejoin_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1584,3 +1584,51 @@ async fn test_auth_token_is_synchronized() -> Result<()> {

Ok(())
}

/// Tests that Bob deduplicates messages about being added to the group.
///
/// If Alice (inviter) has multiple devices,
/// Bob may receive multiple "member added" messages.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_deduplicate_member_added() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice1 = &tcm.alice().await;
let alice2 = &tcm.alice().await;
let bob = &tcm.bob().await;

// Enable sync messages so Alice QR code tokens
// are synchronized.
alice1.set_config_bool(Config::SyncMsgs, true).await?;
alice2.set_config_bool(Config::SyncMsgs, true).await?;

tcm.section("Alice creates a group");
let alice1_chat_id = chat::create_group(alice1, "Group").await?;

tcm.section("Alice creates group QR code and synchronizes it");
let qr = get_securejoin_qr(alice1, Some(alice1_chat_id)).await?;
sync(alice1, alice2).await;

// Import Alice's key to skip key request step.
tcm.section("Bob imports Alice's vCard");
bob.add_or_lookup_contact_id(alice1).await;

tcm.section("Bob scans the QR code");
let bob_chat_id = join_securejoin(bob, &qr).await?;

tcm.section("Both Alice's devices add Bob to chat");
let sent = bob.pop_sent_msg().await;
alice1.recv_msg_trash(&sent).await;
alice2.recv_msg_trash(&sent).await;

let sent1 = alice1.pop_sent_msg().await;
let sent2 = alice2.pop_sent_msg().await;

let bob_rcvd = bob.recv_msg(&sent1).await;
assert_eq!(bob_rcvd.chat_id, bob_chat_id);
assert_eq!(bob_rcvd.text, "Member Me added by alice@example.org.");

// Second message is a no-op, so it is trashed.
bob.recv_msg_trash(&sent2).await;

Ok(())
}
4 changes: 2 additions & 2 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::token::Namespace;
use crate::tools::time;
use crate::transport::{ConfiguredLoginParamJson, sync_transports};
use crate::{message, stock_str, token};
use std::collections::HashSet;
use std::collections::BTreeSet;

/// Whether to send device sync messages. Aimed for usage in the internal API.
#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -383,7 +383,7 @@ impl Context {
}

async fn sync_message_deletion(&self, msgs: &Vec<String>) -> Result<()> {
let mut modified_chat_ids = HashSet::new();
let mut modified_chat_ids = BTreeSet::new();
let mut msg_ids = Vec::new();
for rfc724_mid in msgs {
if let Some(msg_id) = message::rfc724_mid_exists(self, rfc724_mid).await? {
Expand Down
5 changes: 2 additions & 3 deletions src/tests/verified_chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,8 @@ async fn test_verified_member_added_reordering() -> Result<()> {

assert_eq!(fiona_received_message.get_text(), "Hi");

// Fiona receives late "Member added" message
// and the chat becomes protected.
fiona.recv_msg(&alice_sent_member_added).await;
// Fiona receives late "Member added" message.
fiona.recv_msg_trash(&alice_sent_member_added).await;

Ok(())
}
Expand Down
Loading