Skip to content
Draft
2 changes: 1 addition & 1 deletion deltachat-rpc-client/tests/test_something.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pytest

from deltachat_rpc_client import Contact, EventType, Message, events
from deltachat_rpc_client.const import DownloadState, MessageState
from deltachat_rpc_client.const import MessageState
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
from deltachat_rpc_client.rpc import JsonRpcError

Expand Down
8 changes: 7 additions & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ impl Context {
convert_folder_meaning(self, folder_meaning).await?
{
connection
.fetch_move_delete(self, &mut session, &watch_folder, folder_meaning)
.fetch_move_delete(self, &mut session, true, &watch_folder, folder_meaning)
.await?;
}
}
Expand All @@ -605,6 +605,12 @@ impl Context {
warn!(self, "Failed to update quota: {err:#}.");
}
}

// OPTIONAL TODO: if time left start downloading messages
// while (msg = download_when_normal_starts) {
// if not time_left {break;}
// connection.download_message(msg) }
// }
}

info!(
Expand Down
168 changes: 149 additions & 19 deletions src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize};

use crate::context::Context;
use crate::imap::session::Session;
use crate::log::info;
use crate::message::{Message, MsgId};
use crate::log::{info, warn};
use crate::message::{self, Message, MsgId, rfc724_mid_exists};
use crate::{EventType, chatlist_events};

/// If a message is downloaded only partially
Expand All @@ -26,8 +26,7 @@ pub(crate) const PRE_MESSAGE_ATTACHMENT_SIZE_THRESHOLD: u64 = 140_000;
/// Max message size to be fetched in the background.
/// This limit defines what messages are fully fetched in the background.
/// This is for all messages that don't have the full message header.
#[allow(unused)]
pub(crate) const MAX_FETCH_MSG_SIZE: usize = 1_000_000;
pub(crate) const MAX_FETCH_MSG_SIZE: u32 = 1_000_000;

