Skip to content

Commit

Permalink
fix: Keep webxdc instance for delete_device_after period after a stat…
Browse files Browse the repository at this point in the history
…us update (#5365)

If `delete_device_after` is configured, that period should be counted for webxdc instances from the
last status update, otherwise nothing prevents from deleting them. Use `msgs.timestamp_rcvd` to
store the last status update timestamp, it anyway isn't used for anything except displaying a
detailed message info. Also, as `ephemeral::select_expired_messages()` now also checks
`timestamp_rcvd`, we have an improvement that a message is guaranteed not to be deleted for the
`delete_device_after` period since its receipt. Before only the sort timestamp was checked which is
derived from the "sent" timestamp.
  • Loading branch information
iequidoo committed Apr 8, 2024
1 parent 7e5959e commit c1d2510
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/debug_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
document: None,
uid: None,
},
time,
)
.await
{
Expand Down
5 changes: 3 additions & 2 deletions src/ephemeral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ WHERE
SELECT id, chat_id, type
FROM msgs
WHERE
timestamp < ?
timestamp < ?1
AND timestamp_rcvd < ?1
AND chat_id > ?
AND chat_id != ?
AND chat_id != ?
Expand Down Expand Up @@ -490,7 +491,7 @@ async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<
.sql
.query_get_value(
r#"
SELECT min(timestamp)
SELECT min(max(timestamp, timestamp_rcvd))
FROM msgs
WHERE chat_id > ?
AND chat_id != ?
Expand Down
87 changes: 67 additions & 20 deletions src/webxdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};

use deltachat_derive::FromSql;
use lettre_email::PartBuilder;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -291,7 +292,7 @@ impl Context {
from_id: ContactId,
) -> Result<Option<StatusUpdateSerial>> {
let Some(status_update_serial) = self
.write_status_update_inner(&instance.id, &status_update_item)
.write_status_update_inner(&instance.id, &status_update_item, timestamp)
.await?
else {
return Ok(None);
Expand Down Expand Up @@ -372,27 +373,30 @@ impl Context {
&self,
instance_id: &MsgId,
status_update_item: &StatusUpdateItem,
timestamp: i64,
) -> Result<Option<StatusUpdateSerial>> {
let _lock = self.sql.write_lock().await;
let uid = status_update_item.uid.as_deref();
let Some(rowid) = self
.sql
.query_row_optional(
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
ON CONFLICT (uid) DO NOTHING
RETURNING id",
(
instance_id,
serde_json::to_string(&status_update_item)?,
uid,
),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)
.await?
else {
let status_update_item = serde_json::to_string(&status_update_item)?;
let trans_fn = |t: &mut rusqlite::Transaction| {
t.execute(
"UPDATE msgs SET timestamp_rcvd=? WHERE id=?",
(timestamp, instance_id),
)?;
let rowid = t
.query_row(
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
ON CONFLICT (uid) DO NOTHING
RETURNING id",
(instance_id, status_update_item, uid),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)
.optional()?;
Ok(rowid)
};
let Some(rowid) = self.sql.transaction(trans_fn).await? else {
let uid = uid.unwrap_or("-");
info!(self, "Ignoring duplicate status update with uid={uid}");
return Ok(None);
Expand Down Expand Up @@ -850,6 +854,8 @@ impl Message {

#[cfg(test)]
mod tests {
use std::time::Duration;

use serde_json::json;

use super::*;
Expand All @@ -860,8 +866,10 @@ mod tests {
use crate::chatlist::Chatlist;
use crate::config::Config;
use crate::contact::Contact;
use crate::ephemeral;
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
use crate::test_utils::TestContext;
use crate::tools::{self, SystemTime};
use crate::{message, sql};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -2650,4 +2658,43 @@ sth_for_the = "future""#

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_status_update_vs_delete_device_after() -> Result<()> {
let alice = &TestContext::new_alice().await;
let bob = &TestContext::new_bob().await;
bob.set_config(Config::DeleteDeviceAfter, Some("3600"))
.await?;
let alice_chat = alice.create_chat(bob).await;
let alice_instance = send_webxdc_instance(alice, alice_chat.id).await?;
let bob_instance = bob.recv_msg(&alice.pop_sent_msg().await).await;

SystemTime::shift(Duration::from_secs(1800));
let mut update = Message {
chat_id: alice_chat.id,
viewtype: Viewtype::Text,
text: "I'm an update".to_string(),
hidden: true,
..Default::default()
};
update.param.set_cmd(SystemMessage::WebxdcStatusUpdate);
update
.param
.set(Param::Arg, r#"{"updates":[{"payload":{"foo":"bar"}}]}"#);
update.set_quote(alice, Some(&alice_instance)).await?;
let sent_msg = alice.send_msg(alice_chat.id, &mut update).await;
bob.recv_msg(&sent_msg).await;
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);

SystemTime::shift(Duration::from_secs(2700));
ephemeral::delete_expired_messages(bob, tools::time()).await?;
let bob_instance = Message::load_from_db(bob, bob_instance.id).await?;
assert_eq!(bob_instance.chat_id.is_trash(), false);

Ok(())
}
}

0 comments on commit c1d2510

Please sign in to comment.