From d6d90db957ee171365ca5dbd1f14b4440d843985 Mon Sep 17 00:00:00 2001 From: link2xt Date: Fri, 19 Jul 2024 03:16:57 +0000 Subject: [PATCH] feat: new BACKUP2 transfer protocol New protocol streams .tar into iroh-net stream without traversing all the files first. Reception over old backup protocol is still supported to allow transferring backups from old devices to new ones, but not vice versa. --- deltachat-ffi/deltachat.h | 2 + deltachat-ffi/src/lib.rs | 2 +- deltachat-ffi/src/lot.rs | 5 + deltachat-jsonrpc/src/api/types/qr.rs | 13 + node/constants.js | 1 + node/lib/constants.ts | 1 + python/tests/test_1_online.py | 2 - src/imex.rs | 86 +++-- src/imex/transfer.rs | 456 ++++++++++++-------------- src/peer_channels.rs | 2 +- src/qr.rs | 43 ++- src/qr_code_generator.rs | 2 +- 12 files changed, 341 insertions(+), 274 deletions(-) diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index e95c76208b..6180fba9ec 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2504,6 +2504,7 @@ void dc_stop_ongoing_process (dc_context_t* context); #define DC_QR_FPR_WITHOUT_ADDR 230 // test1=formatted fingerprint #define DC_QR_ACCOUNT 250 // text1=domain #define DC_QR_BACKUP 251 +#define DC_QR_BACKUP2 252 #define DC_QR_WEBRTC_INSTANCE 260 // text1=domain, text2=instance pattern #define DC_QR_ADDR 320 // id=contact #define DC_QR_TEXT 330 // text1=text @@ -2550,6 +2551,7 @@ void dc_stop_ongoing_process (dc_context_t* context); * if so, call dc_set_config_from_qr() and then dc_configure(). * * - DC_QR_BACKUP: + * - DC_QR_BACKUP2: * ask the user if they want to set up a new device. * If so, pass the qr-code to dc_receive_backup(). * diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index f3e71e2f91..09cd4acf4a 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4364,7 +4364,7 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid let ctx = &*ffi_provider.context; let provider = &mut ffi_provider.provider; block_on(provider) - .context("Failed to await BackupProvider") + .context("Failed to await backup provider") .log_err(ctx) .set_last_error(ctx) .ok(); diff --git a/deltachat-ffi/src/lot.rs b/deltachat-ffi/src/lot.rs index 52b5a1e445..5c7479a87d 100644 --- a/deltachat-ffi/src/lot.rs +++ b/deltachat-ffi/src/lot.rs @@ -50,6 +50,7 @@ impl Lot { Qr::FprWithoutAddr { fingerprint, .. } => Some(fingerprint), Qr::Account { domain } => Some(domain), Qr::Backup { .. } => None, + Qr::Backup2 { .. } => None, Qr::WebrtcInstance { domain, .. } => Some(domain), Qr::Addr { draft, .. } => draft.as_deref(), Qr::Url { url } => Some(url), @@ -102,6 +103,7 @@ impl Lot { Qr::FprWithoutAddr { .. } => LotState::QrFprWithoutAddr, Qr::Account { .. } => LotState::QrAccount, Qr::Backup { .. } => LotState::QrBackup, + Qr::Backup2 { .. } => LotState::QrBackup2, Qr::WebrtcInstance { .. } => LotState::QrWebrtcInstance, Qr::Addr { .. } => LotState::QrAddr, Qr::Url { .. } => LotState::QrUrl, @@ -127,6 +129,7 @@ impl Lot { Qr::FprWithoutAddr { .. } => Default::default(), Qr::Account { .. } => Default::default(), Qr::Backup { .. } => Default::default(), + Qr::Backup2 { .. } => Default::default(), Qr::WebrtcInstance { .. } => Default::default(), Qr::Addr { contact_id, .. } => contact_id.to_u32(), Qr::Url { .. } => Default::default(), @@ -177,6 +180,8 @@ pub enum LotState { QrBackup = 251, + QrBackup2 = 252, + /// text1=domain, text2=instance pattern QrWebrtcInstance = 260, diff --git a/deltachat-jsonrpc/src/api/types/qr.rs b/deltachat-jsonrpc/src/api/types/qr.rs index 0f6d79c8c3..8cda4ac766 100644 --- a/deltachat-jsonrpc/src/api/types/qr.rs +++ b/deltachat-jsonrpc/src/api/types/qr.rs @@ -35,6 +35,11 @@ pub enum QrObject { Backup { ticket: String, }, + Backup2 { + auth_token: String, + + node_addr: String, + }, WebrtcInstance { domain: String, instance_pattern: String, @@ -132,6 +137,14 @@ impl From for QrObject { Qr::Backup { ticket } => QrObject::Backup { ticket: ticket.to_string(), }, + Qr::Backup2 { + ref node_addr, + auth_token, + } => QrObject::Backup2 { + node_addr: serde_json::to_string(node_addr).unwrap_or_default(), + + auth_token, + }, Qr::WebrtcInstance { domain, instance_pattern, diff --git a/node/constants.js b/node/constants.js index 3f3fe3c63b..9296e6ea75 100644 --- a/node/constants.js +++ b/node/constants.js @@ -128,6 +128,7 @@ module.exports = { DC_QR_ASK_VERIFYCONTACT: 200, DC_QR_ASK_VERIFYGROUP: 202, DC_QR_BACKUP: 251, + DC_QR_BACKUP2: 252, DC_QR_ERROR: 400, DC_QR_FPR_MISMATCH: 220, DC_QR_FPR_OK: 210, diff --git a/node/lib/constants.ts b/node/lib/constants.ts index 7297bf386f..553f145ecd 100644 --- a/node/lib/constants.ts +++ b/node/lib/constants.ts @@ -128,6 +128,7 @@ export enum C { DC_QR_ASK_VERIFYCONTACT = 200, DC_QR_ASK_VERIFYGROUP = 202, DC_QR_BACKUP = 251, + DC_QR_BACKUP2 = 252, DC_QR_ERROR = 400, DC_QR_FPR_MISMATCH = 220, DC_QR_FPR_OK = 210, diff --git a/python/tests/test_1_online.py b/python/tests/test_1_online.py index 318262f3d8..d4f1901a82 100644 --- a/python/tests/test_1_online.py +++ b/python/tests/test_1_online.py @@ -1562,8 +1562,6 @@ def assert_account_is_proper(ac): # check progress events for import assert imex_tracker.wait_progress(1, progress_upper_limit=249) - assert imex_tracker.wait_progress(500, progress_upper_limit=749) - assert imex_tracker.wait_progress(750, progress_upper_limit=999) assert imex_tracker.wait_progress(1000) assert_account_is_proper(ac1) diff --git a/src/imex.rs b/src/imex.rs index 8489638038..17bf631d81 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; use ::pgp::types::KeyTrait; use anyhow::{bail, ensure, format_err, Context as _, Result}; use deltachat_contact_tools::EmailAddress; -use futures::StreamExt; +use futures::TryStreamExt; use futures_lite::FutureExt; use tokio::fs::{self, File}; @@ -269,24 +269,56 @@ async fn import_backup( context.get_dbfile().display() ); + import_backup_stream(context, backup_file, file_size, passphrase).await?; + Ok(()) +} + +/// Imports backup by reading a tar file from a stream. +/// +/// `file_size` is used to calculate the progress +/// and emit progress events. +/// Ideally it is the sum of the entry +/// sizes without the header overhead, +/// but can be estimated as tar file size +/// in which case the progress is underestimated +/// and may not reach 99.9% by the end of import. +/// Underestimating is better than +/// overestimating because the progress +/// jumps to 100% instead of getting stuck at 99.9% +/// for some time. +pub(crate) async fn import_backup_stream( + context: &Context, + backup_file: R, + file_size: u64, + passphrase: String, +) -> Result<()> { let mut archive = Archive::new(backup_file); let mut entries = archive.entries()?; - let mut last_progress = 0; - while let Some(file) = entries.next().await { - let f = &mut file?; - - let current_pos = f.raw_file_position(); - let progress = 1000 * current_pos / file_size; - if progress != last_progress && progress > 10 && progress < 1000 { - // We already emitted ImexProgress(10) above + + // We already emitted ImexProgress(10) above + let mut last_progress = 10; + let mut total_size = 0; + while let Some(mut f) = entries + .try_next() + .await + .context("Failed to get next entry")? + { + total_size += f.header().entry_size()?; + let progress = std::cmp::min( + 1000 * total_size.checked_div(file_size).unwrap_or_default(), + 999, + ); + if progress > last_progress { context.emit_event(EventType::ImexProgress(progress as usize)); last_progress = progress; } if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) { // async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file. - f.unpack_in(context.get_blobdir()).await?; + f.unpack_in(context.get_blobdir()) + .await + .context("Failed to unpack database")?; let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME); context .sql @@ -298,7 +330,9 @@ async fn import_backup( .context("cannot remove unpacked database")?; } else { // async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards. - f.unpack_in(context.get_blobdir()).await?; + f.unpack_in(context.get_blobdir()) + .await + .context("Failed to unpack blob")?; let from_path = context.get_blobdir().join(f.path()?); if from_path.is_file() { if let Some(name) = from_path.file_name() { @@ -375,34 +409,40 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res dest_path.display(), ); - export_backup_inner(context, &temp_db_path, &temp_path).await?; + let file = File::create(&temp_path).await?; + let blobdir = BlobDirContents::new(context).await?; + export_backup_stream(context, &temp_db_path, blobdir, file) + .await + .context("Exporting backup to file failed")?; fs::rename(temp_path, &dest_path).await?; context.emit_event(EventType::ImexFileWritten(dest_path)); Ok(()) } -async fn export_backup_inner( - context: &Context, +/// Exports the database and blobs into a stream. +pub(crate) async fn export_backup_stream<'a, W>( + context: &'a Context, temp_db_path: &Path, - temp_path: &Path, -) -> Result<()> { - let file = File::create(temp_path).await?; - - let mut builder = tokio_tar::Builder::new(file); + blobdir: BlobDirContents<'a>, + writer: W, +) -> Result<()> +where + W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static, +{ + let mut builder = tokio_tar::Builder::new(writer); builder .append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME) .await?; - let blobdir = BlobDirContents::new(context).await?; - let mut last_progress = 0; + let mut last_progress = 10; for (i, blob) in blobdir.iter().enumerate() { let mut file = File::open(blob.to_abs_path()).await?; let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name()); builder.append_file(path_in_archive, &mut file).await?; - let progress = 1000 * i / blobdir.len(); - if progress != last_progress && progress > 10 && progress < 1000 { + let progress = std::cmp::min(1000 * i / blobdir.len(), 999); + if progress > last_progress { context.emit_event(EventType::ImexProgress(progress)); last_progress = progress; } diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 5158382615..aa37507ba2 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -1,17 +1,12 @@ //! Transfer a backup to an other device. //! -//! This module provides support for using n0's iroh tool to initiate transfer of a backup -//! to another device using a QR code. -//! -//! Using the iroh terminology there are two parties to this: +//! This module provides support for using [iroh](https://iroh.computer/) +//! to initiate transfer of a backup to another device using a QR code. //! +//! There are two parties to this: //! - The *Provider*, which starts a server and listens for connections. //! - The *Getter*, which connects to the server and retrieves the data. //! -//! Iroh is designed around the idea of verifying hashes, the downloads are verified as -//! they are retrieved. The entire transfer is initiated by requesting the data of a single -//! root hash. -//! //! Both the provider and the getter are authenticated: //! //! - The provider is known by its *peer ID*. @@ -21,23 +16,30 @@ //! Both these are transferred in the QR code offered to the getter. This ensures that the //! getter can not connect to an impersonated provider and the provider does not offer the //! download to an impersonated getter. +//! +//! Protocol starts by getter opening a bidirectional QUIC stream +//! to the provider and sending authentication token. +//! Provider verifies received authentication token, +//! sends the size of all files in a backup (database and all blobs) +//! as an unsigned 64-bit big endian integer and streams the backup in tar format. +//! Getter receives the backup and acknowledges successful reception +//! by sending a single byte. +//! Provider closes the endpoint after receiving an acknowledgment. use std::future::Future; -use std::net::Ipv4Addr; -use std::path::Path; use std::pin::Pin; +use std::sync::Arc; use std::task::Poll; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; -use async_channel::Receiver; use futures_lite::StreamExt; -use iroh::blobs::Collection; -use iroh::get::DataStream; -use iroh::progress::ProgressEmitter; -use iroh::protocol::AuthToken; -use iroh::provider::{DataSource, Event, Provider, Ticket}; -use iroh::Hash; -use iroh_old as iroh; +use iroh_net::relay::RelayMode; +use iroh_net::Endpoint; +use iroh_old; +use iroh_old::blobs::Collection; +use iroh_old::get::DataStream; +use iroh_old::progress::ProgressEmitter; +use iroh_old::provider::Ticket; use tokio::fs::{self, File}; use tokio::io::{self, AsyncWriteExt, BufWriter}; use tokio::sync::broadcast::error::RecvError; @@ -46,19 +48,22 @@ use tokio::task::{JoinHandle, JoinSet}; use tokio_stream::wrappers::ReadDirStream; use tokio_util::sync::CancellationToken; -use crate::blob::BlobDirContents; use crate::chat::{add_device_msg, delete_and_reset_all_device_msgs}; use crate::context::Context; +use crate::imex::BlobDirContents; use crate::message::{Message, Viewtype}; use crate::qr::{self, Qr}; use crate::stock_str::backup_transfer_msg_body; -use crate::tools::{time, TempPathGuard}; -use crate::{e2ee, EventType}; +use crate::tools::{create_id, time, TempPathGuard}; +use crate::EventType; -use super::{export_database, DBFILE_BACKUP_NAME}; +use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME}; const MAX_CONCURRENT_DIALS: u8 = 16; +/// ALPN protocol identifier for the backup transfer protocol. +const BACKUP_ALPN: &[u8] = b"/deltachat/backup"; + /// Provide or send a backup of this device. /// /// This creates a backup of the current device and starts a service which offers another @@ -69,15 +74,21 @@ const MAX_CONCURRENT_DIALS: u8 = 16; /// /// This starts a task which acquires the global "ongoing" mutex. If you need to stop the /// task use the [`Context::stop_ongoing`] mechanism. -/// -/// The task implements [`Future`] and awaiting it will complete once a transfer has been -/// either completed or aborted. #[derive(Debug)] pub struct BackupProvider { - /// The supervisor task, run by [`BackupProvider::watch_provider`]. + /// iroh-net endpoint. + _endpoint: Endpoint, + + /// iroh-net address. + node_addr: iroh_net::NodeAddr, + + /// Authentication token that should be submitted + /// to retrieve the backup. + auth_token: String, + + /// Handle for the task accepting backup transfer requests. handle: JoinHandle>, - /// The ticket to retrieve the backup collection. - ticket: Ticket, + /// Guard to cancel the provider on drop. _drop_guard: tokio_util::sync::DropGuard, } @@ -94,9 +105,13 @@ impl BackupProvider { /// /// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io pub async fn prepare(context: &Context) -> Result { - e2ee::ensure_secret_key_exists(context) - .await - .context("Private key not available, aborting backup export")?; + let relay_mode = RelayMode::Disabled; + let endpoint = Endpoint::builder() + .alpns(vec![BACKUP_ALPN.to_vec()]) + .relay_mode(relay_mode) + .bind(0) + .await?; + let node_addr = endpoint.node_addr().await?; // Acquire global "ongoing" mutex. let cancel_token = context.alloc_ongoing().await?; @@ -104,195 +119,153 @@ impl BackupProvider { let context_dir = context .get_blobdir() .parent() - .ok_or_else(|| anyhow!("Context dir not found"))?; + .context("Context dir not found")?; let dbfile = context_dir.join(DBFILE_BACKUP_NAME); if fs::metadata(&dbfile).await.is_ok() { fs::remove_file(&dbfile).await?; warn!(context, "Previous database export deleted"); } let dbfile = TempPathGuard::new(dbfile); - let res = tokio::select! { - biased; - res = Self::prepare_inner(context, &dbfile) => { - match res { - Ok(slf) => Ok(slf), - Err(err) => { - error!(context, "Failed to set up second device setup: {:#}", err); - Err(err) - }, - } - }, - _ = cancel_token.recv() => Err(format_err!("cancelled")), - }; - let (provider, ticket) = match res { - Ok((provider, ticket)) => (provider, ticket), - Err(err) => { - context.free_ongoing().await; - return Err(err); - } - }; + + // Authentication token that receiver should send us to receive a backup. + let auth_token = create_id(); + + let passphrase = String::new(); + + export_database(context, &dbfile, passphrase, time()) + .await + .context("Database export failed")?; + context.emit_event(EventType::ImexProgress(300)); + let drop_token = CancellationToken::new(); let handle = { let context = context.clone(); let drop_token = drop_token.clone(); + let endpoint = endpoint.clone(); + let auth_token = auth_token.clone(); tokio::spawn(async move { - let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await; + Self::accept_loop( + context.clone(), + endpoint, + auth_token, + cancel_token, + drop_token, + dbfile, + ) + .await; + info!(context, "Finished accept loop."); + context.free_ongoing().await; // Explicit drop to move the guards into this future drop(paused_guard); - drop(dbfile); - res + Ok(()) }) }; Ok(Self { + _endpoint: endpoint, + node_addr, + auth_token, handle, - ticket, _drop_guard: drop_token.drop_guard(), }) } - /// Creates the provider task. - /// - /// Having this as a function makes it easier to cancel it when needed. - async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Provider, Ticket)> { - // Generate the token up front: we also use it to encrypt the database. - let token = AuthToken::generate(); - context.emit_event(SendProgress::Started.into()); - export_database(context, dbfile, token.to_string(), time()) - .await - .context("Database export failed")?; - context.emit_event(SendProgress::DatabaseExported.into()); - - // Now we can be sure IO is not running. - let mut files = vec![DataSource::with_name( - dbfile.to_owned(), - format!("db/{DBFILE_BACKUP_NAME}"), - )]; - let blobdir = BlobDirContents::new(context).await?; + async fn handle_connection( + context: Context, + conn: iroh_net::endpoint::Connecting, + auth_token: String, + dbfile: Arc, + ) -> Result<()> { + let conn = conn.await?; + let (mut send_stream, mut recv_stream) = conn.accept_bi().await?; + + // Read authentication token from the stream. + let mut received_auth_token = vec![0u8; auth_token.len()]; + recv_stream.read_exact(&mut received_auth_token).await?; + if received_auth_token.as_slice() != auth_token.as_bytes() { + warn!(context, "Received wrong backup authentication token."); + return Ok(()); + } + + info!(context, "Received valid backup authentication token."); + + let blobdir = BlobDirContents::new(&context).await?; + + let mut file_size = 0; + file_size += dbfile.metadata()?.len(); for blob in blobdir.iter() { - let path = blob.to_abs_path(); - let name = format!("blob/{}", blob.as_file_name()); - files.push(DataSource::with_name(path, name)); + file_size += blob.to_abs_path().metadata()?.len() } - // Start listening. - let (db, hash) = iroh::provider::create_collection(files).await?; - context.emit_event(SendProgress::CollectionCreated.into()); - let provider = Provider::builder(db) - .bind_addr((Ipv4Addr::UNSPECIFIED, 0).into()) - .auth_token(token) - .spawn()?; - context.emit_event(SendProgress::ProviderListening.into()); - info!(context, "Waiting for remote to connect"); - let ticket = provider.ticket(hash)?; - Ok((provider, ticket)) + send_stream.write_all(&file_size.to_be_bytes()).await?; + + export_backup_stream(&context, &dbfile, blobdir, send_stream) + .await + .context("Failed to write backup into QUIC stream")?; + info!(context, "Finished writing backup into QUIC stream."); + let mut buf = [0u8; 1]; + info!(context, "Waiting for acknowledgment."); + recv_stream.read_exact(&mut buf).await?; + info!(context, "Received backup reception acknowledgement."); + context.emit_event(EventType::ImexProgress(1000)); + + let mut msg = Message::new(Viewtype::Text); + msg.text = backup_transfer_msg_body(&context).await; + add_device_msg(&context, None, Some(&mut msg)).await?; + + Ok(()) } - /// Supervises the iroh [`Provider`], terminating it when needed. - /// - /// This will watch the provider and terminate it when: - /// - /// - A transfer is completed, successful or unsuccessful. - /// - An event could not be observed to protect against not knowing of a completed event. - /// - The ongoing process is cancelled. - /// - /// The *cancel_token* is the handle for the ongoing process mutex, when this completes - /// we must cancel this operation. - async fn watch_provider( - context: &Context, - mut provider: Provider, - cancel_token: Receiver<()>, + async fn accept_loop( + context: Context, + endpoint: Endpoint, + auth_token: String, + cancel_token: async_channel::Receiver<()>, drop_token: CancellationToken, - ) -> Result<()> { - let mut events = provider.subscribe(); - let mut total_size = 0; - let mut current_size = 0; - let res = loop { + dbfile: TempPathGuard, + ) { + let dbfile = Arc::new(dbfile); + loop { tokio::select! { biased; - res = &mut provider => { - break res.context("BackupProvider failed"); - }, - maybe_event = events.recv() => { - match maybe_event { - Ok(event) => { - match event { - Event::ClientConnected { ..} => { - context.emit_event(SendProgress::ClientConnected.into()); - } - Event::RequestReceived { .. } => { - } - Event::TransferCollectionStarted { total_blobs_size, .. } => { - total_size = total_blobs_size; - context.emit_event(SendProgress::TransferInProgress { - current_size, - total_size, - }.into()); - } - Event::TransferBlobCompleted { size, .. } => { - current_size += size; - context.emit_event(SendProgress::TransferInProgress { - current_size, - total_size, - }.into()); - } - Event::TransferCollectionCompleted { .. } => { - context.emit_event(SendProgress::TransferInProgress { - current_size: total_size, - total_size - }.into()); - provider.shutdown(); - } - Event::TransferAborted { .. } => { - provider.shutdown(); - break Err(anyhow!("BackupProvider transfer aborted")); - } - } - } - Err(broadcast::error::RecvError::Closed) => { - // We should never see this, provider.join() should complete - // first. - } - Err(broadcast::error::RecvError::Lagged(_)) => { - // We really shouldn't be lagging, if we did we may have missed - // a completion event. - provider.shutdown(); - break Err(anyhow!("Missed events from BackupProvider")); + + conn = endpoint.accept() => { + if let Some(conn) = conn { + // Got a new in-progress connection. + let context = context.clone(); + let auth_token = auth_token.clone(); + let dbfile = dbfile.clone(); + if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).await { + warn!(context, "Error while handling backup connection: {err:#}."); + } else { + info!(context, "Backup transfer finished successfully."); + break; } + } else { + break; } }, _ = cancel_token.recv() => { - provider.shutdown(); - break Err(anyhow!("BackupProvider cancelled")); - }, + context.emit_event(EventType::ImexProgress(0)); + break; + } _ = drop_token.cancelled() => { - provider.shutdown(); - break Err(anyhow!("BackupProvider dropped")); + context.emit_event(EventType::ImexProgress(0)); + break; } } - }; - match &res { - Ok(_) => { - context.emit_event(SendProgress::Completed.into()); - let mut msg = Message::new(Viewtype::Text); - msg.text = backup_transfer_msg_body(context).await; - add_device_msg(context, None, Some(&mut msg)).await?; - } - Err(err) => { - error!(context, "Backup transfer failure: {err:#}"); - context.emit_event(SendProgress::Failed.into()) - } } - res } /// Returns a QR code that allows fetching this backup. /// /// This QR code can be passed to [`get_backup`] on a (different) device. pub fn qr(&self) -> Qr { - Qr::Backup { - ticket: self.ticket.clone(), + Qr::Backup2 { + node_addr: self.node_addr.clone(), + + auth_token: self.auth_token.clone(), } } } @@ -300,61 +273,14 @@ impl BackupProvider { impl Future for BackupProvider { type Output = Result<()>; + /// Waits for the backup transfer to complete. fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { Pin::new(&mut self.handle).poll(cx)? } } -/// Create [`EventType::ImexProgress`] events using readable names. -/// -/// Plus you get warnings if you don't use all variants. -#[derive(Debug)] -enum SendProgress { - Failed, - Started, - DatabaseExported, - CollectionCreated, - ProviderListening, - ClientConnected, - TransferInProgress { current_size: u64, total_size: u64 }, - Completed, -} - -impl From for EventType { - fn from(source: SendProgress) -> Self { - use SendProgress::*; - let num: u16 = match source { - Failed => 0, - Started => 100, - DatabaseExported => 300, - CollectionCreated => 350, - ProviderListening => 400, - ClientConnected => 450, - TransferInProgress { - current_size, - total_size, - } => { - // the range is 450..=950 - 450 + ((current_size as f64 / total_size as f64) * 500.).floor() as u16 - } - Completed => 1000, - }; - Self::ImexProgress(num.into()) - } -} - -/// Contacts a backup provider and receives the backup from it. -/// -/// This uses a QR code to contact another instance of deltachat which is providing a backup -/// using the [`BackupProvider`]. Once connected it will authenticate using the secrets in -/// the QR code and retrieve the backup. -/// -/// This is a long running operation which will only when completed. -/// -/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It -/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without -/// having to revert to untyped bytes. -pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { +/// Retrieves backup from a legacy backup provider using iroh 0.4. +pub async fn get_legacy_backup(context: &Context, qr: Qr) -> Result<()> { ensure!( matches!(qr, Qr::Backup { .. }), "QR code for backup must be of type DCBACKUP" @@ -380,6 +306,64 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { res } +pub async fn get_backup2( + context: &Context, + node_addr: iroh_net::NodeAddr, + auth_token: String, +) -> Result<()> { + let relay_mode = RelayMode::Disabled; + + let endpoint = Endpoint::builder().relay_mode(relay_mode).bind(0).await?; + + let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?; + let (mut send_stream, mut recv_stream) = conn.open_bi().await?; + info!(context, "Sending backup authentication token."); + send_stream.write_all(auth_token.as_bytes()).await?; + + let passphrase = String::new(); + info!(context, "Starting to read backup from the stream."); + + let mut file_size_buf = [0u8; 8]; + recv_stream.read_exact(&mut file_size_buf).await?; + let file_size = u64::from_be_bytes(file_size_buf); + import_backup_stream(context, recv_stream, file_size, passphrase) + .await + .context("Failed to import backup from QUIC stream")?; + info!(context, "Finished importing backup from the stream."); + context.emit_event(EventType::ImexProgress(1000)); + + // Send an acknowledgement, but ignore the errors. + // We have imported backup successfully already. + send_stream.write_all(b".").await.ok(); + send_stream.finish().await.ok(); + info!(context, "Sent backup reception acknowledgment."); + + Ok(()) +} + +/// Contacts a backup provider and receives the backup from it. +/// +/// This uses a QR code to contact another instance of deltachat which is providing a backup +/// using the [`BackupProvider`]. Once connected it will authenticate using the secrets in +/// the QR code and retrieve the backup. +/// +/// This is a long running operation which will return only when completed. +/// +/// Using [`Qr`] as argument is a bit odd as it only accepts specific variants of it. It +/// does avoid having [`iroh_old::provider::Ticket`] in the primary API however, without +/// having to revert to untyped bytes. +pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { + match qr { + Qr::Backup { .. } => get_legacy_backup(context, qr).await?, + Qr::Backup2 { + node_addr, + auth_token, + } => get_backup2(context, node_addr, auth_token).await?, + _ => bail!("QR code for backup must be of type DCBACKUP or DCBACKUP2"), + } + Ok(()) +} + async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> { let ticket = match qr { Qr::Backup { ticket } => ticket, @@ -426,7 +410,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<() // Perform the transfer. let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality - let stats = iroh::get::run_ticket( + let stats = iroh_old::get::run_ticket( ticket, keylog, MAX_CONCURRENT_DIALS, @@ -458,7 +442,7 @@ async fn on_blob( progress: &ProgressEmitter, jobs: &Mutex>, ticket: &Ticket, - _hash: Hash, + _hash: iroh_old::Hash, mut reader: DataStream, name: String, ) -> Result { @@ -640,24 +624,6 @@ mod tests { .await; } - #[test] - fn test_send_progress() { - let cases = [ - ((0, 100), 450), - ((10, 100), 500), - ((50, 100), 700), - ((100, 100), 950), - ]; - - for ((current_size, total_size), progress) in cases { - let out = EventType::from(SendProgress::TransferInProgress { - current_size, - total_size, - }); - assert_eq!(out, EventType::ImexProgress(progress)); - } - } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_drop_provider() { let mut tcm = TestContextManager::new(); diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 8c24507a23..24a4b9f3db 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -221,7 +221,7 @@ impl ChannelState { } impl Context { - /// Create magic endpoint and gossip. + /// Create iroh endpoint and gossip. async fn init_peer_channels(&self) -> Result { let secret_key = SecretKey::generate(); let public_key = secret_key.public(); diff --git a/src/qr.rs b/src/qr.rs index 72e4f356e5..5a3fe60fe4 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -37,8 +37,13 @@ const VCARD_SCHEME: &str = "BEGIN:VCARD"; const SMTP_SCHEME: &str = "SMTP:"; const HTTP_SCHEME: &str = "http://"; const HTTPS_SCHEME: &str = "https://"; + +/// Legacy backup transfer based on iroh 0.4. pub(crate) const DCBACKUP_SCHEME: &str = "DCBACKUP:"; +/// Backup transfer based on iroh-net. +pub(crate) const DCBACKUP2_SCHEME: &str = "DCBACKUP2:"; + /// Scanned QR code. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Qr { @@ -106,7 +111,7 @@ pub enum Qr { domain: String, }, - /// Provides a backup that can be retrieve. + /// Provides a backup that can be retrieved using legacy iroh 0.4. /// /// This contains all the data needed to connect to a device and download a backup from /// it to configure the receiving device with the same account. @@ -120,6 +125,15 @@ pub enum Qr { ticket: iroh::provider::Ticket, }, + /// Provides a backup that can be retrieved using iroh-net based backup transfer protocol. + Backup2 { + /// Iroh node address. + node_addr: iroh_net::NodeAddr, + + /// Authentication token. + auth_token: String, + }, + /// Ask the user if they want to use the given service for video chats. WebrtcInstance { /// Server domain name. @@ -266,6 +280,8 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result { decode_webrtc_instance(context, qr)? } else if starts_with_ignore_case(qr, DCBACKUP_SCHEME) { decode_backup(qr)? + } else if starts_with_ignore_case(qr, DCBACKUP2_SCHEME) { + decode_backup2(qr)? } else if qr.starts_with(MAILTO_SCHEME) { decode_mailto(context, qr).await? } else if qr.starts_with(SMTP_SCHEME) { @@ -295,6 +311,13 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result { pub fn format_backup(qr: &Qr) -> Result { match qr { Qr::Backup { ref ticket } => Ok(format!("{DCBACKUP_SCHEME}{ticket}")), + Qr::Backup2 { + ref node_addr, + ref auth_token, + } => { + let node_addr = serde_json::to_string(node_addr)?; + Ok(format!("{DCBACKUP2_SCHEME}{auth_token}&{node_addr}")) + } _ => Err(anyhow!("Not a backup QR code")), } } @@ -529,6 +552,24 @@ fn decode_backup(qr: &str) -> Result { Ok(Qr::Backup { ticket }) } +/// Decodes a [`DCBACKUP2_SCHEME`] QR code. +fn decode_backup2(qr: &str) -> Result { + let payload = qr + .strip_prefix(DCBACKUP2_SCHEME) + .ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?; + let (auth_token, node_addr) = payload + .split_once('&') + .context("Backup QR code has no separator")?; + let auth_token = auth_token.to_string(); + let node_addr = serde_json::from_str::(node_addr) + .context("Invalid node addr in backup QR code")?; + + Ok(Qr::Backup2 { + node_addr, + auth_token, + }) +} + #[derive(Debug, Deserialize)] struct CreateAccountSuccessResponse { /// Email address. diff --git a/src/qr_code_generator.rs b/src/qr_code_generator.rs index 85c3ecba68..fc747aba20 100644 --- a/src/qr_code_generator.rs +++ b/src/qr_code_generator.rs @@ -58,7 +58,7 @@ async fn generate_verification_qr(context: &Context) -> Result { ) } -/// Renders a [`Qr::Backup`] QR code as an SVG image. +/// Renders a [`Qr::Backup2`] QR code as an SVG image. pub async fn generate_backup_qr(context: &Context, qr: &Qr) -> Result { let content = qr::format_backup(qr)?; let (avatar, displayname, _addr, color) = self_info(context).await?;