From 7136e48eaa07d4739889dd5a0955c92524090a45 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Tue, 23 Apr 2024 05:24:14 -0300 Subject: [PATCH] feat: Limit the size of aggregated WebXDC update to 100 KiB (#4825) Before, update sending might be delayed due to rate limits and later merged into large messages. This is undesirable for apps that want to send large files over WebXDC updates because the message with aggregated update may be too large for actual sending and hit the provider limit or require multiple attempts on a flaky SMTP connection. So, don't aggregate updates if the size of an aggregated update will exceed the limit of 100 KiB. This is a soft limit, so it may be exceeded if a single update is larger and it limits only the update JSON size, so the message with all envelopes still may be larger. Also don't send any updates together with the WebXDC instance when resending it to not complicate the code, the only downside is sending one message more. --- src/chat.rs | 56 ++++++++-- src/mimefactory.rs | 21 +++- src/webxdc.rs | 272 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 275 insertions(+), 74 deletions(-) diff --git a/src/chat.rs b/src/chat.rs index 9c8dbe289d..9e9ed3a4e0 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -49,7 +49,7 @@ use crate::tools::{ create_smeared_timestamps, get_abs_path, gm2local_offset, improve_single_line_input, smeared_time, time, IsNoneOrEmpty, SystemTime, }; -use crate::webxdc::WEBXDC_SUFFIX; +use crate::webxdc::{StatusUpdateSerial, WEBXDC_SUFFIX}; /// An chat item, such as a message or a marker. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -2870,7 +2870,7 @@ async fn prepare_send_msg( ); message::update_msg_state(context, msg.id, MessageState::OutPending).await?; } - create_send_msg_jobs(context, msg).await + create_send_msg_jobs(context, msg, false).await } /// Constructs jobs for sending a message and inserts them into the `smtp` table. @@ -2879,9 +2879,15 @@ async fn prepare_send_msg( /// group with only self and no BCC-to-self configured. /// /// The caller has to interrupt SMTP loop or otherwise process new rows. -pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result> { +pub(crate) async fn create_send_msg_jobs( + context: &Context, + msg: &mut Message, + resend: bool, +) -> Result> { let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); - let mimefactory = MimeFactory::from_msg(context, msg).await?; + let mimefactory = MimeFactory::from_msg(context, msg) + .await? + .set_resend(resend); let attach_selfavatar = mimefactory.attach_selfavatar; let mut recipients = mimefactory.recipients(); @@ -4180,7 +4186,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .prepare_msg_raw(context, &mut msg, None, curr_timestamp) .await?; curr_timestamp += 1; - if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { + if !create_send_msg_jobs(context, &mut msg, false) + .await? + .is_empty() + { context.scheduler.interrupt_smtp().await; } } @@ -4240,9 +4249,42 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg.timestamp_sort = create_smeared_timestamp(context); // note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it) chatlist_events::emit_chatlist_item_changed(context, msg.chat_id); - if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { - context.scheduler.interrupt_smtp().await; + let resend = true; + if create_send_msg_jobs(context, &mut msg, resend) + .await? + .is_empty() + { + continue; + } + if msg.viewtype == Viewtype::Webxdc { + let conn_fn = |conn: &mut rusqlite::Connection| { + let range = conn.query_row( + "SELECT IFNULL(min(id), 1), IFNULL(max(id), 0) \ + FROM msgs_status_updates WHERE msg_id=?", + (msg.id,), + |row| { + let min_id: StatusUpdateSerial = row.get(0)?; + let max_id: StatusUpdateSerial = row.get(1)?; + Ok((min_id, max_id)) + }, + )?; + if range.0 > range.1 { + return Ok(()); + }; + // `first_serial` must be decreased to make parallel running + // `Context::flush_status_updates()` send the updates again. + conn.execute( + "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) \ + VALUES(?, ?, ?, '') \ + ON CONFLICT(msg_id) \ + DO UPDATE SET first_serial=min(first_serial - 1, excluded.first_serial)", + (msg.id, range.0, range.1), + )?; + Ok(()) + }; + context.sql.call_write(conn_fn).await?; } + context.scheduler.interrupt_smtp().await; } Ok(()) } diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 17baa6f3e2..1ac4c47958 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -30,6 +30,7 @@ use crate::tools::IsNoneOrEmpty; use crate::tools::{ create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix, time, }; +use crate::webxdc::StatusUpdateSerial; use crate::{location, peer_channels}; // attachments of 25 mb brutto should work on the majority of providers @@ -77,6 +78,7 @@ pub struct MimeFactory<'a> { req_mdn: bool, last_added_location_id: Option, + resend: bool, /// If the created mime-structure contains sync-items, /// the IDs of these items are listed here. @@ -232,6 +234,7 @@ impl<'a> MimeFactory<'a> { references, req_mdn, last_added_location_id: None, + resend: false, sync_ids_to_delete: None, attach_selfavatar, }; @@ -273,6 +276,7 @@ impl<'a> MimeFactory<'a> { references: String::default(), req_mdn: false, last_added_location_id: None, + resend: false, sync_ids_to_delete: None, attach_selfavatar: false, }; @@ -280,6 +284,13 @@ impl<'a> MimeFactory<'a> { Ok(res) } + /// Returns `MimeFactory` for resending the message if `resend`. E.g. for a resent WebXDC + /// instance the status updates must not be sent in the same message to not exceeed the size + /// limit. + pub(crate) fn set_resend(self, resend: bool) -> Self { + Self { resend, ..self } + } + async fn peerstates_for_recipients( &self, context: &Context, @@ -1332,8 +1343,14 @@ impl<'a> MimeFactory<'a> { headers .protected .push(create_iroh_header(context, topic, self.msg.id).await?); - if let Some(json) = context - .render_webxdc_status_update_object(self.msg.id, None) + if self.resend { + } else if let (Some(json), _) = context + .render_webxdc_status_update_object( + self.msg.id, + StatusUpdateSerial::MIN, + StatusUpdateSerial::MAX, + None, + ) .await? { parts.push(context.build_status_update_part(&json)); diff --git a/src/webxdc.rs b/src/webxdc.rs index 78f5898d5b..34b2fa1262 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -18,6 +18,7 @@ mod integration; mod maps_integration; +use std::cmp::max; use std::path::Path; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; @@ -122,11 +123,26 @@ impl StatusUpdateSerial { StatusUpdateSerial(id) } + /// Minimum value. + pub const MIN: Self = Self(1); + /// Maximum value. + pub const MAX: Self = Self(u32::MAX - 1); + /// Gets StatusUpdateSerial as untyped integer. /// Avoid using this outside ffi. pub fn to_u32(self) -> u32 { self.0 } + + /// Returns next [StatusUpdateSerial]. + /// For [Self::MAX] returns `MAX + 1`, but it is a special value. + pub fn next(self) -> Result { + Ok(StatusUpdateSerial( + self.0 + .checked_add(1) + .context("StatusUpdateSerial overflow")?, + )) + } } impl rusqlite::types::ToSql for StatusUpdateSerial { @@ -196,6 +212,9 @@ fn find_zip_entry<'a>( None } +/// Status update JSON size soft limit. +const STATUS_UPDATE_SIZE_MAX: usize = 100 << 10; + impl Context { /// check if a file is an acceptable webxdc for sending or receiving. pub(crate) async fn is_webxdc_file(&self, filename: &str, file: &[u8]) -> Result { @@ -505,22 +524,19 @@ impl Context { Ok(()) } - /// Pops one record of queued webxdc status updates. - /// This function exists to make the sqlite statement testable. - async fn pop_smtp_status_update( + /// Returns one record of the queued webxdc status updates. + async fn smtp_status_update_get( &self, - ) -> Result> { - let _lock = self.sql.write_lock().await; + ) -> Result> { let res = self .sql .query_row_optional( - "DELETE FROM smtp_status_updates - WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1) - RETURNING msg_id, first_serial, last_serial, descr", + "SELECT msg_id, first_serial, last_serial, descr \ + FROM smtp_status_updates LIMIT 1", (), |row| { let instance_id: MsgId = row.get(0)?; - let first_serial: StatusUpdateSerial = row.get(1)?; + let first_serial: i64 = row.get(1)?; let last_serial: StatusUpdateSerial = row.get(2)?; let descr: String = row.get(3)?; Ok((instance_id, first_serial, last_serial, descr)) @@ -530,19 +546,50 @@ impl Context { Ok(res) } + async fn smtp_status_update_pop_serials( + &self, + msg_id: MsgId, + first: i64, + first_new: StatusUpdateSerial, + ) -> Result<()> { + if self + .sql + .execute( + "DELETE FROM smtp_status_updates \ + WHERE msg_id=? AND first_serial=? AND last_serial 0 + { + return Ok(()); + } + self.sql + .execute( + "UPDATE smtp_status_updates SET first_serial=? \ + WHERE msg_id=? AND first_serial=?", + (first_new, msg_id, first), + ) + .await?; + Ok(()) + } + /// Attempts to send queued webxdc status updates. pub(crate) async fn flush_status_updates(&self) -> Result<()> { loop { - let (instance_id, first_serial, last_serial, descr) = - match self.pop_smtp_status_update().await? { - Some(res) => res, - None => return Ok(()), - }; - - if let Some(json) = self - .render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial))) - .await? - { + let (instance_id, first, last, descr) = match self.smtp_status_update_get().await? { + Some(res) => res, + None => return Ok(()), + }; + let (json, first_new) = self + .render_webxdc_status_update_object( + instance_id, + StatusUpdateSerial(max(first, 1).try_into()?), + last, + Some(STATUS_UPDATE_SIZE_MAX), + ) + .await?; + if let Some(json) = json { let instance = Message::load_from_db(self, instance_id).await?; let mut status_update = Message { chat_id: instance.chat_id, @@ -559,6 +606,8 @@ impl Context { status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed chat::send_msg(self, instance.chat_id, &mut status_update).await?; } + self.smtp_status_update_pop_serials(instance_id, first, first_new) + .await?; } } @@ -690,45 +739,54 @@ impl Context { /// Renders JSON-object for status updates as used on the wire. /// - /// Example: `{"updates": [{"payload":"any update data"}, - /// {"payload":"another update data"}]}` + /// Returns optional JSON and the first serial of updates not included due to a JSON size + /// limit. If all requested updates are included, returns the first not requested serial. /// - /// `range` is an optional range of status update serials to send. - /// If it is `None`, all updates are sent. - /// This is used when a message is resent using [`crate::chat::resend_msgs`]. + /// Example JSON: `{"updates": [{"payload":"any update data"}, + /// {"payload":"another update data"}]}` + /// + /// * `(first, last)`: range of status update serials to send. pub(crate) async fn render_webxdc_status_update_object( &self, instance_msg_id: MsgId, - range: Option<(StatusUpdateSerial, StatusUpdateSerial)>, - ) -> Result> { - let json = self + first: StatusUpdateSerial, + last: StatusUpdateSerial, + size_max: Option, + ) -> Result<(Option, StatusUpdateSerial)> { + let (json, first_new) = self .sql .query_map( - "SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", - ( - instance_msg_id, - range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)), - range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)), - ), - |row| row.get::<_, String>(0), + "SELECT id, update_item FROM msgs_status_updates \ + WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", + (instance_msg_id, first, last), + |row| { + let id: StatusUpdateSerial = row.get(0)?; + let update_item: String = row.get(1)?; + Ok((id, update_item)) + }, |rows| { let mut json = String::default(); for row in rows { - let update_item = row?; + let (id, update_item) = row?; + if !json.is_empty() + && json.len() + update_item.len() >= size_max.unwrap_or(usize::MAX) + { + return Ok((json, id)); + } if !json.is_empty() { json.push_str(",\n"); } json.push_str(&update_item); } - Ok(json) + Ok((json, last.next()?)) }, ) .await?; - if json.is_empty() { - Ok(None) - } else { - Ok(Some(format!(r#"{{"updates":[{json}]}}"#))) - } + let json = match json.is_empty() { + true => None, + false => Some(format!(r#"{{"updates":[{json}]}}"#)), + }; + Ok((json, first_new)) } } @@ -1089,10 +1147,13 @@ mod tests { assert_eq!(alice_grp.get_msg_cnt(&alice).await?, 3); resend_msgs(&alice, &[alice_instance.id]).await?; let sent1 = alice.pop_sent_msg().await; + alice.flush_status_updates().await?; + let sent2 = alice.pop_sent_msg().await; - // Bob received webxdc, legacy info-messages updates are received but not added to the chat + // Bob receives webxdc, legacy info-messages updates are received and added to the chat. let bob = tcm.bob().await; let bob_instance = bob.recv_msg(&sent1).await; + bob.recv_msg_trash(&sent2).await; assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); assert!(!bob_instance.is_info()); assert_eq!( @@ -1101,8 +1162,8 @@ mod tests { r#"[{"payload":7,"info":"i","summary":"s","serial":1,"max_serial":1}]"# ); let bob_grp = bob_instance.chat_id; - assert_eq!(bob.get_last_msg_in(bob_grp).await.id, bob_instance.id); - assert_eq!(bob_grp.get_msg_cnt(&bob).await?, 1); + assert_eq!(bob_grp.get_msg_cnt(&bob).await?, 2); + assert!(bob.get_last_msg_in(bob_grp).await.is_info()); Ok(()) } @@ -1684,6 +1745,79 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_send_big_webxdc_status_update() -> Result<()> { + let alice = TestContext::new_alice().await; + alice.set_config_bool(Config::BccSelf, true).await?; + let bob = TestContext::new_bob().await; + + let alice_chat = alice.create_chat(&bob).await; + let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?; + let sent1 = &alice.pop_sent_msg().await; + assert_eq!(alice_instance.viewtype, Viewtype::Webxdc); + assert!(!sent1.payload().contains("report-type=status-update")); + + let update1_str = r#"{"payload":{"foo":""#.to_string() + + &String::from_utf8(vec![b'a'; STATUS_UPDATE_SIZE_MAX])? + + r#""}"#; + alice + .send_webxdc_status_update(alice_instance.id, &(update1_str.clone() + "}"), "descr1") + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar2"}}"#, + "descr2", + ) + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar3"}}"#, + "descr3", + ) + .await?; + alice.flush_status_updates().await?; + + // There's the message stack, so we pop messages in the reverse order. + let sent3 = &alice.pop_sent_msg().await; + let alice_update = sent3.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + let sent2 = &alice.pop_sent_msg().await; + let alice_update = sent2.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 1); + + // Bob receives the instance. + let bob_instance = bob.recv_msg(sent1).await; + let bob_chat_id = bob_instance.chat_id; + assert_eq!(bob_instance.rfc724_mid, alice_instance.rfc724_mid); + assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + // Bob receives the status updates. + bob.recv_msg_trash(sent2).await; + expect_status_update_event(&bob, bob_instance.id).await?; + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0)) + .await?, + "[".to_string() + &update1_str + r#","serial":1,"max_serial":1}]"# + ); + bob.recv_msg_trash(sent3).await; + for _ in 0..2 { + expect_status_update_event(&bob, bob_instance.id).await?; + } + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(1)) + .await?, + r#"[{"payload":{"foo":"bar2"},"serial":2,"max_serial":3}, +{"payload":{"foo":"bar3"},"serial":3,"max_serial":3}]"# + ); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_render_webxdc_status_update_object() -> Result<()> { let t = TestContext::new_alice().await; @@ -1695,17 +1829,20 @@ mod tests { ) .await?; chat_id.set_draft(&t, Some(&mut instance)).await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_none()); + let (first, last) = (StatusUpdateSerial(1), StatusUpdateSerial::MAX); + assert_eq!( + t.render_webxdc_status_update_object(instance.id, first, last, None) + .await?, + (None, StatusUpdateSerial(u32::MAX)) + ); t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "bla") .await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_some()); + let (object, first_new) = t + .render_webxdc_status_update_object(instance.id, first, last, None) + .await?; + assert!(object.is_some()); + assert_eq!(first_new, StatusUpdateSerial(u32::MAX)); Ok(()) } @@ -1723,13 +1860,16 @@ mod tests { .await?; t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d") .await?; - let json = t + let (json, first_new) = t .render_webxdc_status_update_object( instance.id, - Some((StatusUpdateSerial(2), StatusUpdateSerial(3))), + StatusUpdateSerial(2), + StatusUpdateSerial(3), + None, ) - .await? - .unwrap(); + .await?; + let json = json.unwrap(); + assert_eq!(first_new, StatusUpdateSerial(4)); let json = Regex::new(r#""uid":"[^"]*""#) .unwrap() .replace_all(&json, "XXX"); @@ -1761,7 +1901,7 @@ mod tests { let instance1 = send_webxdc_instance(&t, chat_id).await?; let instance2 = send_webxdc_instance(&t, chat_id).await?; let instance3 = send_webxdc_instance(&t, chat_id).await?; - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a") .await?; @@ -1782,20 +1922,23 @@ mod tests { 3 ); - // order of pop_status_update() is not defined, therefore the more complicated test + // order of smtp_status_update_get() is not defined, therefore the more complicated test let mut instances_checked = 0; for i in 0..3 { - let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap(); + let (instance, min_ser, max_ser, descr) = t.smtp_status_update_get().await?.unwrap(); + t.smtp_status_update_pop_serials(instance, min_ser, max_ser.next()?) + .await?; + let min_ser: u32 = min_ser.try_into()?; if instance == instance1.id { - assert_eq!(min_ser, max_ser); + assert_eq!(min_ser, max_ser.to_u32()); assert_eq!(descr, "descr1a"); instances_checked += 1; } else if instance == instance2.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1); + assert_eq!(min_ser, max_ser.to_u32() - 1); assert_eq!(descr, "descr2b"); instances_checked += 1; } else if instance == instance3.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2); + assert_eq!(min_ser, max_ser.to_u32() - 2); assert_eq!(descr, "descr3c"); instances_checked += 1; } else { @@ -1809,7 +1952,7 @@ mod tests { ); } assert_eq!(instances_checked, 3); - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); Ok(()) } @@ -1836,12 +1979,11 @@ mod tests { alice .send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr") .await?; - alice.flush_status_updates().await?; expect_status_update_event(&alice, alice_instance.id).await?; alice .send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr") .await?; - alice.flush_status_updates().await?; + expect_status_update_event(&alice, alice_instance.id).await?; assert_eq!( alice .sql