Skip to content
79 changes: 72 additions & 7 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::chatlist_events;
use crate::color::str_to_color;
use crate::config::Config;
use crate::constants::{
Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL,
DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX, TIMESTAMP_SENT_TOLERANCE,
self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK,
DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX,
TIMESTAMP_SENT_TOLERANCE,
};
use crate::contact::{self, Contact, ContactId, Origin};
use crate::context::Context;
Expand All @@ -34,7 +35,7 @@ use crate::download::{
};
use crate::ephemeral::{Timer as EphemeralTimer, start_chat_ephemeral_timers};
use crate::events::EventType;
use crate::key::self_fingerprint;
use crate::key::{Fingerprint, self_fingerprint};
use crate::location;
use crate::log::{LogExt, warn};
use crate::logged_debug_assert;
Expand Down Expand Up @@ -3125,7 +3126,8 @@ pub async fn get_chat_msgs(context: &Context, chat_id: ChatId) -> Result<Vec<Cha
.await
}

/// Returns messages belonging to the chat according to the given options.
/// Returns messages belonging to the chat according to the given options,
/// sorted by oldest message first.
#[expect(clippy::arithmetic_side_effects)]
pub async fn get_chat_msgs_ex(
context: &Context,
Expand All @@ -3136,6 +3138,7 @@ pub async fn get_chat_msgs_ex(
info_only,
add_daymarker,
} = options;
// TODO: Remove `info_only` parameter; it's not used by anything
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this in a follow-up PR

let process_row = if info_only {
|row: &rusqlite::Row| {
// is_info logic taken from Message.is_info()
Expand Down Expand Up @@ -4009,9 +4012,47 @@ pub(crate) async fn add_contact_to_chat_ex(
if sync.into() {
chat.sync_contacts(context).await.log_err(context).ok();
}
if chat.typ == Chattype::OutBroadcast {
resend_last_msgs(context, chat.id, &contact)
.await
.log_err(context)
.ok();
}
Ok(true)
}

async fn resend_last_msgs(context: &Context, chat_id: ChatId, to_contact: &Contact) -> Result<()> {
let msgs: Vec<MsgId> = context
.sql
.query_map_vec(
"
SELECT id
FROM msgs
WHERE chat_id=?
AND hidden=0
AND NOT ( -- Exclude info and system messages
param GLOB '*\nS=*' OR param GLOB 'S=*'
OR from_id=?
OR to_id=?
)
AND type!=?
ORDER BY timestamp DESC, id DESC LIMIT ?",
(
chat_id,
ContactId::INFO,
ContactId::INFO,
Viewtype::Webxdc,
constants::N_MSGS_TO_NEW_BROADCAST_MEMBER,
),
|row: &rusqlite::Row| Ok(row.get::<_, MsgId>(0)?),
)
.await?
.into_iter()
.rev()
.collect();
resend_msgs_ex(context, &msgs, to_contact.fingerprint()).await
}

/// Returns true if an avatar should be attached in the given chat.
///
/// This function does not check if the avatar is set.
Expand Down Expand Up @@ -4675,10 +4716,26 @@ pub(crate) async fn save_copy_in_self_talk(
Ok(msg.rfc724_mid)
}

/// Resends given messages with the same Message-ID.
/// Resends given messages to members of the corresponding chats.
///
/// This is primarily intended to make existing webxdcs available to new chat members.
pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
resend_msgs_ex(context, msg_ids, None).await
}

/// Resends given messages to a contact with fingerprint `to_fingerprint` or, if it's `None`, to
/// members of the corresponding chats.
///
/// NB: Actually `to_fingerprint` is only passed for `OutBroadcast` chats when a new member is
/// added. Regarding webxdcs: It is not trivial to resend only the own status updates,
/// and it is not trivial to resend them only to the newly-joined member,
/// so that for now, [`resend_last_msgs`] does not automatically resend webxdcs at all.
pub(crate) async fn resend_msgs_ex(
context: &Context,
msg_ids: &[MsgId],
to_fingerprint: Option<Fingerprint>,
) -> Result<()> {
let to_fingerprint = to_fingerprint.map(|f| f.hex());
let mut msgs: Vec<Message> = Vec::new();
for msg_id in msg_ids {
let msg = Message::load_from_db(context, *msg_id).await?;
Expand All @@ -4697,10 +4754,17 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
| MessageState::OutFailed
| MessageState::OutDelivered
| MessageState::OutMdnRcvd => {
message::update_msg_state(context, msg.id, MessageState::OutPending).await?
// Broadcast owners shouldn't see spinners on messages being auto-re-sent to new
// subscribers (otherwise big channel owners will see spinners most of the time).
if to_fingerprint.is_none() {
message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
}
}
msg_state => bail!("Unexpected message state {msg_state}"),
}
if let Some(to_fingerprint) = &to_fingerprint {
msg.param.set(Param::Arg4, to_fingerprint.clone());
}
if create_send_msg_jobs(context, &mut msg).await?.is_empty() {
continue;
}
Expand All @@ -4712,7 +4776,8 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
chat_id: msg.chat_id,
msg_id: msg.id,
});
// note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it)
// The event only matters if the message is last in the chat.
// But it's probably too expensive check, and UIs anyways need to debounce.
Comment on lines +4779 to +4780
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in practise, resent messages always belong to the same chat, in a channel, but also when resend manually by selecting in the UI

we can also move the event out of the loop, and document that all messages shall belong to the same chat

but i am also fine to leave things as is

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be fixed in this way:

diff --git a/src/chat.rs b/src/chat.rs
index 966a041ca..1b6440720 100644
--- a/src/chat.rs
+++ b/src/chat.rs
@@ -4747,6 +4747,9 @@ pub(crate) async fn resend_msgs_ex(
         msgs.push(msg)
     }
 
+    let mut chat_ids: Vec<ChatId> = msgs.iter().map(|m| m.chat_id).collect();
+    chat_ids.dedup();
+
     for mut msg in msgs {
         match msg.get_state() {
             // `get_state()` may return an outdated `OutPending`, so update anyway.
@@ -4776,9 +4779,6 @@ pub(crate) async fn resend_msgs_ex(
             chat_id: msg.chat_id,
             msg_id: msg.id,
         });
-        // The event only matters if the message is last in the chat.
-        // But it's probably too expensive check, and UIs anyways need to debounce.
-        chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
 
         if msg.viewtype == Viewtype::Webxdc {
             let conn_fn = |conn: &mut rusqlite::Connection| {
@@ -4811,6 +4811,11 @@ pub(crate) async fn resend_msgs_ex(
         }
         context.scheduler.interrupt_smtp().await;
     }
+    for chat_id in chat_ids {
+        // The event only matters if the message is last in the chat.
+        // But it's probably too expensive check, and UIs anyways need to debounce.
+        chatlist_events::emit_chatlist_item_changed(context, chat_id);
+    }
     Ok(())
 }

but is unrelated to the PR here

chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);

if msg.viewtype == Viewtype::Webxdc {
Expand Down
52 changes: 51 additions & 1 deletion src/chat/chat_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use super::*;
use crate::Event;
use crate::chatlist::get_archived_cnt;
use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS};
use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS, N_MSGS_TO_NEW_BROADCAST_MEMBER};
use crate::ephemeral::Timer;
use crate::headerdef::HeaderDef;
use crate::imex::{ImexMode, has_backup, imex};
Expand Down Expand Up @@ -2947,6 +2947,56 @@ async fn test_broadcast_change_name() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_broadcast_resend_to_new_member() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
let bob = &tcm.bob().await;
let fiona = &tcm.fiona().await;