/// Max size for pre messages. A warning is emitted when this is exceeded.
/// Should be well below `MAX_FETCH_MSG_SIZE`
Expand Down Expand Up @@ -78,11 +77,17 @@ impl MsgId {
}
DownloadState::InProgress => return Err(anyhow!("Download already in progress.")),
DownloadState::Available | DownloadState::Failure => {
if msg.rfc724_mid().is_empty() {
return Err(anyhow!("Download not possible, message has no rfc724_mid"));
}
self.update_download_state(context, DownloadState::InProgress)
.await?;
context
.sql
.execute("INSERT INTO download (msg_id) VALUES (?)", (self,))
.execute(
"INSERT INTO download (msg_id) VALUES (?)",
(msg.rfc724_mid(),),
)
.await?;
context.scheduler.interrupt_inbox().await;
}
Expand Down Expand Up @@ -131,25 +136,14 @@ impl Message {
/// Most messages are downloaded automatically on fetch instead.
pub(crate) async fn download_msg(
context: &Context,
msg_id: MsgId,
rfc724_mid: String,
session: &mut Session,
) -> Result<()> {
let Some(msg) = Message::load_from_db_optional(context, msg_id).await? else {
// If partially downloaded message was already deleted
// we do not know its Message-ID anymore
// so cannot download it.
//
// Probably the message expired due to `delete_device_after`
// setting or was otherwise removed from the device,
// so we don't want it to reappear anyway.
return Ok(());
};

let row = context
.sql
.query_row_optional(
"SELECT uid, folder FROM imap WHERE rfc724_mid=? AND target!=''",
(&msg.rfc724_mid,),
(&rfc724_mid,),
|row| {
let server_uid: u32 = row.get(0)?;
let server_folder: String = row.get(1)?;
Expand All @@ -164,7 +158,7 @@ pub(crate) async fn download_msg(
};

session
.fetch_single_msg(context, &server_folder, server_uid, msg.rfc724_mid.clone())
.fetch_single_msg(context, &server_folder, server_uid, rfc724_mid)
.await?;
Ok(())
}
Expand Down Expand Up @@ -206,6 +200,142 @@ impl Session {
}
}

async fn set_msg_state_to_failed(context: &Context, rfc724_mid: &str) -> Result<()> {
if let Some(msg_id) = rfc724_mid_exists(context, rfc724_mid).await? {
// Update download state to failure
// so it can be retried.
//
// On success update_download_state() is not needed
// as receive_imf() already
// set the state and emitted the event.
msg_id
.update_download_state(context, DownloadState::Failure)
.await?;
}
Ok(())
}

async fn available_full_msgs_contains_rfc724_mid(
context: &Context,
rfc724_mid: &str,
) -> Result<bool> {
Ok(context
.sql
.query_get_value::<MsgId>(
"SELECT rfc724_mid FROM available_full_msgs WHERE rfc724_mid=?",
(&rfc724_mid,),
)
.await?
.is_some())
}

async fn remove_from_available_full_msgs_table(context: &Context, rfc724_mid: &str) -> Result<()> {
context
.sql
.execute(
"DELETE FROM available_full_msgs WHERE rfc724_mid=?",
(&rfc724_mid,),
)
.await?;
Ok(())
}

async fn remove_from_download_table(context: &Context, rfc724_mid: &str) -> Result<()> {
context
.sql
.execute("DELETE FROM download WHERE rfc724_mid=?", (&rfc724_mid,))
.await?;
Ok(())
}

// this is a dedicated method because it is used in multiple places.
pub(crate) async fn premessage_is_downloaded_for(
context: &Context,
rfc724_mid: &str,
) -> Result<bool> {
Ok(message::rfc724_mid_exists(context, rfc724_mid)
.await?
.is_some())
}

pub(crate) async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> {
let rfc724_mids = context
.sql
.query_map_vec("SELECT rfc724_mid FROM download", (), |row| {
let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid)
})
.await?;

for rfc724_mid in &rfc724_mids {
let res = download_msg(context, rfc724_mid.clone(), session).await;
if res.is_ok() {
remove_from_download_table(context, rfc724_mid).await?;
remove_from_available_full_msgs_table(context, rfc724_mid).await?;
}
if let Err(err) = res {
warn!(
context,
"Failed to download message rfc724_mid={rfc724_mid}: {:#}.", err
);
if !premessage_is_downloaded_for(context, rfc724_mid).await? {
// This is probably a classical email that vanished before we could download it
warn!(
context,
"{rfc724_mid} is probably a classical email that vanished before we could download it"
);
remove_from_download_table(context, rfc724_mid).await?;
} else if available_full_msgs_contains_rfc724_mid(context, rfc724_mid).await? {
// set the message to DownloadState::Failure - probably it was deleted on the server in the meantime
set_msg_state_to_failed(context, rfc724_mid).await?;
remove_from_download_table(context, rfc724_mid).await?;
remove_from_available_full_msgs_table(context, rfc724_mid).await?;
} else {
// leave the message in DownloadState::InProgress;
// it will be downloaded once it arrives.
}
}
}

Ok(())
}

/// Download known full messages without pre_message
/// in order to guard against lost pre-messages:
// TODO better fn name
pub(crate) async fn download_known_full_messages_without_pre_message(
context: &Context,
session: &mut Session,
) -> Result<()> {
let rfc724_mids = context
.sql
.query_map_vec("SELECT rfc724_mid FROM available_full_msgs", (), |row| {
let rfc724_mid: String = row.get(0)?;
Ok(rfc724_mid)
})
.await?;
for rfc724_mid in &rfc724_mids {
if !premessage_is_downloaded_for(context, rfc724_mid).await? {
// Download the full-message unconditionally,
// because the pre-message got lost.
// The message may be in the wrong order,
// but at least we have it at all.
let res = download_msg(context, rfc724_mid.clone(), session).await;
if res.is_ok() {
remove_from_available_full_msgs_table(context, rfc724_mid).await?;
}
if let Err(err) = res {
warn!(
context,
"download_known_full_messages_without_pre_message: Failed to download message rfc724_mid={rfc724_mid}: {:#}.",
err
);
}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use mailparse::MailHeaderMap;
Expand Down
5 changes: 5 additions & 0 deletions src/headerdef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ pub enum HeaderDef {
/// used to encrypt and decrypt messages.
/// This secret is sent to a new member in the member-addition message.
ChatBroadcastSecret,

/// This message announces a bigger message with attachment that is refereced by rfc724_mid.
#[strum(serialize = "Chat-Full-Message-ID")] // correct casing
ChatFullMessageId,

/// Announce full message attachment size inside of a pre-message.
#[strum(serialize = "Chat-Full-Message-Size")] // correct casing
ChatFullMessageSize,

/// This message has a pre-message
/// and thus this message can be skipped while fetching messages.
/// This is a cleartext / unproteced header.
Expand Down
Loading
Loading