diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dc1371d65..f671758b31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - strip leading/trailing whitespace from "Chat-Group-Name{,-Changed}:" headers content #3650 - Assume all Thunderbird users prefer encryption #3774 - refactor peerstate handling to ensure no duplicate peerstates #3776 +- Fetch messages in order of their INTERNALDATE (fixes reactions for Gmail f.e.) #3789 ## 1.102.0 diff --git a/python/tests/test_1_online.py b/python/tests/test_1_online.py index 39cbae48cd..07da891347 100644 --- a/python/tests/test_1_online.py +++ b/python/tests/test_1_online.py @@ -1,6 +1,7 @@ import os import queue import sys +import time from datetime import datetime, timezone import pytest @@ -1350,6 +1351,43 @@ def ac_reactions_changed(self, message): assert reactions.get_by_contact(contacts[0]) == react_str +def test_reactions_for_a_reordering_move(acfactory, lp): + """When a batch of messages is moved from Inbox to DeltaChat folder with a single MOVE command, + their UIDs may be reordered (e.g. Gmail is known for that) which led to messages being + processed by receive_imf in the wrong order, and, particularly, reactions were processed before + messages they refer to and thus dropped. + """ + (ac1,) = acfactory.get_online_accounts(1) + ac2 = acfactory.new_online_configuring_account(mvbox_move=True) + acfactory.bring_accounts_online() + chat1 = acfactory.get_accepted_chat(ac1, ac2) + ac2.stop_io() + + lp.sec("sending message + reaction from ac1 to ac2") + msg1 = chat1.send_text("hi") + ac1._evtracker.wait_msg_delivered(msg1) + # It is sad, but messages must differ in their INTERNALDATEs to be processed in the correct + # order by DC, and most (if not all) mail servers provide only seconds precision. 
+ time.sleep(2) + react_str = "\N{THUMBS UP SIGN}" + ac1._evtracker.wait_msg_delivered(msg1.send_reaction(react_str)) + + lp.sec("moving messages to ac2's DeltaChat folder in the reverse order") + ac2.direct_imap.connect() + for uid in sorted([m.uid for m in ac2.direct_imap.get_all_messages()], reverse=True): + ac2.direct_imap.conn.move(uid, "DeltaChat") + + lp.sec("receiving messages by ac2") + ac2.start_io() + msg2 = ac2._evtracker.wait_next_reactions_changed() + assert msg2.text == msg1.text + reactions = msg2.get_reactions() + contacts = reactions.get_contacts() + assert len(contacts) == 1 + assert contacts[0].addr == ac1.get_config("addr") + assert reactions.get_by_contact(contacts[0]) == react_str + + def test_import_export_online_all(acfactory, tmpdir, data, lp): (ac1,) = acfactory.get_online_accounts(1) diff --git a/src/imap.rs b/src/imap.rs index 91948e0dbb..f3501539ba 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -6,8 +6,9 @@ use std::{ cmp, cmp::max, - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, iter::Peekable, + mem::take, }; use anyhow::{bail, format_err, Context as _, Result}; @@ -71,7 +72,7 @@ pub enum ImapActionResult { /// - Chat-Version to check if a message is a chat message /// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message, /// not necessarily sent by Delta Chat. -const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\ +const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\ MESSAGE-ID \ X-MICROSOFT-ORIGINAL-MESSAGE-ID \ FROM \ @@ -1263,14 +1264,15 @@ impl Imap { .as_mut() .context("IMAP No Connection established")?; - let uids = session + let mut uids: Vec<_> = session .uid_search(get_imap_self_sent_search_command(context).await?) .await? .into_iter() .collect(); + uids.sort_unstable(); let mut result = Vec::new(); - for uid_set in &build_sequence_sets(uids) { + for (_, uid_set) in build_sequence_sets(&uids)? 
{ let mut list = session .uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])") .await @@ -1296,8 +1298,9 @@ impl Imap { Ok(result) } - /// Prefetch all messages greater than or equal to `uid_next`. Return a list of fetch results. - async fn prefetch(&mut self, uid_next: u32) -> Result> { + /// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results + /// in the order of ascending delivery time to the server (INTERNALDATE). + async fn prefetch(&mut self, uid_next: u32) -> Result> { let session = self.session.as_mut(); let session = session.context("fetch_after(): IMAP No Connection established")?; @@ -1312,25 +1315,25 @@ impl Imap { while let Some(fetch) = list.next().await { let msg = fetch?; if let Some(msg_uid) = msg.uid { - msgs.insert(msg_uid, msg); + // If the mailbox is not empty, results always include + // at least one UID, even if last_seen_uid+1 is past + // the last UID in the mailbox. It happens because + // uid:* is interpreted the same way as *:uid. + // See for + // standard reference. Therefore, sometimes we receive + // already seen messages and have to filter them out. + if msg_uid >= uid_next { + msgs.insert((msg.internal_date(), msg_uid), msg); + } } } drop(list); - // If the mailbox is not empty, results always include - // at least one UID, even if last_seen_uid+1 is past - // the last UID in the mailbox. It happens because - // uid:* is interpreted the same way as *:uid. - // See for - // standard reference. Therefore, sometimes we receive - // already seen messages and have to filter them out. 
- let new_msgs = msgs.split_off(&uid_next); - - Ok(new_msgs) + Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect()) } /// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages) - async fn prefetch_existing_msgs(&mut self) -> Result> { + async fn prefetch_existing_msgs(&mut self) -> Result> { let exists: i64 = { let mailbox = self .config @@ -1355,11 +1358,11 @@ impl Imap { while let Some(fetch) = list.next().await { let msg = fetch?; if let Some(msg_uid) = msg.uid { - msgs.insert(msg_uid, msg); + msgs.insert((msg.internal_date(), msg_uid), msg); } } - Ok(msgs) + Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect()) } /// Fetches a list of messages by server UID. @@ -1382,12 +1385,10 @@ impl Imap { } let session = self.session.as_mut().context("no IMAP session")?; - - let sets = build_sequence_sets(server_uids.clone()); - let mut count = 0; + let sets = build_sequence_sets(&server_uids)?; let mut last_uid = None; - for set in sets.iter() { + for (server_uids, set) in sets.iter() { let mut msgs = match session .uid_fetch( &set, @@ -1413,19 +1414,41 @@ impl Imap { } }; - while let Some(Ok(msg)) = msgs.next().await { - let server_uid = msg.uid.unwrap_or_default(); - - if !server_uids.contains(&server_uid) { - warn!( - context, - "Got unwanted uid {} not in {:?}, requested {:?}", - &server_uid, - server_uids, - &sets - ); - continue; + let mut uid_msgs = server_uids + .iter() + .map(|&uid| (uid, None)) + .collect::>(); + let mut server_uids_it = server_uids.iter().peekable(); + let mut count = 0; + while let Some(&&server_uid) = server_uids_it.peek() { + let mut msg = uid_msgs.insert(server_uid, None).flatten(); + while msg.is_none() { + let msg_unwrapped = match msgs.next().await { + Some(Ok(msg)) => msg, + Some(Err(_)) => continue, + None => break, + }; + let msg_uid = msg_unwrapped.uid.unwrap_or_default(); + if !uid_msgs.contains_key(&msg_uid) { + // Unwanted UIDs are possible because 
of unsolicited responses, e.g. if + // another client changes \Seen flag on a message after we do a prefetch but + // before fetch. It's not an error if we receive such unsolicited response. + continue; + } + msg = Some(msg_unwrapped); + if msg_uid != server_uid && uid_msgs.insert(msg_uid, msg.take()).is_some() { + warn!(context, "Got duplicated UID {}", msg_uid); + } } + let msg = match msg { + Some(msg) => msg, + None => { + warn!(context, "Missed UID {} in the server response", server_uid); + server_uids_it.next(); + continue; + } + }; + server_uids_it.next(); count += 1; let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); @@ -1483,17 +1506,19 @@ impl Imap { }; last_uid = Some(server_uid) } - } - - if count != server_uids.len() { - warn!( - context, - "failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}", - count, - server_uids.len(), - server_uids, - sets - ); + // If we don't process the whole response, IMAP client is left in a broken state where + // it will try to process the rest of response as the next response. + while msgs.next().await.is_some() {} + if count != server_uids.len() { + warn!( + context, + "failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}", + count, + server_uids.len(), + server_uids, + sets, + ); + } } Ok((last_uid, received_msgs)) @@ -2300,13 +2325,11 @@ async fn should_ignore_folder( /// Builds a list of sequence/uid sets. 
The returned sets have each no more than around 1000 /// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5> /// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars) -fn build_sequence_sets(mut uids: Vec<u32>) -> Vec<String> { - uids.sort_unstable(); - +fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> { // first, try to find consecutive ranges: let mut ranges: Vec<UidRange> = vec![]; - for current in uids { + for &current in uids { if let Some(last) = ranges.last_mut() { if last.end + 1 == current { last.end = current; @@ -2321,22 +2344,24 @@ fn build_sequence_sets(mut uids: Vec<u32>) -> Vec<String> { } // Second, sort the uids into uid sets that are each below ~1000 characters - let mut result = vec![String::new()]; + let mut result = vec![]; + let (mut last_uids, mut last_str) = (Vec::new(), String::new()); for range in ranges { - if let Some(last) = result.last_mut() { - if !last.is_empty() { - last.push(','); - } - last.push_str(&range.to_string()); + last_uids.reserve((range.end - range.start + 1).try_into()?); + (range.start..=range.end).for_each(|u| last_uids.push(u)); + if !last_str.is_empty() { + last_str.push(','); + } + last_str.push_str(&range.to_string()); - if last.len() > 990 { - result.push(String::new()); // Start a new uid set - } + if last_str.len() > 990 { + result.push((take(&mut last_uids), take(&mut last_str))); } } + result.push((last_uids, last_str)); - result.retain(|s| !s.is_empty()); - result + result.retain(|(_, s)| !s.is_empty()); + Ok(result) } struct UidRange { @@ -2442,61 +2467,69 @@ mod tests { #[test] fn test_build_sequence_sets() { + assert_eq!(build_sequence_sets(&[]).unwrap(), vec![]); + let cases = vec![ - (vec![], vec![]), - (vec![1], vec!["1"]), - (vec![3291], vec!["3291"]), - (vec![1, 3, 5, 7, 9, 11], vec!["1,3,5,7,9,11"]), - (vec![1, 2, 3], vec!["1:3"]), - (vec![1, 4, 5, 6], vec!["1,4:6"]), - ((1..=500).collect(), vec!["1:500"]), - (vec![3, 4, 8, 9, 10, 11, 39, 50, 2], vec!["2:4,8:11,39,50"]), + (vec![1], "1"), + (vec![3291], 
"3291"), + (vec![1, 3, 5, 7, 9, 11], "1,3,5,7,9,11"), + (vec![1, 2, 3], "1:3"), + (vec![1, 4, 5, 6], "1,4:6"), + ((1..=500).collect(), "1:500"), + (vec![3, 4, 8, 9, 10, 11, 39, 50, 2], "3:4,8:11,39,50,2"), ]; - for (input, output) in cases { - assert_eq!(build_sequence_sets(input), output); + for (input, s) in cases { + assert_eq!( + build_sequence_sets(&input).unwrap(), + vec![(input, s.into())] + ); } + let has_number = |(uids, s): &(Vec, String), number| { + uids.iter().any(|&n| n == number) + && s.split(',').any(|n| n.parse::().unwrap() == number) + }; + let numbers: Vec<_> = (2..=500).step_by(2).collect(); - let result = build_sequence_sets(numbers.clone()); - for set in &result { + let result = build_sequence_sets(&numbers).unwrap(); + for (_, set) in &result { assert!(set.len() < 1010); assert!(!set.ends_with(',')); assert!(!set.starts_with(',')); } assert!(result.len() == 1); // these UIDs fit in one set - for number in &numbers { - assert!(result - .iter() - .any(|set| set.split(',').any(|n| n.parse::().unwrap() == *number))); + for &number in &numbers { + assert!(result.iter().any(|r| has_number(r, number))); } let numbers: Vec<_> = (1..=1000).step_by(3).collect(); - let result = build_sequence_sets(numbers.clone()); - for set in &result { + let result = build_sequence_sets(&numbers).unwrap(); + for (_, set) in &result { assert!(set.len() < 1010); assert!(!set.ends_with(',')); assert!(!set.starts_with(',')); } - assert!(result.last().unwrap().ends_with("997,1000")); + let (last_uids, last_str) = result.last().unwrap(); + assert_eq!( + last_uids.get((last_uids.len() - 2)..).unwrap(), + &[997, 1000] + ); + assert!(last_str.ends_with("997,1000")); assert!(result.len() == 2); // This time we need 2 sets - for number in &numbers { - assert!(result - .iter() - .any(|set| set.split(',').any(|n| n.parse::().unwrap() == *number))); + for &number in &numbers { + assert!(result.iter().any(|r| has_number(r, number))); } let numbers: Vec<_> = 
(30000000..=30002500).step_by(4).collect(); - let result = build_sequence_sets(numbers.clone()); - for set in &result { + let result = build_sequence_sets(&numbers).unwrap(); + for (_, set) in &result { assert!(set.len() < 1010); assert!(!set.ends_with(',')); assert!(!set.starts_with(',')); } assert_eq!(result.len(), 6); - for number in &numbers { - assert!(result - .iter() - .any(|set| set.split(',').any(|n| n.parse::().unwrap() == *number))); + for &number in &numbers { + assert!(result.iter().any(|r| has_number(r, number))); } }