let alice_bc_id = create_broadcast(alice, "Channel".to_string()).await?;
let qr = get_securejoin_qr(alice, Some(alice_bc_id)).await.unwrap();

tcm.exec_securejoin_qr(bob, alice, &qr).await;
let mut alice_msg_ids = Vec::new();
for i in 0..(N_MSGS_TO_NEW_BROADCAST_MEMBER + 1) {
alice_msg_ids.push(
alice
.send_text(alice_bc_id, &i.to_string())
.await
.sender_msg_id,
);
}
let fiona_bc_id = tcm.exec_securejoin_qr(fiona, alice, &qr).await;
for msg_id in alice_msg_ids {
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
}
for i in 0..N_MSGS_TO_NEW_BROADCAST_MEMBER {
let rev_order = false;
let resent_msg = alice
.pop_sent_msg_ex(rev_order, Duration::ZERO)
.await
.unwrap();
let fiona_msg = fiona.recv_msg(&resent_msg).await;
assert_eq!(fiona_msg.chat_id, fiona_bc_id);
assert_eq!(fiona_msg.text, (i + 1).to_string());
assert!(resent_msg.recipients.contains("fiona@example.net"));
assert!(!resent_msg.recipients.contains("bob@"));
// The message is undecryptable for Bob, he mustn't be able to know yet that somebody joined
// the broadcast even if he is a postman in this land. E.g. Fiona may leave after fetching
// the news, Bob won't know about that.
assert!(
MimeMessage::from_bytes(bob, resent_msg.payload().as_bytes())
.await?
.decryption_error
.is_some()
);
bob.recv_msg_trash(&resent_msg).await;
}
assert!(alice.pop_sent_msg_opt(Duration::ZERO).await.is_none());
Ok(())
}

