diff --git a/src/chat.rs b/src/chat.rs index 9c8dbe289d..9e9ed3a4e0 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -49,7 +49,7 @@ use crate::tools::{ create_smeared_timestamps, get_abs_path, gm2local_offset, improve_single_line_input, smeared_time, time, IsNoneOrEmpty, SystemTime, }; -use crate::webxdc::WEBXDC_SUFFIX; +use crate::webxdc::{StatusUpdateSerial, WEBXDC_SUFFIX}; /// An chat item, such as a message or a marker. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -2870,7 +2870,7 @@ async fn prepare_send_msg( ); message::update_msg_state(context, msg.id, MessageState::OutPending).await?; } - create_send_msg_jobs(context, msg).await + create_send_msg_jobs(context, msg, false).await } /// Constructs jobs for sending a message and inserts them into the `smtp` table. @@ -2879,9 +2879,15 @@ async fn prepare_send_msg( /// group with only self and no BCC-to-self configured. /// /// The caller has to interrupt SMTP loop or otherwise process new rows. -pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result> { +pub(crate) async fn create_send_msg_jobs( + context: &Context, + msg: &mut Message, + resend: bool, +) -> Result> { let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); - let mimefactory = MimeFactory::from_msg(context, msg).await?; + let mimefactory = MimeFactory::from_msg(context, msg) + .await? + .set_resend(resend); let attach_selfavatar = mimefactory.attach_selfavatar; let mut recipients = mimefactory.recipients(); @@ -4180,7 +4186,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .prepare_msg_raw(context, &mut msg, None, curr_timestamp) .await?; curr_timestamp += 1; - if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { + if !create_send_msg_jobs(context, &mut msg, false) + .await? + .is_empty() + { context.scheduler.interrupt_smtp().await; } } @@ -4240,9 +4249,42 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg.timestamp_sort = create_smeared_timestamp(context); // note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it) chatlist_events::emit_chatlist_item_changed(context, msg.chat_id); - if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { - context.scheduler.interrupt_smtp().await; + let resend = true; + if create_send_msg_jobs(context, &mut msg, resend) + .await? + .is_empty() + { + continue; + } + if msg.viewtype == Viewtype::Webxdc { + let conn_fn = |conn: &mut rusqlite::Connection| { + let range = conn.query_row( + "SELECT IFNULL(min(id), 1), IFNULL(max(id), 0) \ + FROM msgs_status_updates WHERE msg_id=?", + (msg.id,), + |row| { + let min_id: StatusUpdateSerial = row.get(0)?; + let max_id: StatusUpdateSerial = row.get(1)?; + Ok((min_id, max_id)) + }, + )?; + if range.0 > range.1 { + return Ok(()); + }; + // `first_serial` must be decreased to make parallel running + // `Context::flush_status_updates()` send the updates again. + conn.execute( + "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) \ + VALUES(?, ?, ?, '') \ + ON CONFLICT(msg_id) \ + DO UPDATE SET first_serial=min(first_serial - 1, excluded.first_serial)", + (msg.id, range.0, range.1), + )?; + Ok(()) + }; + context.sql.call_write(conn_fn).await?; } + context.scheduler.interrupt_smtp().await; } Ok(()) } diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 17baa6f3e2..1ac4c47958 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -30,6 +30,7 @@ use crate::tools::IsNoneOrEmpty; use crate::tools::{ create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix, time, }; +use crate::webxdc::StatusUpdateSerial; use crate::{location, peer_channels}; // attachments of 25 mb brutto should work on the majority of providers @@ -77,6 +78,7 @@ pub struct MimeFactory<'a> { req_mdn: bool, last_added_location_id: Option, + resend: bool, /// If the created mime-structure contains sync-items, /// the IDs of these items are listed here. @@ -232,6 +234,7 @@ impl<'a> MimeFactory<'a> { references, req_mdn, last_added_location_id: None, + resend: false, sync_ids_to_delete: None, attach_selfavatar, }; @@ -273,6 +276,7 @@ impl<'a> MimeFactory<'a> { references: String::default(), req_mdn: false, last_added_location_id: None, + resend: false, sync_ids_to_delete: None, attach_selfavatar: false, }; @@ -280,6 +284,13 @@ impl<'a> MimeFactory<'a> { Ok(res) } + /// Returns `MimeFactory` for resending the message if `resend`. E.g. for a resent WebXDC + /// instance the status updates must not be sent in the same message to not exceeed the size + /// limit. + pub(crate) fn set_resend(self, resend: bool) -> Self { + Self { resend, ..self } + } + async fn peerstates_for_recipients( &self, context: &Context, @@ -1332,8 +1343,14 @@ impl<'a> MimeFactory<'a> { headers .protected .push(create_iroh_header(context, topic, self.msg.id).await?); - if let Some(json) = context - .render_webxdc_status_update_object(self.msg.id, None) + if self.resend { + } else if let (Some(json), _) = context + .render_webxdc_status_update_object( + self.msg.id, + StatusUpdateSerial::MIN, + StatusUpdateSerial::MAX, + None, + ) .await? { parts.push(context.build_status_update_part(&json)); diff --git a/src/webxdc.rs b/src/webxdc.rs index 78f5898d5b..34b2fa1262 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -18,6 +18,7 @@ mod integration; mod maps_integration; +use std::cmp::max; use std::path::Path; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; @@ -122,11 +123,26 @@ impl StatusUpdateSerial { StatusUpdateSerial(id) } + /// Minimum value. + pub const MIN: Self = Self(1); + /// Maximum value. + pub const MAX: Self = Self(u32::MAX - 1); + /// Gets StatusUpdateSerial as untyped integer. /// Avoid using this outside ffi. pub fn to_u32(self) -> u32 { self.0 } + + /// Returns next [StatusUpdateSerial]. + /// For [Self::MAX] returns `MAX + 1`, but it is a special value. + pub fn next(self) -> Result { + Ok(StatusUpdateSerial( + self.0 + .checked_add(1) + .context("StatusUpdateSerial overflow")?, + )) + } } impl rusqlite::types::ToSql for StatusUpdateSerial { @@ -196,6 +212,9 @@ fn find_zip_entry<'a>( None } +/// Status update JSON size soft limit. +const STATUS_UPDATE_SIZE_MAX: usize = 100 << 10; + impl Context { /// check if a file is an acceptable webxdc for sending or receiving. pub(crate) async fn is_webxdc_file(&self, filename: &str, file: &[u8]) -> Result { @@ -505,22 +524,19 @@ impl Context { Ok(()) } - /// Pops one record of queued webxdc status updates. - /// This function exists to make the sqlite statement testable. - async fn pop_smtp_status_update( + /// Returns one record of the queued webxdc status updates. + async fn smtp_status_update_get( &self, - ) -> Result> { - let _lock = self.sql.write_lock().await; + ) -> Result> { let res = self .sql .query_row_optional( - "DELETE FROM smtp_status_updates - WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1) - RETURNING msg_id, first_serial, last_serial, descr", + "SELECT msg_id, first_serial, last_serial, descr \ + FROM smtp_status_updates LIMIT 1", (), |row| { let instance_id: MsgId = row.get(0)?; - let first_serial: StatusUpdateSerial = row.get(1)?; + let first_serial: i64 = row.get(1)?; let last_serial: StatusUpdateSerial = row.get(2)?; let descr: String = row.get(3)?; Ok((instance_id, first_serial, last_serial, descr)) @@ -530,19 +546,50 @@ impl Context { Ok(res) } + async fn smtp_status_update_pop_serials( + &self, + msg_id: MsgId, + first: i64, + first_new: StatusUpdateSerial, + ) -> Result<()> { + if self + .sql + .execute( + "DELETE FROM smtp_status_updates \ + WHERE msg_id=? AND first_serial=? AND last_serial 0 + { + return Ok(()); + } + self.sql + .execute( + "UPDATE smtp_status_updates SET first_serial=? \ + WHERE msg_id=? AND first_serial=?", + (first_new, msg_id, first), + ) + .await?; + Ok(()) + } + /// Attempts to send queued webxdc status updates. pub(crate) async fn flush_status_updates(&self) -> Result<()> { loop { - let (instance_id, first_serial, last_serial, descr) = - match self.pop_smtp_status_update().await? { - Some(res) => res, - None => return Ok(()), - }; - - if let Some(json) = self - .render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial))) - .await? - { + let (instance_id, first, last, descr) = match self.smtp_status_update_get().await? { + Some(res) => res, + None => return Ok(()), + }; + let (json, first_new) = self + .render_webxdc_status_update_object( + instance_id, + StatusUpdateSerial(max(first, 1).try_into()?), + last, + Some(STATUS_UPDATE_SIZE_MAX), + ) + .await?; + if let Some(json) = json { let instance = Message::load_from_db(self, instance_id).await?; let mut status_update = Message { chat_id: instance.chat_id, @@ -559,6 +606,8 @@ impl Context { status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed chat::send_msg(self, instance.chat_id, &mut status_update).await?; } + self.smtp_status_update_pop_serials(instance_id, first, first_new) + .await?; } } @@ -690,45 +739,54 @@ impl Context { /// Renders JSON-object for status updates as used on the wire. /// - /// Example: `{"updates": [{"payload":"any update data"}, - /// {"payload":"another update data"}]}` + /// Returns optional JSON and the first serial of updates not included due to a JSON size + /// limit. If all requested updates are included, returns the first not requested serial. /// - /// `range` is an optional range of status update serials to send. - /// If it is `None`, all updates are sent. - /// This is used when a message is resent using [`crate::chat::resend_msgs`]. + /// Example JSON: `{"updates": [{"payload":"any update data"}, + /// {"payload":"another update data"}]}` + /// + /// * `(first, last)`: range of status update serials to send. pub(crate) async fn render_webxdc_status_update_object( &self, instance_msg_id: MsgId, - range: Option<(StatusUpdateSerial, StatusUpdateSerial)>, - ) -> Result> { - let json = self + first: StatusUpdateSerial, + last: StatusUpdateSerial, + size_max: Option, + ) -> Result<(Option, StatusUpdateSerial)> { + let (json, first_new) = self .sql .query_map( - "SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", - ( - instance_msg_id, - range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)), - range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)), - ), - |row| row.get::<_, String>(0), + "SELECT id, update_item FROM msgs_status_updates \ + WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", + (instance_msg_id, first, last), + |row| { + let id: StatusUpdateSerial = row.get(0)?; + let update_item: String = row.get(1)?; + Ok((id, update_item)) + }, |rows| { let mut json = String::default(); for row in rows { - let update_item = row?; + let (id, update_item) = row?; + if !json.is_empty() + && json.len() + update_item.len() >= size_max.unwrap_or(usize::MAX) + { + return Ok((json, id)); + } if !json.is_empty() { json.push_str(",\n"); } json.push_str(&update_item); } - Ok(json) + Ok((json, last.next()?)) }, ) .await?; - if json.is_empty() { - Ok(None) - } else { - Ok(Some(format!(r#"{{"updates":[{json}]}}"#))) - } + let json = match json.is_empty() { + true => None, + false => Some(format!(r#"{{"updates":[{json}]}}"#)), + }; + Ok((json, first_new)) } } @@ -1089,10 +1147,13 @@ mod tests { assert_eq!(alice_grp.get_msg_cnt(&alice).await?, 3); resend_msgs(&alice, &[alice_instance.id]).await?; let sent1 = alice.pop_sent_msg().await; + alice.flush_status_updates().await?; + let sent2 = alice.pop_sent_msg().await; - // Bob received webxdc, legacy info-messages updates are received but not added to the chat + // Bob receives webxdc, legacy info-messages updates are received and added to the chat. let bob = tcm.bob().await; let bob_instance = bob.recv_msg(&sent1).await; + bob.recv_msg_trash(&sent2).await; assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); assert!(!bob_instance.is_info()); assert_eq!( @@ -1101,8 +1162,8 @@ mod tests { r#"[{"payload":7,"info":"i","summary":"s","serial":1,"max_serial":1}]"# ); let bob_grp = bob_instance.chat_id; - assert_eq!(bob.get_last_msg_in(bob_grp).await.id, bob_instance.id); - assert_eq!(bob_grp.get_msg_cnt(&bob).await?, 1); + assert_eq!(bob_grp.get_msg_cnt(&bob).await?, 2); + assert!(bob.get_last_msg_in(bob_grp).await.is_info()); Ok(()) } @@ -1684,6 +1745,79 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_send_big_webxdc_status_update() -> Result<()> { + let alice = TestContext::new_alice().await; + alice.set_config_bool(Config::BccSelf, true).await?; + let bob = TestContext::new_bob().await; + + let alice_chat = alice.create_chat(&bob).await; + let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?; + let sent1 = &alice.pop_sent_msg().await; + assert_eq!(alice_instance.viewtype, Viewtype::Webxdc); + assert!(!sent1.payload().contains("report-type=status-update")); + + let update1_str = r#"{"payload":{"foo":""#.to_string() + + &String::from_utf8(vec![b'a'; STATUS_UPDATE_SIZE_MAX])? + + r#""}"#; + alice + .send_webxdc_status_update(alice_instance.id, &(update1_str.clone() + "}"), "descr1") + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar2"}}"#, + "descr2", + ) + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar3"}}"#, + "descr3", + ) + .await?; + alice.flush_status_updates().await?; + + // There's the message stack, so we pop messages in the reverse order. + let sent3 = &alice.pop_sent_msg().await; + let alice_update = sent3.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + let sent2 = &alice.pop_sent_msg().await; + let alice_update = sent2.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 1); + + // Bob receives the instance. + let bob_instance = bob.recv_msg(sent1).await; + let bob_chat_id = bob_instance.chat_id; + assert_eq!(bob_instance.rfc724_mid, alice_instance.rfc724_mid); + assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + // Bob receives the status updates. + bob.recv_msg_trash(sent2).await; + expect_status_update_event(&bob, bob_instance.id).await?; + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0)) + .await?, + "[".to_string() + &update1_str + r#","serial":1,"max_serial":1}]"# + ); + bob.recv_msg_trash(sent3).await; + for _ in 0..2 { + expect_status_update_event(&bob, bob_instance.id).await?; + } + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(1)) + .await?, + r#"[{"payload":{"foo":"bar2"},"serial":2,"max_serial":3}, +{"payload":{"foo":"bar3"},"serial":3,"max_serial":3}]"# + ); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_render_webxdc_status_update_object() -> Result<()> { let t = TestContext::new_alice().await; @@ -1695,17 +1829,20 @@ mod tests { ) .await?; chat_id.set_draft(&t, Some(&mut instance)).await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_none()); + let (first, last) = (StatusUpdateSerial(1), StatusUpdateSerial::MAX); + assert_eq!( + t.render_webxdc_status_update_object(instance.id, first, last, None) + .await?, + (None, StatusUpdateSerial(u32::MAX)) + ); t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "bla") .await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_some()); + let (object, first_new) = t + .render_webxdc_status_update_object(instance.id, first, last, None) + .await?; + assert!(object.is_some()); + assert_eq!(first_new, StatusUpdateSerial(u32::MAX)); Ok(()) } @@ -1723,13 +1860,16 @@ mod tests { .await?; t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d") .await?; - let json = t + let (json, first_new) = t .render_webxdc_status_update_object( instance.id, - Some((StatusUpdateSerial(2), StatusUpdateSerial(3))), + StatusUpdateSerial(2), + StatusUpdateSerial(3), + None, ) - .await? - .unwrap(); + .await?; + let json = json.unwrap(); + assert_eq!(first_new, StatusUpdateSerial(4)); let json = Regex::new(r#""uid":"[^"]*""#) .unwrap() .replace_all(&json, "XXX"); @@ -1761,7 +1901,7 @@ mod tests { let instance1 = send_webxdc_instance(&t, chat_id).await?; let instance2 = send_webxdc_instance(&t, chat_id).await?; let instance3 = send_webxdc_instance(&t, chat_id).await?; - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a") .await?; @@ -1782,20 +1922,23 @@ mod tests { 3 ); - // order of pop_status_update() is not defined, therefore the more complicated test + // order of smtp_status_update_get() is not defined, therefore the more complicated test let mut instances_checked = 0; for i in 0..3 { - let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap(); + let (instance, min_ser, max_ser, descr) = t.smtp_status_update_get().await?.unwrap(); + t.smtp_status_update_pop_serials(instance, min_ser, max_ser.next()?) + .await?; + let min_ser: u32 = min_ser.try_into()?; if instance == instance1.id { - assert_eq!(min_ser, max_ser); + assert_eq!(min_ser, max_ser.to_u32()); assert_eq!(descr, "descr1a"); instances_checked += 1; } else if instance == instance2.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1); + assert_eq!(min_ser, max_ser.to_u32() - 1); assert_eq!(descr, "descr2b"); instances_checked += 1; } else if instance == instance3.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2); + assert_eq!(min_ser, max_ser.to_u32() - 2); assert_eq!(descr, "descr3c"); instances_checked += 1; } else { @@ -1809,7 +1952,7 @@ mod tests { ); } assert_eq!(instances_checked, 3); - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); Ok(()) } @@ -1836,12 +1979,11 @@ mod tests { alice .send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr") .await?; - alice.flush_status_updates().await?; expect_status_update_event(&alice, alice_instance.id).await?; alice .send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr") .await?; - alice.flush_status_updates().await?; + expect_status_update_event(&alice, alice_instance.id).await?; assert_eq!( alice .sql