Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 93 additions & 39 deletions crates/engine/src/import_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,54 +210,108 @@ pub async fn handle_export_table(
DynamoDbError::ValidationException("Cannot create output directory".to_owned())
})?;
}
let mut file = tokio::fs::File::create(&output_path)
let file = tokio::fs::File::create(&output_path)
.await
.map_err(|_| DynamoDbError::ValidationException("Cannot create export file".to_owned()))?;

let mut item_count: i64 = 0;
// Run the entire scan inside a snapshot so the resulting file is
// consistent with respect to writes that happen during the export.
// The storage layer holds a REPEATABLE READ transaction for the
// duration; pages emitted via `on_page` reflect a single point-in-
// time view of the table.
//
// Note: "page" here is the snapshot scan's internal memory unit —
// it is unrelated to the Scan API's user-facing pagination and to
// the BatchGetItem / BatchWriteItem operation families.
let max_export_items = ctx.limits.max_export_item_count;
let mut exclusive_start_key: Option<Item> = None;
loop {
let (items, last_key) = ctx
.storage
.scan(
&key_info,
Some(1000),
exclusive_start_key.as_ref(),
None,
None,
None,
)
.await
.map_err(storage_err_to_dynamo)?;

item_count += i64::from(u16::try_from(items.len()).unwrap_or(u16::MAX));

if u64::try_from(item_count).unwrap_or(u64::MAX) > max_export_items {
return Err(DynamoDbError::ValidationException(format!(
"Export item count exceeds maximum ({max_export_items})"
)));
}
// Bridge the storage layer's per-page callback into the file writer
// via an mpsc channel: the callback sends each page; a writer task
// drains the channel and writes to disk. This keeps the closure
// simple (no &mut File capture issues) and decouples disk I/O
// latency from the scan.
let (page_tx, mut page_rx) = tokio::sync::mpsc::channel::<Vec<Item>>(4);

// Writer task: drains pages from the channel and appends one JSON line
// (`{"Item": ...}`) per item to the export file. It finishes when every
// sender has been dropped and the channel is empty, or on the first
// serialization / write error.
let writer_task: tokio::task::JoinHandle<Result<(), DynamoDbError>> = {
    let mut file = file;
    tokio::spawn(async move {
        while let Some(items) = page_rx.recv().await {
            for item in &items {
                let wrapper = serde_json::json!({"Item": item});
                let mut line = serde_json::to_string(&wrapper).map_err(|e| {
                    tracing::error!(internal_error = %e, "failed to serialize export item");
                    DynamoDbError::InternalServerError("Internal server error".to_owned())
                })?;
                line.push('\n');
                tokio::io::AsyncWriteExt::write_all(&mut file, line.as_bytes())
                    .await
                    .map_err(|_| {
                        DynamoDbError::ValidationException(
                            "Cannot write export file".to_owned(),
                        )
                    })?;
            }
        }
        // The flush result must not be discarded: tokio's `File` completes
        // writes in the background, so a failure of the last write is only
        // reported by a later operation such as `flush`. Ignoring it would
        // let a truncated export file be reported as a success.
        tokio::io::AsyncWriteExt::flush(&mut file)
            .await
            .map_err(|_| {
                DynamoDbError::ValidationException("Cannot write export file".to_owned())
            })
    })
};

for item in &items {
let wrapper = serde_json::json!({"Item": item});
let mut line = serde_json::to_string(&wrapper).map_err(|e| {
tracing::error!(internal_error = %e, "failed to serialize export item");
DynamoDbError::InternalServerError("Internal server error".to_owned())
})?;
line.push('\n');
tokio::io::AsyncWriteExt::write_all(&mut file, line.as_bytes())
.await
.map_err(|_| {
DynamoDbError::ValidationException("Cannot write export file".to_owned())
let scan_result: Result<u64, DynamoDbError> = {
// Track count inside the closure so we can short-circuit the
// snapshot scan as soon as `max_export_item_count` is exceeded.
// Returning Err from `on_page` propagates through
// `scan_full_table_snapshot_impl`, which exits its loop without
// committing — releasing the REPEATABLE READ snapshot promptly so
// VACUUM can resume reclaiming dead tuples on the table.
let mut count: u64 = 0;
let on_page: extenddb_storage::SnapshotPageHandler<'_> = Box::new(move |items| {
count += items.len() as u64;
let exceeded = count > max_export_items;
let owned = items.to_vec();
let tx = page_tx.clone();
Box::pin(async move {
if exceeded {
return Err(extenddb_storage::error::StorageError::Validation(format!(
"Export item count exceeds maximum ({max_export_items})"
)));
}
tx.send(owned).await.map_err(|_| {
extenddb_storage::error::StorageError::Internal(
"export writer task closed early".to_owned(),
)
})?;
}
Ok(())
})
});
// The original `page_tx` sender is moved into `on_page`; once the
// scan returns, `on_page` is dropped and with it the last sender,
// closing the channel and letting the writer task exit cleanly.
ctx.storage
.scan_full_table_snapshot(&key_info, 1000, on_page)
.await
.map_err(storage_err_to_dynamo)
};

if last_key.is_none() {
break;
// Wait for the writer to finish flushing whatever the scan sent.
let writer_result = writer_task.await.unwrap_or_else(|e| {
Err(DynamoDbError::InternalServerError(format!(
"export writer task panicked: {e}"
)))
});

// Combine results. A successful scan return (Ok(n)) implies n is at or
// under max_export_item_count — the over-limit case is now handled
// inside the snapshot transaction. On any error, remove the partial
// file before propagating.
let item_count: i64 = match (scan_result, writer_result) {
(Ok(n), Ok(())) => i64::try_from(n).unwrap_or(i64::MAX),
(Err(e), _) | (Ok(_), Err(e)) => {
let _ = tokio::fs::remove_file(&output_path).await;
return Err(e);
}
exclusive_start_key = last_key;
}
};

let end_time = epoch_seconds();
let export_arn = format!("{}:export/{}", input.table_arn, uuid::Uuid::new_v4());
Expand Down
13 changes: 13 additions & 0 deletions crates/storage-postgres/src/data/data_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ impl DataEngine for PostgresEngine {
})
}

