53 changes: 23 additions & 30 deletions crates/fakecloud-ecr/src/oci.rs
@@ -659,25 +659,17 @@ async fn blob_upload_patch(
         PathBuf::from(&upload.spool_path)
     };
 
-    // Two ingestion modes — true streaming when the dispatcher handed
-    // us the raw body stream (1 GiB push lands in constant memory),
-    // and a buffered fallback for legacy callers that already read the
-    // body into `request.body` (small chunks from tests, edge proxies).
-    let appended: u64 = if let Some(stream) = request.take_body_stream() {
-        append_stream(&spool, stream).await?
-    } else {
-        let chunk = &request.body;
-        if !chunk.is_empty() {
-            tokio::task::block_in_place(|| append_bytes_sync(&spool, chunk)).map_err(|e| {
-                AwsServiceError::aws_error(
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    "InternalError",
-                    format!("failed to append upload chunk: {e}"),
-                )
-            })?;
-        }
-        chunk.len() as u64
-    };
+    // Streaming-only: dispatch flags blob-upload PATCH/PUT to keep
+    // `body_stream` populated, so we always consume it here. A 1 GiB
+    // push lands in constant memory.
+    let stream = request.take_body_stream().ok_or_else(|| {
+        AwsServiceError::aws_error(
+            StatusCode::BAD_REQUEST,
+            "BLOB_UPLOAD_INVALID",
+            "blob upload PATCH requires a streaming request body",
+        )
+    })?;
+    let appended = append_stream(&spool, stream).await?;
 
     let mut accounts = service.state_handle().write();
     let state = accounts
@@ -744,17 +736,18 @@ async fn blob_upload_finish(
         PathBuf::from(&upload.spool_path)
     };
 
-    if let Some(stream) = request.take_body_stream() {
-        append_stream(&spool, stream).await?;
-    } else if !request.body.is_empty() {
-        tokio::task::block_in_place(|| append_bytes_sync(&spool, &request.body)).map_err(|e| {
-            AwsServiceError::aws_error(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "InternalError",
-                format!("failed to append final upload chunk: {e}"),
-            )
-        })?;
-    }
+    // Final PUT may carry an empty body (chunks already streamed via
+    // PATCH) or one last chunk. Either way, dispatch keeps body_stream
+    // populated; we drain it into the spool unconditionally so a
+    // single-call OCI upload (PATCH-less) works the same way.
+    let stream = request.take_body_stream().ok_or_else(|| {
+        AwsServiceError::aws_error(
+            StatusCode::BAD_REQUEST,
+            "BLOB_UPLOAD_INVALID",
+            "blob upload PUT requires a streaming request body",
+        )
+    })?;
+    append_stream(&spool, stream).await?;
 
     let combined = read_spool(&spool).map_err(|e| {
         AwsServiceError::aws_error(
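`append_stream` itself isn't touched by this diff. For context, a minimal sketch of what a constant-memory append can look like, assuming the body stream yields `io::Result<Bytes>` chunks and that mapping into `AwsServiceError` happens at the call site (names and signature are illustrative, not fakecloud-core's actual API):

```rust
use std::io;
use std::path::Path;

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

/// Append every chunk of a request body stream to the spool file,
/// returning the number of bytes written. Chunks hit disk as they
/// arrive, so memory stays constant regardless of blob size.
async fn append_stream<S>(spool: &Path, mut stream: S) -> io::Result<u64>
where
    S: Stream<Item = io::Result<Bytes>> + Unpin,
{
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(spool)
        .await?;
    let mut written = 0u64;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?; // propagate a broken client connection
        file.write_all(&chunk).await?;
        written += chunk.len() as u64;
    }
    file.flush().await?;
    Ok(written)
}
```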
8 changes: 7 additions & 1 deletion crates/fakecloud-s3/src/service/mod.rs
@@ -3208,6 +3208,12 @@ mod tests {
             .map(|(k, v)| format!("{k}={v}"))
             .collect::<Vec<_>>()
             .join("&");
+        // Wire body_stream from the same bytes so streaming-only handlers
+        // (put_object, upload_part) can consume it. Buffered handlers
+        // (put_object_tagging, put_object_acl, …) read `body` directly
+        // and ignore the stream.
+        let stream_body =
+            fakecloud_core::service::RequestBodyStream::from(Bytes::copy_from_slice(body));
         AwsRequest {
             service: "s3".to_string(),
             action: String::new(),
@@ -3217,7 +3223,7 @@
             headers: HeaderMap::new(),
             query_params,
             body: Bytes::copy_from_slice(body),
-            body_stream: parking_lot::Mutex::new(None),
+            body_stream: parking_lot::Mutex::new(Some(stream_body)),
             path_segments: segments,
             raw_path: path.to_string(),
             raw_query,
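Why `body_stream` sits behind a `parking_lot::Mutex<Option<...>>`: handlers get the request by shared reference, so moving the stream out requires interior mutability. A plausible shape for the accessor, with illustrative stand-in types (the real definitions live in fakecloud-core and may differ):

```rust
use bytes::Bytes;
use parking_lot::Mutex;

// Illustrative stand-in; the real type wraps a boxed chunk stream.
pub struct RequestBodyStream {}

pub struct AwsRequest {
    pub body: Bytes,
    pub body_stream: Mutex<Option<RequestBodyStream>>,
    // ...remaining fields elided...
}

impl AwsRequest {
    /// Move the streaming body out of the request, if the dispatcher
    /// (or the test helper above) wired one. The Mutex gives interior
    /// mutability, so a handler holding `&self` can take the stream
    /// exactly once; subsequent calls see `None`.
    pub fn take_body_stream(&self) -> Option<RequestBodyStream> {
        self.body_stream.lock().take()
    }
}
```

Taking rather than borrowing means a second `take_body_stream()` call returns `None`, so the buffered `body` field and the stream can coexist on one request without double-reading.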
57 changes: 18 additions & 39 deletions crates/fakecloud-s3/src/service/multipart.rs
@@ -193,45 +193,24 @@ impl S3Service {
             }
         }
 
-        // Streaming part body: spool chunks to a tempfile while computing
-        // MD5 + size in constant memory. Buffered callers (tests, the
-        // legacy buffered code path) fall through with the existing
-        // `req.body` flow.
-        let spooled: Option<fakecloud_core::service::SpooledBody> =
-            if let Some(stream) = req.take_body_stream() {
-                Some(
-                    fakecloud_core::service::spool_request_stream(
-                        stream,
-                        self.store.spool_dir().as_deref(),
-                    )
-                    .await?,
-                )
-            } else {
-                None
-            };
-        let buffered_body: Option<Bytes> = if spooled.is_none() {
-            Some(req.body.clone())
-        } else {
-            None
-        };
-        let part_size: u64 = match (&spooled, &buffered_body) {
-            (Some(s), _) => s.size,
-            (None, Some(b)) => b.len() as u64,
-            (None, None) => 0,
-        };
-        let etag: String = match (&spooled, &buffered_body) {
-            (Some(s), _) => s.md5_hex.clone(),
-            (None, Some(b)) => compute_md5(b),
-            (None, None) => compute_md5(&Bytes::new()),
-        };
-
-        let body_source: BodySource = if let Some(b) = &buffered_body {
-            BodySource::Bytes(b.clone())
-        } else if let Some(spool) = spooled {
-            BodySource::File(spool.path)
-        } else {
-            BodySource::Bytes(Bytes::new())
-        };
+        // UploadPart is streaming-only: spool chunks to a tempfile
+        // while computing MD5 + size in constant memory, then hand
+        // the file to the store as `BodySource::File`.
+        let stream = req.take_body_stream().ok_or_else(|| {
+            AwsServiceError::aws_error(
+                StatusCode::BAD_REQUEST,
+                "MalformedRequestBody",
+                "UploadPart requires a streaming request body",
+            )
+        })?;
+        let spooled = fakecloud_core::service::spool_request_stream(
+            stream,
+            self.store.spool_dir().as_deref(),
+        )
+        .await?;
+        let part_size = spooled.size;
+        let etag = spooled.md5_hex.clone();
+        let body_source = BodySource::File(spooled.path);
 
         let mut accts = self.state.write();
         let state = accts.get_or_create(account_id);
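For reference, the shape of `spool_request_stream` and `SpooledBody` that the handlers above rely on. This is a sketch under assumed names (it presumes the `md-5` and `hex` crates and a stream of `io::Result<Bytes>` chunks); the real fakecloud-core implementation may differ:

```rust
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use md5::{Digest, Md5};
use tokio::io::AsyncWriteExt;

/// Result of draining a request body to disk: where it landed, how
/// big it was, and its MD5 (S3's ETag for non-multipart uploads).
pub struct SpooledBody {
    pub path: PathBuf,
    pub size: u64,
    pub md5_hex: String,
}

static SPOOL_SEQ: AtomicU64 = AtomicU64::new(0);

pub async fn spool_request_stream<S>(
    mut stream: S,
    spool_dir: Option<&Path>,
) -> io::Result<SpooledBody>
where
    S: Stream<Item = io::Result<Bytes>> + Unpin,
{
    // Illustrative naming/location policy; the real helper may differ.
    let dir = spool_dir
        .map(Path::to_path_buf)
        .unwrap_or_else(std::env::temp_dir);
    let path = dir.join(format!(
        "spool-{}-{}.part",
        std::process::id(),
        SPOOL_SEQ.fetch_add(1, Ordering::Relaxed),
    ));
    let mut file = tokio::fs::File::create(&path).await?;
    let mut hasher = Md5::new();
    let mut size = 0u64;
    // One pass: hash and persist each chunk, never holding more than
    // a single chunk in memory.
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        hasher.update(&chunk);
        file.write_all(&chunk).await?;
        size += chunk.len() as u64;
    }
    file.flush().await?;
    Ok(SpooledBody {
        path,
        size,
        md5_hex: hex::encode(hasher.finalize()),
    })
}
```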
119 changes: 45 additions & 74 deletions crates/fakecloud-s3/src/service/objects.rs
@@ -12,8 +12,8 @@ use crate::state::{AclGrant, S3Object};
 
 use super::{
     canned_acl_grants_for_object, check_get_conditionals, check_head_conditionals,
-    check_object_lock_for_overwrite, compute_checksum, compute_md5, deliver_notifications,
-    etag_matches, extract_user_metadata, extract_xml_value, is_frozen, is_valid_storage_class,
+    check_object_lock_for_overwrite, compute_checksum, deliver_notifications, etag_matches,
+    extract_user_metadata, extract_xml_value, is_frozen, is_valid_storage_class,
     make_delete_marker, no_such_bucket, no_such_key, parse_delete_objects_xml, parse_grant_headers,
     parse_range_header, parse_url_encoded_tags, precondition_failed, replicate_through_store,
     resolve_object, s3_xml, url_encode_s3_key, xml_escape, RangeResult, S3Service,
@@ -732,40 +732,28 @@ impl S3Service {
         }; // read lock dropped
 
         // --- Preparation phase: compute object data outside any lock ---
-        // Streaming PutObject: drain the raw HTTP body to a tempfile on
-        // disk while computing MD5 + size in constant memory. For
-        // disk-mode the spool lands inside the S3 root so the eventual
-        // rename is a same-FS metadata move; for memory-mode the spool
-        // is read back into bytes and unlinked once the store consumes
-        // it. Buffered (non-streaming) callers fall through with the
-        // existing `req.body` flow.
-        let spooled: Option<fakecloud_core::service::SpooledBody> =
-            if let Some(stream) = req.take_body_stream() {
-                Some(
-                    fakecloud_core::service::spool_request_stream(
-                        stream,
-                        self.store.spool_dir().as_deref(),
-                    )
-                    .await?,
-                )
-            } else {
-                None
-            };
-        let buffered_body: Option<Bytes> = if spooled.is_none() {
-            Some(req.body.clone())
-        } else {
-            None
-        };
-        let data_size: u64 = match (&spooled, &buffered_body) {
-            (Some(s), _) => s.size,
-            (None, Some(b)) => b.len() as u64,
-            (None, None) => 0,
-        };
-        let etag: String = match (&spooled, &buffered_body) {
-            (Some(s), _) => s.md5_hex.clone(),
-            (None, Some(b)) => compute_md5(b),
-            (None, None) => compute_md5(&Bytes::new()),
-        };
+        // PutObject is a streaming-only route: dispatch always wires
+        // `body_stream` for PUT-on-bucket-key requests routed through
+        // S3, and the test helper does the same for direct service
+        // calls. We drain the stream into a tempfile here, computing
+        // MD5 + size in constant memory; in disk mode the spool lands
+        // on the S3 root so the eventual rename is a same-FS metadata
+        // move, in memory mode the store reads the file back and
+        // unlinks it.
+        let stream = req.take_body_stream().ok_or_else(|| {
+            AwsServiceError::aws_error(
+                StatusCode::BAD_REQUEST,
+                "MalformedRequestBody",
+                "PutObject requires a streaming request body",
+            )
+        })?;
+        let spooled = fakecloud_core::service::spool_request_stream(
+            stream,
+            self.store.spool_dir().as_deref(),
+        )
+        .await?;
+        let data_size: u64 = spooled.size;
+        let etag: String = spooled.md5_hex.clone();
         let content_type = req
             .headers
             .get("content-type")
@@ -925,19 +913,17 @@ impl S3Service {
                 None
             }
         });
-        // Checksum computation. For buffered uploads we hash the in-memory
-        // body. For streamed uploads with an explicit checksum algorithm
-        // requested by the client, fold the spool file through the
-        // hasher in 1 MiB chunks (constant memory). Streamed uploads
-        // with no checksum algorithm skip this entirely.
-        let checksum_value = match (&checksum_algorithm, &buffered_body, &spooled) {
-            (Some(algo), Some(b), _) => Some(compute_checksum(algo, b)),
-            (Some(algo), None, Some(spool)) => Some(
-                super::compute_checksum_streaming(algo, &spool.path)
+        // Checksum computation: when the client requested an explicit
+        // algorithm, fold the spool through the hasher in 1 MiB chunks.
+        // Constant memory; skipped when no algorithm is requested.
+        let checksum_value = if let Some(algo) = &checksum_algorithm {
+            Some(
+                super::compute_checksum_streaming(algo, &spooled.path)
                     .await
                     .map_err(super::io_to_aws)?,
-            ),
-            _ => None,
+            )
+        } else {
+            None
         };
 
         // Object lock - explicit headers or bucket default
@@ -978,43 +964,28 @@ impl S3Service {
             }
         }
 
-        // SSE-KMS: encrypt the body via the KMS hook so it lands as a
-        // fakecloud-kms envelope on disk and through replication. The
-        // KMS hook needs the plaintext as `Bytes` so streamed uploads
-        // get materialized here once (acceptable: KMS-encrypted objects
-        // are typically smaller than streamable plaintext, and the
-        // alternative would require chunked AES-GCM, which AWS itself
-        // doesn't expose). Fail-closed: a KMS error here aborts
-        // PutObject before any mutation. Non-KMS streamed uploads fall
-        // through with `body_source = BodySource::File` and never
-        // touch RAM.
+        // SSE-KMS: read the spool back into memory so the KMS hook can
+        // produce an envelope-encrypted blob (AES-GCM needs the whole
+        // plaintext at once — chunked AEAD isn't part of the AWS S3
+        // wire format). Fail-closed: a KMS error aborts PutObject
+        // before any mutation. Non-KMS uploads pass `BodySource::File`
+        // straight to the store and never copy the payload into RAM.
         let plaintext_size = data_size;
         let body_source: BodySource = if sse_algorithm.as_deref() == Some("aws:kms") {
-            let plaintext: Bytes = match (&buffered_body, &spooled) {
-                (Some(b), _) => b.clone(),
-                (None, Some(spool)) => {
-                    let bytes = tokio::fs::read(&spool.path)
-                        .await
-                        .map_err(super::io_to_aws)?;
-                    let _ = tokio::fs::remove_file(&spool.path).await;
-                    Bytes::from(bytes)
-                }
-                (None, None) => Bytes::new(),
-            };
+            let bytes = tokio::fs::read(&spooled.path)
+                .await
+                .map_err(super::io_to_aws)?;
+            let _ = tokio::fs::remove_file(&spooled.path).await;
             let cipher = self.encrypt_object_body(
                 account_id,
                 &region,
                 bucket,
-                &plaintext,
+                &Bytes::from(bytes),
                 sse_kms_key_id.as_deref(),
             )?;
             BodySource::Bytes(cipher)
-        } else if let Some(b) = &buffered_body {
-            BodySource::Bytes(b.clone())
-        } else if let Some(spool) = spooled {
-            BodySource::File(spool.path)
         } else {
-            BodySource::Bytes(Bytes::new())
+            BodySource::File(spooled.path)
         };
         let obj = S3Object {
             key: key.to_string(),
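To make the "1 MiB chunks, constant memory" claim concrete: `compute_checksum_streaming` presumably dispatches on the requested algorithm (CRC32, CRC32C, SHA-1, SHA-256). A sketch of the SHA-256 arm under assumed names, using the `sha2` and `base64` crates:

```rust
use std::io;
use std::path::Path;

use base64::Engine as _;
use sha2::{Digest, Sha256};
use tokio::io::AsyncReadExt;

/// Hash a spooled body in 1 MiB chunks. Memory stays constant no
/// matter how large the object is; S3 returns checksum header values
/// base64-encoded, so the digest is encoded the same way here.
async fn sha256_file_b64(path: &Path) -> io::Result<String> {
    let mut file = tokio::fs::File::open(path).await?;
    let mut hasher = Sha256::new();
    let mut buf = vec![0u8; 1024 * 1024];
    loop {
        let n = file.read(&mut buf).await?;
        if n == 0 {
            break; // EOF
        }
        hasher.update(&buf[..n]);
    }
    Ok(base64::engine::general_purpose::STANDARD.encode(hasher.finalize()))
}
```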