From a8ce09a2f0cfd01146a3355807daa023cf8238fc Mon Sep 17 00:00:00 2001 From: iequidoo Date: Tue, 23 Apr 2024 05:24:14 -0300 Subject: [PATCH] feat: Limit the size of aggregated WebXDC update to 100 KiB (#4825) Before, update sending might be delayed due to rate limits and later merged into large messages. This is undesirable for apps that want to send large files over WebXDC updates because the message with aggregated update may be too large for actual sending and hit the provider limit or require multiple attempts on a flaky SMTP connection. So, don't aggregate updates if the size of an aggregated update will exceed the limit of 100 KiB. This is a soft limit, so it may be exceeded if a single update is larger and it limits only the update JSON size, so the message with all envelopes still may be larger. Also, the limit may be exceeded when updates are sent together with the WebXDC instance when resending it, because the instance size isn't accounted for, in order not to complicate the code. At least this is not worse than the previous behaviour when all updates were attached. --- src/chat.rs | 35 +++++- src/mimefactory.rs | 10 +- src/webxdc.rs | 267 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 245 insertions(+), 67 deletions(-) diff --git a/src/chat.rs b/src/chat.rs index 368de5fa10..62165bd02b 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -49,6 +49,7 @@ use crate::tools::{ create_smeared_timestamps, get_abs_path, gm2local_offset, smeared_time, time, IsNoneOrEmpty, SystemTime, }; +use crate::webxdc::StatusUpdateSerial; /// An chat item, such as a message or a marker. 
#[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -4272,9 +4273,39 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg.timestamp_sort = create_smeared_timestamp(context); // note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it) chatlist_events::emit_chatlist_item_changed(context, msg.chat_id); - if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { - context.scheduler.interrupt_smtp().await; + if create_send_msg_jobs(context, &mut msg).await?.is_empty() { + continue; } + if msg.viewtype == Viewtype::Webxdc { + let conn_fn = |conn: &mut rusqlite::Connection| { + let range = conn.query_row( + "SELECT IFNULL(min(id), 1), IFNULL(max(id), 0) \ + FROM msgs_status_updates WHERE msg_id=?", + (msg.id,), + |row| { + let min_id: StatusUpdateSerial = row.get(0)?; + let max_id: StatusUpdateSerial = row.get(1)?; + Ok((min_id, max_id)) + }, + )?; + if range.0 > range.1 { + return Ok(()); + }; + // `first_serial` must be decreased, otherwise if `Context::flush_status_updates()` + // runs in parallel, it would miss the race and instead of resending just remove the + // updates thinking that they have been already sent. 
+ conn.execute( + "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) \ + VALUES(?, ?, ?, '') \ + ON CONFLICT(msg_id) \ + DO UPDATE SET first_serial=min(first_serial - 1, excluded.first_serial)", + (msg.id, range.0, range.1), + )?; + Ok(()) + }; + context.sql.call_write(conn_fn).await?; + } + context.scheduler.interrupt_smtp().await; } Ok(()) } diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 3453cadc47..843462b3c0 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -31,6 +31,7 @@ use crate::tools::IsNoneOrEmpty; use crate::tools::{ create_outgoing_rfc724_mid, create_smeared_timestamp, remove_subject_prefix, time, }; +use crate::webxdc::StatusUpdateSerial; use crate::{location, peer_channels}; // attachments of 25 mb brutto should work on the majority of providers @@ -1369,8 +1370,13 @@ impl MimeFactory { } else if msg.viewtype == Viewtype::Webxdc { let topic = peer_channels::create_random_topic(); headers.push(create_iroh_header(context, topic, msg.id).await?); - if let Some(json) = context - .render_webxdc_status_update_object(msg.id, None) + if let (Some(json), _) = context + .render_webxdc_status_update_object( + msg.id, + StatusUpdateSerial::MIN, + StatusUpdateSerial::MAX, + None, + ) .await? { parts.push(context.build_status_update_part(&json)); diff --git a/src/webxdc.rs b/src/webxdc.rs index 48b645f00d..1a6e5d2592 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -18,6 +18,7 @@ mod integration; mod maps_integration; +use std::cmp::max; use std::path::Path; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; @@ -122,6 +123,11 @@ impl StatusUpdateSerial { StatusUpdateSerial(id) } + /// Minimum value. + pub const MIN: Self = Self(1); + /// Maximum value. + pub const MAX: Self = Self(u32::MAX - 1); + /// Gets StatusUpdateSerial as untyped integer. /// Avoid using this outside ffi. pub fn to_u32(self) -> u32 { @@ -196,6 +202,9 @@ fn find_zip_entry<'a>( None } +/// Status update JSON size soft limit. 
+const STATUS_UPDATE_SIZE_MAX: usize = 100 << 10; + impl Context { /// check if a file is an acceptable webxdc for sending or receiving. pub(crate) async fn is_webxdc_file(&self, filename: &str, file: &[u8]) -> Result { @@ -505,22 +514,19 @@ impl Context { Ok(()) } - /// Pops one record of queued webxdc status updates. - /// This function exists to make the sqlite statement testable. - async fn pop_smtp_status_update( + /// Returns one record of the queued webxdc status updates. + async fn smtp_status_update_get( &self, - ) -> Result> { - let _lock = self.sql.write_lock().await; + ) -> Result> { let res = self .sql .query_row_optional( - "DELETE FROM smtp_status_updates - WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1) - RETURNING msg_id, first_serial, last_serial, descr", + "SELECT msg_id, first_serial, last_serial, descr \ + FROM smtp_status_updates LIMIT 1", (), |row| { let instance_id: MsgId = row.get(0)?; - let first_serial: StatusUpdateSerial = row.get(1)?; + let first_serial: i64 = row.get(1)?; let last_serial: StatusUpdateSerial = row.get(2)?; let descr: String = row.get(3)?; Ok((instance_id, first_serial, last_serial, descr)) @@ -530,19 +536,50 @@ impl Context { Ok(res) } + async fn smtp_status_update_pop_serials( + &self, + msg_id: MsgId, + first: i64, + first_new: StatusUpdateSerial, + ) -> Result<()> { + if self + .sql + .execute( + "DELETE FROM smtp_status_updates \ + WHERE msg_id=? AND first_serial=? AND last_serial 0 + { + return Ok(()); + } + self.sql + .execute( + "UPDATE smtp_status_updates SET first_serial=? \ + WHERE msg_id=? AND first_serial=?", + (first_new, msg_id, first), + ) + .await?; + Ok(()) + } + /// Attempts to send queued webxdc status updates. pub(crate) async fn flush_status_updates(&self) -> Result<()> { loop { - let (instance_id, first_serial, last_serial, descr) = - match self.pop_smtp_status_update().await? 
{ - Some(res) => res, - None => return Ok(()), - }; - - if let Some(json) = self - .render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial))) - .await? - { + let (instance_id, first, last, descr) = match self.smtp_status_update_get().await? { + Some(res) => res, + None => return Ok(()), + }; + let (json, first_new) = self + .render_webxdc_status_update_object( + instance_id, + StatusUpdateSerial(max(first, 1).try_into()?), + last, + Some(STATUS_UPDATE_SIZE_MAX), + ) + .await?; + if let Some(json) = json { let instance = Message::load_from_db(self, instance_id).await?; let mut status_update = Message { chat_id: instance.chat_id, @@ -559,6 +596,8 @@ impl Context { status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed chat::send_msg(self, instance.chat_id, &mut status_update).await?; } + self.smtp_status_update_pop_serials(instance_id, first, first_new) + .await?; } } @@ -690,45 +729,59 @@ impl Context { /// Renders JSON-object for status updates as used on the wire. /// - /// Example: `{"updates": [{"payload":"any update data"}, - /// {"payload":"another update data"}]}` + /// Returns optional JSON and the first serial of updates not included due to a JSON size + /// limit. If all requested updates are included, returns the first not requested serial. + /// + /// Example JSON: `{"updates": [{"payload":"any update data"}, + /// {"payload":"another update data"}]}` /// - /// `range` is an optional range of status update serials to send. - /// If it is `None`, all updates are sent. - /// This is used when a message is resent using [`crate::chat::resend_msgs`]. + /// * `(first, last)`: range of status update serials to send. 
pub(crate) async fn render_webxdc_status_update_object( &self, instance_msg_id: MsgId, - range: Option<(StatusUpdateSerial, StatusUpdateSerial)>, - ) -> Result> { - let json = self + first: StatusUpdateSerial, + last: StatusUpdateSerial, + size_max: Option, + ) -> Result<(Option, StatusUpdateSerial)> { + let (json, first_new) = self .sql .query_map( - "SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", - ( - instance_msg_id, - range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)), - range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)), - ), - |row| row.get::<_, String>(0), + "SELECT id, update_item FROM msgs_status_updates \ + WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", + (instance_msg_id, first, last), + |row| { + let id: StatusUpdateSerial = row.get(0)?; + let update_item: String = row.get(1)?; + Ok((id, update_item)) + }, |rows| { let mut json = String::default(); for row in rows { - let update_item = row?; + let (id, update_item) = row?; + if !json.is_empty() + && json.len() + update_item.len() >= size_max.unwrap_or(usize::MAX) + { + return Ok((json, id)); + } if !json.is_empty() { json.push_str(",\n"); } json.push_str(&update_item); } - Ok(json) + Ok(( + json, + // Too late to fail here if an overflow happens. It's still better to send + // the updates. 
+ StatusUpdateSerial::new(last.to_u32().saturating_add(1)), + )) }, ) .await?; - if json.is_empty() { - Ok(None) - } else { - Ok(Some(format!(r#"{{"updates":[{json}]}}"#))) - } + let json = match json.is_empty() { + true => None, + false => Some(format!(r#"{{"updates":[{json}]}}"#)), + }; + Ok((json, first_new)) } } @@ -1089,10 +1142,13 @@ mod tests { assert_eq!(alice_grp.get_msg_cnt(&alice).await?, 3); resend_msgs(&alice, &[alice_instance.id]).await?; let sent1 = alice.pop_sent_msg().await; + alice.flush_status_updates().await?; + let sent2 = alice.pop_sent_msg().await; - // Bob received webxdc, legacy info-messages updates are received but not added to the chat + // Bob receives webxdc, legacy info-messages updates are received and added to the chat. let bob = tcm.bob().await; let bob_instance = bob.recv_msg(&sent1).await; + bob.recv_msg_trash(&sent2).await; assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); assert!(!bob_instance.is_info()); assert_eq!( @@ -1684,6 +1740,79 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_send_big_webxdc_status_update() -> Result<()> { + let alice = TestContext::new_alice().await; + alice.set_config_bool(Config::BccSelf, true).await?; + let bob = TestContext::new_bob().await; + + let alice_chat = alice.create_chat(&bob).await; + let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?; + let sent1 = &alice.pop_sent_msg().await; + assert_eq!(alice_instance.viewtype, Viewtype::Webxdc); + assert!(!sent1.payload().contains("report-type=status-update")); + + let update1_str = r#"{"payload":{"foo":""#.to_string() + + &String::from_utf8(vec![b'a'; STATUS_UPDATE_SIZE_MAX])? 
+ + r#""}"#; + alice + .send_webxdc_status_update(alice_instance.id, &(update1_str.clone() + "}"), "descr1") + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar2"}}"#, + "descr2", + ) + .await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload" : {"foo":"bar3"}}"#, + "descr3", + ) + .await?; + alice.flush_status_updates().await?; + + // There's the message stack, so we pop messages in the reverse order. + let sent3 = &alice.pop_sent_msg().await; + let alice_update = sent3.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + let sent2 = &alice.pop_sent_msg().await; + let alice_update = sent2.load_from_db().await; + assert_eq!(alice_update.text, "descr3".to_string()); + assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 1); + + // Bob receives the instance. + let bob_instance = bob.recv_msg(sent1).await; + let bob_chat_id = bob_instance.chat_id; + assert_eq!(bob_instance.rfc724_mid, alice_instance.rfc724_mid); + assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + // Bob receives the status updates. 
+ bob.recv_msg_trash(sent2).await; + expect_status_update_event(&bob, bob_instance.id).await?; + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0)) + .await?, + "[".to_string() + &update1_str + r#","serial":1,"max_serial":1}]"# + ); + bob.recv_msg_trash(sent3).await; + for _ in 0..2 { + expect_status_update_event(&bob, bob_instance.id).await?; + } + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(1)) + .await?, + r#"[{"payload":{"foo":"bar2"},"serial":2,"max_serial":3}, +{"payload":{"foo":"bar3"},"serial":3,"max_serial":3}]"# + ); + assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_render_webxdc_status_update_object() -> Result<()> { let t = TestContext::new_alice().await; @@ -1695,17 +1824,20 @@ mod tests { ) .await?; chat_id.set_draft(&t, Some(&mut instance)).await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_none()); + let (first, last) = (StatusUpdateSerial(1), StatusUpdateSerial::MAX); + assert_eq!( + t.render_webxdc_status_update_object(instance.id, first, last, None) + .await?, + (None, StatusUpdateSerial(u32::MAX)) + ); t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "bla") .await?; - assert!(t - .render_webxdc_status_update_object(instance.id, None) - .await? - .is_some()); + let (object, first_new) = t + .render_webxdc_status_update_object(instance.id, first, last, None) + .await?; + assert!(object.is_some()); + assert_eq!(first_new, StatusUpdateSerial(u32::MAX)); Ok(()) } @@ -1723,13 +1855,16 @@ mod tests { .await?; t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d") .await?; - let json = t + let (json, first_new) = t .render_webxdc_status_update_object( instance.id, - Some((StatusUpdateSerial(2), StatusUpdateSerial(3))), + StatusUpdateSerial(2), + StatusUpdateSerial(3), + None, ) - .await? 
- .unwrap(); + .await?; + let json = json.unwrap(); + assert_eq!(first_new, StatusUpdateSerial(4)); let json = Regex::new(r#""uid":"[^"]*""#) .unwrap() .replace_all(&json, "XXX"); @@ -1761,7 +1896,7 @@ mod tests { let instance1 = send_webxdc_instance(&t, chat_id).await?; let instance2 = send_webxdc_instance(&t, chat_id).await?; let instance3 = send_webxdc_instance(&t, chat_id).await?; - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a") .await?; @@ -1782,20 +1917,27 @@ mod tests { 3 ); - // order of pop_status_update() is not defined, therefore the more complicated test + // order of smtp_status_update_get() is not defined, therefore the more complicated test let mut instances_checked = 0; for i in 0..3 { - let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap(); + let (instance, min_ser, max_ser, descr) = t.smtp_status_update_get().await?.unwrap(); + t.smtp_status_update_pop_serials( + instance, + min_ser, + StatusUpdateSerial::new(max_ser.to_u32().checked_add(1).unwrap()), + ) + .await?; + let min_ser: u32 = min_ser.try_into()?; if instance == instance1.id { - assert_eq!(min_ser, max_ser); + assert_eq!(min_ser, max_ser.to_u32()); assert_eq!(descr, "descr1a"); instances_checked += 1; } else if instance == instance2.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1); + assert_eq!(min_ser, max_ser.to_u32() - 1); assert_eq!(descr, "descr2b"); instances_checked += 1; } else if instance == instance3.id { - assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2); + assert_eq!(min_ser, max_ser.to_u32() - 2); assert_eq!(descr, "descr3c"); instances_checked += 1; } else { @@ -1809,7 +1951,7 @@ mod tests { ); } assert_eq!(instances_checked, 3); - assert!(t.pop_smtp_status_update().await?.is_none()); + assert!(t.smtp_status_update_get().await?.is_none()); Ok(()) } @@ -1836,12 +1978,11 @@ mod tests { alice 
.send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr") .await?; - alice.flush_status_updates().await?; expect_status_update_event(&alice, alice_instance.id).await?; alice .send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr") .await?; - alice.flush_status_updates().await?; + expect_status_update_event(&alice, alice_instance.id).await?; assert_eq!( alice .sql