/// Full-table snapshot scan for the PostgreSQL backend; forwards to
/// `scan_full_table_snapshot_impl` and boxes the resulting future.
fn scan_full_table_snapshot<'a>(
    &'a self,
    key_info: &'a TableKeyInfo,
    page_size: i64,
    on_page: extenddb_storage::SnapshotPageHandler<'a>,
) -> BoxFuture<'a, Result<u64, StorageError>> {
    // `key_info` is borrowed for `'a`, the same lifetime as the returned
    // future, so it can be captured by reference; no deep clone of the
    // table key metadata is needed.
    Box::pin(async move {
        self.scan_full_table_snapshot_impl(key_info, page_size, on_page)
            .await
    })
}

fn transact_get_items(
&self,
ops: &[TransactGetOp<'_>],
Expand Down
15 changes: 11 additions & 4 deletions crates/storage-postgres/src/data/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,20 @@ pub(crate) fn bind_sk_value<'q>(
}

/// Execute a scan SQL statement with dynamic parameter binding.
pub(crate) async fn execute_scan_sql(
///
/// `executor` is generic so the caller can pass either a `&PgPool`
/// (the default, autocommit) or a `&mut Transaction<'_, Postgres>`
/// (for snapshot-isolated reads via `scan_full_table_snapshot`).
pub(crate) async fn execute_scan_sql<'e, E>(
sql: &str,
exclusive_start_key: Option<&Item>,
key_schema: &[KeySchemaElement],
attr_defs: &[AttributeDefinition],
pool: &sqlx::PgPool,
) -> Result<Vec<serde_json::Value>, StorageError> {
executor: E,
) -> Result<Vec<serde_json::Value>, StorageError>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let mut query = sqlx::query_as::<_, (serde_json::Value,)>(sql);

if let Some(start_key) = exclusive_start_key {
Expand All @@ -275,7 +282,7 @@ pub(crate) async fn execute_scan_sql(
}

let rows: Vec<(serde_json::Value,)> = query
.fetch_all(pool)
.fetch_all(executor)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

Expand Down
131 changes: 131 additions & 0 deletions crates/storage-postgres/src/data/query_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,135 @@ impl PostgresEngine {

Ok((items, last_key))
}

/// Run a full-table scan inside a `REPEATABLE READ READ ONLY`
/// transaction, emitting items to `on_page` one snapshot page at a time.
///
/// Snapshot isolation guarantees that all pages reflect the table state
/// at the time of the transaction's first read, regardless of concurrent
/// writers. The transaction is committed when the scan completes; if any
/// page handler returns `Err`, the function returns early and the
/// transaction is dropped without being committed.
///
/// "Page" here is the implementation's internal unit of work and has no
/// connection to the public Scan API's pagination or to the BatchGet/
/// BatchWrite operation families.
///
/// `page_size` only controls how many items are buffered per `on_page`
/// call. Values below 1 are clamped to 1 so that a misconfigured caller
/// still receives every item instead of an empty result or an invalid
/// `LIMIT` clause.
///
/// Always scans the base table — no GSI/LSI routing, no parallel-scan
/// segments, no filter expressions. This is the right shape for whole-
/// table operations like `ExportTableToPointInTime`; range or filtered
/// reads use [`scan_impl`] instead.
///
/// Returns the total number of items passed to `on_page`.
///
/// # Errors
///
/// Returns `StorageError::Internal` when the transaction cannot be
/// started, configured or committed, or when a resume key lacks the
/// partition key. Errors from the page query, from item decoding and
/// from `on_page` are propagated unchanged.
pub(crate) async fn scan_full_table_snapshot_impl(
    &self,
    key_info: &TableKeyInfo,
    page_size: i64,
    mut on_page: extenddb_storage::SnapshotPageHandler<'_>,
) -> Result<u64, StorageError> {
    // Normalise the page size once. `LIMIT` fetches one extra row so the
    // loop can tell whether another page exists; the saturating add keeps
    // that from overflowing for extreme values.
    let page_size = page_size.max(1);
    let page_len = usize::try_from(page_size).unwrap_or(usize::MAX);
    let fetch_limit = page_size.saturating_add(1);

    // REPEATABLE READ on PostgreSQL gives snapshot isolation: every
    // query in this transaction sees the same view of the database, as
    // of the moment of the first read. SERIALIZABLE would add
    // serialization-failure retry overhead that's unnecessary for a
    // read-only export.
    let mut tx = self
        .data_pool
        .begin()
        .await
        .map_err(|e| StorageError::Internal(format!("begin snapshot: {e}")))?;
    sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY")
        .execute(&mut *tx)
        .await
        .map_err(|e| StorageError::Internal(format!("set isolation: {e}")))?;

    let ddb_table = data_table_name(&key_info.table_id);
    let sk_info_val = sk_info(&key_info.key_schema, &key_info.attribute_definitions);

    // The SQL text only differs between the first page (no resume key)
    // and every later page (keyset predicate on the last emitted key),
    // so both variants are built once, outside the loop.
    let (resume_predicate, ordering) = match sk_info_val {
        Some((_, sk_type)) => {
            let sk_col = sk_column(sk_type);
            let collate = if sk_type == ScalarAttributeType::S {
                " COLLATE \"C\""
            } else {
                ""
            };
            (
                format!("(pk, {sk_col}{collate}) > ($1, $2)"),
                format!("pk, {sk_col}{collate}"),
            )
        }
        None => ("pk > $1".to_owned(), "pk".to_owned()),
    };
    let first_page_sql =
        format!("SELECT item_data FROM {ddb_table} ORDER BY {ordering} LIMIT {fetch_limit}");
    let next_page_sql = format!(
        "SELECT item_data FROM {ddb_table} WHERE {resume_predicate} ORDER BY {ordering} LIMIT {fetch_limit}"
    );

    let mut total: u64 = 0;
    let mut exclusive_start_key: Option<Item> = None;

    loop {
        let sql: &str = match exclusive_start_key.as_ref() {
            None => first_page_sql.as_str(),
            Some(start_key) => {
                // The resume predicate binds the partition key, so the
                // key built from the previous page must contain it.
                let pk_name = key_info
                    .key_schema
                    .first()
                    .map(|element| &element.attribute_name)
                    .ok_or_else(|| StorageError::Internal("empty key schema".to_owned()))?;
                if !start_key.contains_key(pk_name) {
                    return Err(StorageError::Internal("missing pk in start key".to_owned()));
                }
                next_page_sql.as_str()
            }
        };

        // Critical: pass `&mut *tx` so this query runs inside the
        // snapshot transaction, not on a fresh pool connection.
        let rows = execute_scan_sql(
            sql,
            exclusive_start_key.as_ref(),
            &key_info.key_schema,
            &key_info.attribute_definitions,
            &mut *tx,
        )
        .await?;

        // More rows than one page means the extra look-ahead row came
        // back; it is dropped here and re-read as part of the next page.
        let has_more = rows.len() > page_len;
        let items: Vec<Item> = rows
            .into_iter()
            .take(page_len)
            .map(json_to_item)
            .collect::<Result<Vec<_>, _>>()?;

        if items.is_empty() {
            break;
        }

        total += items.len() as u64;
        // An `Err` from the handler returns early; `tx` is then dropped
        // without `commit`, which ends the snapshot.
        on_page(&items).await?;

        if !has_more {
            break;
        }
        exclusive_start_key = items.last().map(|i| build_key(i, &key_info.key_schema));
    }

    tx.commit()
        .await
        .map_err(|e| StorageError::Internal(format!("commit snapshot: {e}")))?;
    Ok(total)
}
}
46 changes: 46 additions & 0 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ pub trait TableEngine: Send + Sync {
) -> BoxFuture<'_, Result<IndexInfo, StorageError>>;
}

