diff --git a/crates/engine/src/import_export.rs b/crates/engine/src/import_export.rs index 73c503f..a68c92c 100755 --- a/crates/engine/src/import_export.rs +++ b/crates/engine/src/import_export.rs @@ -210,54 +210,108 @@ pub async fn handle_export_table( DynamoDbError::ValidationException("Cannot create output directory".to_owned()) })?; } - let mut file = tokio::fs::File::create(&output_path) + let file = tokio::fs::File::create(&output_path) .await .map_err(|_| DynamoDbError::ValidationException("Cannot create export file".to_owned()))?; - let mut item_count: i64 = 0; + // Run the entire scan inside a snapshot so the resulting file is + // consistent with respect to writes that happen during the export. + // The storage layer holds a REPEATABLE READ transaction for the + // duration; pages emitted via `on_page` reflect a single point-in- + // time view of the table. + // + // Note: "page" here is the snapshot scan's internal memory unit — + // it is unrelated to the Scan API's user-facing pagination and to + // the BatchGetItem / BatchWriteItem operation families. let max_export_items = ctx.limits.max_export_item_count; - let mut exclusive_start_key: Option = None; - loop { - let (items, last_key) = ctx - .storage - .scan( - &key_info, - Some(1000), - exclusive_start_key.as_ref(), - None, - None, - None, - ) - .await - .map_err(storage_err_to_dynamo)?; - item_count += i64::from(u16::try_from(items.len()).unwrap_or(u16::MAX)); - - if u64::try_from(item_count).unwrap_or(u64::MAX) > max_export_items { - return Err(DynamoDbError::ValidationException(format!( - "Export item count exceeds maximum ({max_export_items})" - ))); - } + // Bridge the storage layer's per-page callback into the file writer + // via an mpsc channel: the callback sends each page; a writer task + // drains the channel and writes to disk. This keeps the closure + // simple (no &mut File capture issues) and decouples disk I/O + // latency from the scan. + let (page_tx, mut page_rx) = tokio::sync::mpsc::channel::>(4); + + let writer_task: tokio::task::JoinHandle> = { + let mut file = file; + tokio::spawn(async move { + while let Some(items) = page_rx.recv().await { + for item in &items { + let wrapper = serde_json::json!({"Item": item}); + let mut line = serde_json::to_string(&wrapper).map_err(|e| { + tracing::error!(internal_error = %e, "failed to serialize export item"); + DynamoDbError::InternalServerError("Internal server error".to_owned()) + })?; + line.push('\n'); + tokio::io::AsyncWriteExt::write_all(&mut file, line.as_bytes()) + .await + .map_err(|_| { + DynamoDbError::ValidationException( + "Cannot write export file".to_owned(), + ) + })?; + } + } + // Best-effort flush; if it fails the writer task error wins. + let _ = tokio::io::AsyncWriteExt::flush(&mut file).await; + Ok(()) + }) + }; - for item in &items { - let wrapper = serde_json::json!({"Item": item}); - let mut line = serde_json::to_string(&wrapper).map_err(|e| { - tracing::error!(internal_error = %e, "failed to serialize export item"); - DynamoDbError::InternalServerError("Internal server error".to_owned()) - })?; - line.push('\n'); - tokio::io::AsyncWriteExt::write_all(&mut file, line.as_bytes()) - .await - .map_err(|_| { - DynamoDbError::ValidationException("Cannot write export file".to_owned()) + let scan_result: Result = { + // Track count inside the closure so we can short-circuit the + // snapshot scan as soon as `max_export_item_count` is exceeded. + // Returning Err from `on_page` propagates through + // `scan_full_table_snapshot_impl`, which exits its loop without + // committing — releasing the REPEATABLE READ snapshot promptly so + // VACUUM can resume reclaiming dead tuples on the table. + let mut count: u64 = 0; + let on_page: extenddb_storage::SnapshotPageHandler<'_> = Box::new(move |items| { + count += items.len() as u64; + let exceeded = count > max_export_items; + let owned = items.to_vec(); + let tx = page_tx.clone(); + Box::pin(async move { + if exceeded { + return Err(extenddb_storage::error::StorageError::Validation(format!( + "Export item count exceeds maximum ({max_export_items})" + ))); + } + tx.send(owned).await.map_err(|_| { + extenddb_storage::error::StorageError::Internal( + "export writer task closed early".to_owned(), + ) })?; - } + Ok(()) + }) + }); + // The original `page_tx` sender is moved into `on_page`; once the + // scan returns, `on_page` is dropped and with it the last sender, + // closing the channel and letting the writer task exit cleanly. + ctx.storage + .scan_full_table_snapshot(&key_info, 1000, on_page) + .await + .map_err(storage_err_to_dynamo) + }; - if last_key.is_none() { - break; + // Wait for the writer to finish flushing whatever the scan sent. + let writer_result = writer_task.await.unwrap_or_else(|e| { + Err(DynamoDbError::InternalServerError(format!( + "export writer task panicked: {e}" + ))) + }); + + // Combine results. A successful scan return (Ok(n)) implies n is at or + // under max_export_item_count — the over-limit case is now handled + // inside the snapshot transaction. On any error, remove the partial + // file before propagating. + let item_count: i64 = match (scan_result, writer_result) { + (Ok(n), Ok(())) => i64::try_from(n).unwrap_or(i64::MAX), + (Err(e), _) | (Ok(_), Err(e)) => { + let _ = tokio::fs::remove_file(&output_path).await; + return Err(e); } - exclusive_start_key = last_key; - } + }; let end_time = epoch_seconds(); let export_arn = format!("{}:export/{}", input.table_arn, uuid::Uuid::new_v4()); diff --git a/crates/storage-postgres/src/data/data_engine.rs b/crates/storage-postgres/src/data/data_engine.rs index 740dd02..f30cd92 100755 --- a/crates/storage-postgres/src/data/data_engine.rs +++ b/crates/storage-postgres/src/data/data_engine.rs @@ -162,6 +162,19 @@ impl DataEngine for PostgresEngine { }) } + fn scan_full_table_snapshot<'a>( + &'a self, + key_info: &'a TableKeyInfo, + page_size: i64, + on_page: extenddb_storage::SnapshotPageHandler<'a>, + ) -> BoxFuture<'a, Result> { + let key_info = key_info.clone(); + Box::pin(async move { + self.scan_full_table_snapshot_impl(&key_info, page_size, on_page) + .await + }) + } + fn transact_get_items( &self, ops: &[TransactGetOp<'_>], diff --git a/crates/storage-postgres/src/data/query.rs b/crates/storage-postgres/src/data/query.rs index ebffbed..28100ba 100755 --- a/crates/storage-postgres/src/data/query.rs +++ b/crates/storage-postgres/src/data/query.rs @@ -249,13 +249,20 @@ pub(crate) fn bind_sk_value<'q>( } /// Execute a scan SQL statement with dynamic parameter binding. -pub(crate) async fn execute_scan_sql( +/// +/// `executor` is generic so the caller can pass either a `&PgPool` +/// (the default, autocommit) or a `&mut Transaction<'_, Postgres>` +/// (for snapshot-isolated reads via `scan_full_table_snapshot`). +pub(crate) async fn execute_scan_sql<'e, E>( sql: &str, exclusive_start_key: Option<&Item>, key_schema: &[KeySchemaElement], attr_defs: &[AttributeDefinition], - pool: &sqlx::PgPool, -) -> Result, StorageError> { + executor: E, +) -> Result, StorageError> +where + E: sqlx::Executor<'e, Database = sqlx::Postgres>, +{ let mut query = sqlx::query_as::<_, (serde_json::Value,)>(sql); if let Some(start_key) = exclusive_start_key { @@ -275,7 +282,7 @@ pub(crate) async fn execute_scan_sql( } let rows: Vec<(serde_json::Value,)> = query - .fetch_all(pool) + .fetch_all(executor) .await .map_err(|e| StorageError::Internal(e.to_string()))?; diff --git a/crates/storage-postgres/src/data/query_scan.rs b/crates/storage-postgres/src/data/query_scan.rs index e91d94c..7af9ab3 100755 --- a/crates/storage-postgres/src/data/query_scan.rs +++ b/crates/storage-postgres/src/data/query_scan.rs @@ -291,4 +291,135 @@ impl PostgresEngine { Ok((items, last_key)) } + + /// Run a full-table scan inside a `REPEATABLE READ READ ONLY` + /// transaction, emitting items to `on_page` one snapshot page at a time. + /// + /// Snapshot isolation guarantees that all pages reflect the table state + /// at the time of the transaction's first read, regardless of concurrent + /// writers. The transaction is committed when the scan completes; if any + /// page handler returns `Err`, the transaction is rolled back instead. + /// + /// "Page" here is the implementation's internal unit of work and has no + /// connection to the public Scan API's pagination or to the BatchGet/ + /// BatchWrite operation families. + /// + /// Always scans the base table — no GSI/LSI routing, no parallel-scan + /// segments, no filter expressions. This is the right shape for whole- + /// table operations like `ExportTableToPointInTime`; range or filtered + /// reads use [`scan_impl`] instead. + pub(crate) async fn scan_full_table_snapshot_impl( + &self, + key_info: &TableKeyInfo, + page_size: i64, + mut on_page: extenddb_storage::SnapshotPageHandler<'_>, + ) -> Result { + use std::fmt::Write; + + // REPEATABLE READ on PostgreSQL gives snapshot isolation: every + // query in this transaction sees the same view of the database, as + // of the moment of the first read. READ ONLY is a hint that lets + // the planner skip locks we don't need. This combination is the + // standard PG idiom for "give me a consistent multi-statement + // read"; SERIALIZABLE would add serialization-failure retry + // overhead that's unnecessary for a read-only export. + let mut tx = self + .data_pool + .begin() + .await + .map_err(|e| StorageError::Internal(format!("begin snapshot: {e}")))?; + sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY") + .execute(&mut *tx) + .await + .map_err(|e| StorageError::Internal(format!("set isolation: {e}")))?; + + let ddb_table = data_table_name(&key_info.table_id); + let sk_info_val = sk_info(&key_info.key_schema, &key_info.attribute_definitions); + + let mut total: u64 = 0; + let mut exclusive_start_key: Option = None; + + loop { + // Build SQL — same pagination shape as `scan_impl`, but with a + // fixed `page_size` and no parallel-scan / index variants + // (full-table snapshot scans are always base-table only). + let mut sql = format!("SELECT item_data FROM {ddb_table}"); + let param_idx: u32 = 1; + let mut conditions: Vec = Vec::new(); + + if let Some(start_key) = exclusive_start_key.as_ref() { + let pk_name = &key_info.key_schema[0].attribute_name; + if !start_key.contains_key(pk_name) { + return Err(StorageError::Internal("missing pk in start key".to_owned())); + } + if let Some((_, sk_type)) = sk_info_val { + let sk_col = sk_column(sk_type); + let collate = if sk_type == ScalarAttributeType::S { + " COLLATE \"C\"" + } else { + "" + }; + conditions.push(format!( + "(pk, {sk_col}{collate}) > (${param_idx}, ${next})", + next = param_idx + 1 + )); + } else { + conditions.push(format!("pk > ${param_idx}")); + } + } + if !conditions.is_empty() { + sql.push_str(" WHERE "); + sql.push_str(&conditions.join(" AND ")); + } + if let Some((_, sk_type)) = sk_info_val { + let sk_col = sk_column(sk_type); + let collate = if sk_type == ScalarAttributeType::S { + " COLLATE \"C\"" + } else { + "" + }; + let _ = write!(sql, " ORDER BY pk, {sk_col}{collate}"); + } else { + sql.push_str(" ORDER BY pk"); + } + let _ = write!(sql, " LIMIT {}", page_size + 1); + + // Critical: pass `&mut *tx` so this query runs inside the + // snapshot transaction, not on a fresh pool connection. + let rows = execute_scan_sql( + &sql, + exclusive_start_key.as_ref(), + &key_info.key_schema, + &key_info.attribute_definitions, + &mut *tx, + ) + .await?; + + #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] + let actual_page = page_size.max(0) as usize; + let has_more = rows.len() > actual_page; + let items: Vec = rows + .into_iter() + .take(actual_page) + .map(json_to_item) + .collect::, _>>()?; + + if items.is_empty() { + break; + } + + total += items.len() as u64; + on_page(&items).await?; + + if !has_more { + break; + } + exclusive_start_key = items.last().map(|i| build_key(i, &key_info.key_schema)); + } + + tx.commit() + .await + .map_err(|e| StorageError::Internal(format!("commit snapshot: {e}")))?; + Ok(total) + } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index ca87133..e93aeb2 100755 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -145,6 +145,21 @@ pub trait TableEngine: Send + Sync { ) -> BoxFuture<'_, Result>; } +/// Callback invoked once per page of items produced by +/// [`DataEngine::scan_full_table_snapshot`]. The implementation calls +/// this sequentially for every page until the table is fully scanned; +/// returning `Err` short-circuits the snapshot and rolls back the +/// underlying transaction without committing. +/// +/// "Page" here refers to an internal memory-efficiency unit — it is +/// *not* the user-facing pagination of the public `Scan` API, and it +/// is unrelated to the `BatchGetItem` / `BatchWriteItem` operation +/// families. The whole snapshot is one logical read from the caller's +/// perspective; pages exist only so the implementation can avoid +/// buffering the entire table in memory. +pub type SnapshotPageHandler<'a> = + Box FnMut(&'b [Item]) -> BoxFuture<'b, Result<(), StorageError>> + Send + 'a>; + /// Item-level data operations. /// /// All methods receive a `TableKeyInfo` from the engine layer, which has @@ -270,6 +285,37 @@ pub trait DataEngine: Send + Sync { index_name: Option<&str>, ) -> BoxFuture<'_, QueryResult>; + /// Scan every item in a base table under a single consistent + /// snapshot, invoking `on_page` once for each page of items. + /// + /// This is a **full-table** snapshot read: it always covers the + /// entire base table, never a secondary index, never a key range, + /// and never with a filter expression. It is intended for whole- + /// table operations such as `ExportTableToPointInTime` that must + /// observe a single point-in-time view of the table even while + /// concurrent writers are modifying it. + /// + /// Implementations MUST guarantee that every page reflects the + /// table state at the time of the first read. Items written or + /// deleted by other connections after the snapshot is taken MUST + /// NOT appear in any page. On PostgreSQL this is achieved by + /// running every page query inside one transaction with snapshot + /// isolation (`REPEATABLE READ`). + /// + /// `page_size` is purely an internal memory-efficiency knob — it + /// has no effect on what items are returned, only on how many are + /// buffered per call to `on_page`. It is unrelated to the + /// `Scan` API's user-facing `Limit` parameter and to the multi- + /// operation `BatchGetItem` / `BatchWriteItem` families. + /// + /// Returns the total item count emitted across all pages. + fn scan_full_table_snapshot<'a>( + &'a self, + key_info: &'a TableKeyInfo, + page_size: i64, + on_page: SnapshotPageHandler<'a>, + ) -> BoxFuture<'a, Result>; + /// Execute multiple get operations in a single consistent snapshot. /// /// Returns one `Option` per request, in the same order as `ops`.