/// - Alice has multiple devices
/// - Alice creates a broadcast and sends a message into it
/// - Alice's second device sees the broadcast
Expand Down
3 changes: 3 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ Here is what to do:

If you have any questions, please send an email to delta@merlinux.eu or ask at https://support.delta.chat/."#;

/// How many recent messages should be re-sent to a new broadcast member.
pub(crate) const N_MSGS_TO_NEW_BROADCAST_MEMBER: usize = 10;

#[cfg(test)]
mod tests {
use num_traits::FromPrimitive;
Expand Down
23 changes: 12 additions & 11 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ fn new_address_with_name(name: &str, address: String) -> Address<'static> {
}

impl MimeFactory {
/// Returns `MimeFactory` for rendering `msg`.
#[expect(clippy::arithmetic_side_effects)]
pub async fn from_msg(context: &Context, msg: Message) -> Result<MimeFactory> {
let now = time();
Expand Down Expand Up @@ -2225,18 +2226,18 @@ fn should_encrypt_symmetrically(msg: &Message, chat: &Chat) -> bool {
/// rather than all recipients.
/// This function returns the fingerprint of the recipient the message should be sent to.
fn must_have_only_one_recipient<'a>(msg: &'a Message, chat: &Chat) -> Option<Result<&'a str>> {
if chat.typ == Chattype::OutBroadcast
&& matches!(
msg.param.get_cmd(),
SystemMessage::MemberRemovedFromGroup | SystemMessage::MemberAddedToGroup
)
{
let Some(fp) = msg.param.get(Param::Arg4) else {
return Some(Err(format_err!("Missing removed/added member")));
};
return Some(Ok(fp));
if chat.typ != Chattype::OutBroadcast {
None
} else if let Some(fp) = msg.param.get(Param::Arg4) {
Some(Ok(fp))
} else if matches!(
msg.param.get_cmd(),
SystemMessage::MemberRemovedFromGroup | SystemMessage::MemberAddedToGroup
) {
Some(Err(format_err!("Missing removed/added member")))
} else {
None
}
None
}

async fn build_body_file(context: &Context, msg: &Message) -> Result<MimePart<'static>> {
Expand Down
4 changes: 4 additions & 0 deletions src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ pub enum Param {
/// For "MemberAddedToGroup" and "MemberRemovedFromGroup",
/// this is the fingerprint added to / removed from the group.
///
/// For messages resent when adding a new member to a broadcast channel,
/// this is the fingerprint of the added member;
/// the message must only be sent to this one member then.
///
/// For call messages, this is the end timsetamp.
Arg4 = b'H',

Expand Down
45 changes: 28 additions & 17 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,17 @@ impl TestContextManager {

let chat_id = join_securejoin(&joiner.ctx, qr).await.unwrap();

loop {
for _ in 0..2 {
let mut something_sent = false;
if let Some(sent) = joiner.pop_sent_msg_opt(Duration::ZERO).await {
let rev_order = false;
if let Some(sent) = joiner.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
for inviter in inviters {
inviter.recv_msg_opt(&sent).await;
}
something_sent = true;
}
for inviter in inviters {
if let Some(sent) = inviter.pop_sent_msg_opt(Duration::ZERO).await {
if let Some(sent) = inviter.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
joiner.recv_msg_opt(&sent).await;
something_sent = true;
}
Expand Down Expand Up @@ -623,25 +624,35 @@ impl TestContext {
}

pub async fn pop_sent_msg_opt(&self, timeout: Duration) -> Option<SentMessage<'_>> {
let rev_order = true;
self.pop_sent_msg_ex(rev_order, timeout).await
}

pub async fn pop_sent_msg_ex(
&self,
rev_order: bool,
timeout: Duration,
) -> Option<SentMessage<'_>> {
let start = Instant::now();
let mut query = "
SELECT id, msg_id, mime, recipients
FROM smtp
ORDER BY id"
.to_string();
if rev_order {
query += " DESC";
}
let (rowid, msg_id, payload, recipients) = loop {
let row = self
.ctx
.sql
.query_row_optional(
r#"
SELECT id, msg_id, mime, recipients
FROM smtp
ORDER BY id DESC"#,
(),
|row| {
let rowid: i64 = row.get(0)?;
let msg_id: MsgId = row.get(1)?;
let mime: String = row.get(2)?;
let recipients: String = row.get(3)?;
Ok((rowid, msg_id, mime, recipients))
},
)
.query_row_optional(&query, (), |row| {
let rowid: i64 = row.get(0)?;
let msg_id: MsgId = row.get(1)?;
let mime: String = row.get(2)?;
let recipients: String = row.get(3)?;
Ok((rowid, msg_id, mime, recipients))
})
.await
.expect("query_row_optional failed");
if let Some(row) = row {
Expand Down