/// Callback invoked once per page of items produced by
/// [`DataEngine::scan_full_table_snapshot`]. The implementation calls
/// this sequentially for every page until the table is fully scanned;
/// returning `Err` short-circuits the snapshot and rolls back the
/// underlying transaction without committing.
///
/// "Page" here refers to an internal memory-efficiency unit — it is
/// *not* the user-facing pagination of the public `Scan` API, and it
/// is unrelated to the `BatchGetItem` / `BatchWriteItem` operation
/// families. The whole snapshot is one logical read from the caller's
/// perspective; pages exist only so the implementation can avoid
/// buffering the entire table in memory.
///
/// Shape of the type:
/// - `FnMut`: the handler may keep and update state between pages
///   (for example a running item count).
/// - `for<'b> ... &'b [Item] -> BoxFuture<'b, _>`: the returned future
///   may borrow the page slice, but only for the duration of that one
///   call; items needed afterwards must be copied out.
/// - `Send + 'a`: the boxed closure can be moved to another thread and
///   anything it captures must live at least as long as `'a`.
pub type SnapshotPageHandler<'a> =
Box<dyn for<'b> FnMut(&'b [Item]) -> BoxFuture<'b, Result<(), StorageError>> + Send + 'a>;

/// Item-level data operations.
///
/// All methods receive a `TableKeyInfo` from the engine layer, which has
Expand Down Expand Up @@ -270,6 +285,37 @@ pub trait DataEngine: Send + Sync {
index_name: Option<&str>,
) -> BoxFuture<'_, QueryResult>;

/// Scan every item in a base table under a single consistent
/// snapshot, invoking `on_page` once for each page of items.
///
/// This is a **full-table** snapshot read: it always covers the
/// entire base table, never a secondary index, never a key range,
/// and never with a filter expression. It is intended for whole-
/// table operations such as `ExportTableToPointInTime` that must
/// observe a single point-in-time view of the table even while
/// concurrent writers are modifying it.
///
/// Implementations MUST guarantee that every page reflects the
/// table state at the time of the first read. Items written or
/// deleted by other connections after the snapshot is taken MUST
/// NOT appear in any page. On PostgreSQL this is achieved by
/// running every page query inside one transaction with snapshot
/// isolation (`REPEATABLE READ`).
///
/// `page_size` is purely an internal memory-efficiency knob — it
/// has no effect on what items are returned, only on how many are
/// buffered per call to `on_page`. It is unrelated to the
/// `Scan` API's user-facing `Limit` parameter and to the multi-
/// operation `BatchGetItem` / `BatchWriteItem` families.
///
/// Returns the total item count emitted across all pages.
///
/// # Errors
///
/// Returns a [`StorageError`] when the scan cannot be completed.
/// Per the [`SnapshotPageHandler`] contract, an `Err` returned by
/// `on_page` short-circuits the scan; that error is expected to be
/// handed back to the caller, so pages delivered before the failure
/// should be treated as a partial result.
fn scan_full_table_snapshot<'a>(
&'a self,
key_info: &'a TableKeyInfo,
page_size: i64,
on_page: SnapshotPageHandler<'a>,
) -> BoxFuture<'a, Result<u64, StorageError>>;

/// Execute multiple get operations in a single consistent snapshot.
///
/// Returns one `Option<Item>` per request, in the same order as `ops`.
Expand Down
Loading