[PPSC-181] feat(api): implement streaming multipart uploads #91

yiftach-armis merged 8 commits into main
Conversation
Refactor StartIngest() to use io.Pipe() for streaming multipart uploads instead of buffering entire tarballs in memory. This prevents OOM crashes when scanning large repositories (2GB+) or container images (5GB+).

Key changes:
- Add copyWithContext() for cancellation-responsive large uploads
- Add DisableRetry option to httpclient for non-rewindable streams
- Split WithHTTPClient/WithUploadHTTPClient to preserve no-retry config
- Fix slowReader test helper to comply with the io.Reader contract
- Add integration test for context cancellation during uploads

The streaming approach keeps memory usage constant regardless of file size, achieving the ticket's goal of <500MB memory for 5GB uploads.
Addressed 4 findings from deep-review:
- Add WithUploadHTTPClient to StartIngest tests for explicit upload client config
- Update CLAUDE.md to document WithUploadHTTPClient and the streaming architecture
- Include the HTTP status code in write error messages for better debugging
- Increase copyChunkSize from 32KB to 256KB to reduce syscall overhead

Ticket: PPSC-181
Silently ignore ENOTTY, EINVAL, and ENOTSUP errors from os.Stdout.Sync() which occur when stdout is a pipe, socket, or /dev/stdout that doesn't support fsync. The output is still delivered correctly.
Test Coverage Report: total (statements) 80.1%
Pull request overview
Implements streaming multipart uploads in the API client to avoid buffering large artifacts in memory, adds upload-specific HTTP client behavior for non-idempotent streaming requests, and suppresses known benign stdout sync warnings.
Changes:
- Stream `StartIngest` multipart uploads via `io.Pipe` with context-aware copying for cancellation responsiveness.
- Add `DisableRetry` support to the internal HTTP client and configure a dedicated no-retry upload client.
- Suppress stdout `Sync()` warnings for specific "not supported" errno cases and update documentation/tests accordingly.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| internal/api/client.go | Switch upload to streaming multipart via io.Pipe; add copyWithContext; add upload-specific HTTP client option/config. |
| internal/api/client_test.go | Add tests for upload streaming error handling/cancellation and copyWithContext; update client construction to include upload client option. |
| internal/httpclient/client.go | Add DisableRetry flag and ensure retry logic can be fully disabled. |
| internal/httpclient/client_test.go | Add coverage to verify retry disabling behavior. |
| internal/output/output.go | Ignore specific stdout sync errno failures to suppress spurious warnings. |
| CLAUDE.md | Update package documentation to reflect streaming uploads and upload client behavior. |
internal/output/output.go
Outdated
```go
// Silently ignore "sync not supported" errors - these occur when stdout
// is a pipe, socket, or /dev/stdout which don't support fsync.
// The output is still delivered correctly.
if !errors.Is(err, syscall.ENOTTY) && // "inappropriate ioctl for device"
	!errors.Is(err, syscall.EINVAL) && // "invalid argument"
	!errors.Is(err, syscall.ENOTSUP) { // "operation not supported"
```
The use of syscall.ENOTTY/EINVAL/ENOTSUP here will not compile on Windows (these errno constants are not defined in the windows syscall package). Since this repo builds releases for windows, please gate the errno checks behind OS-specific build tags (e.g., a helper implemented in *_unix.go vs *_windows.go) or otherwise avoid referencing unix-only constants from cross-platform code.
internal/api/client_test.go
Outdated
```go
}
// Return some data before failing
toRead := min(len(p), f.failAfter-f.bytesRead)
for i := range toRead {
```
This loop won’t compile: range cannot iterate over an int (toRead). Use an index-based loop (e.g., for i := 0; i < toRead; i++) or copy from a prefilled buffer.
Suggested change:
```diff
-	for i := range toRead {
+	for i := 0; i < toRead; i++ {
```
```diff
  httpClient := httpclient.NewClient(httpclient.Config{Timeout: 5 * time.Second})
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute, WithHTTPClient(httpClient))
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
  if err != nil {
```
These tests pass the general-purpose HTTP client as the upload client. That client has retries enabled by default, which is incompatible with the streaming request body used by StartIngest (retries can’t safely resend a non-rewindable body and may cause flaky behavior on transient errors). Consider constructing a dedicated upload client for tests with DisableRetry: true (and the desired timeout) and pass that via WithUploadHTTPClient.
```diff
  httpClient := httpclient.NewClient(httpclient.Config{Timeout: 5 * time.Second})
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute, WithHTTPClient(httpClient))
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
  if err != nil {
```
Same concern as above: reusing a retry-enabled httpclient.Client for uploads can trigger retries with a non-rewindable streaming body. Prefer a dedicated upload client in tests with DisableRetry: true (and an appropriate timeout) when passing WithUploadHTTPClient.
```diff
  httpClient := httpclient.NewClient(httpclient.Config{Timeout: 5 * time.Second})
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 50*time.Millisecond, WithHTTPClient(httpClient))
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 50*time.Millisecond,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
  if err != nil {
```
Same concern as above: WithUploadHTTPClient is being set to a client that retries by default. Since StartIngest streams via io.Pipe, retries can’t safely resend the body. Use an upload client configured with DisableRetry: true for these tests.
internal/api/client_test.go
Outdated
```go
client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
```
Same concern as above: the upload client passed here has retries enabled by default, which is incompatible with StartIngest’s streaming request body. Prefer a dedicated upload client with DisableRetry: true for tests.
Suggested change:
```diff
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
- 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
+ uploadClient := httpclient.NewClient(httpclient.Config{Timeout: 2 * time.Second, DisableRetry: true})
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(uploadClient))
```
internal/api/client_test.go
Outdated
```go
client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute,
	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
```
Same concern as above: passing the general HTTP client as uploadHTTPClient enables retries, which is unsafe for io.Pipe streaming bodies. Create a separate upload client for tests with DisableRetry: true (plus the desired timeout) and pass that to WithUploadHTTPClient.
Suggested change:
```diff
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute,
- 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
+ uploadHTTPClient := httpclient.NewClient(httpclient.Config{Timeout: 5 * time.Second, DisableRetry: true})
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 1*time.Minute,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(uploadHTTPClient))
```
internal/api/client_test.go
Outdated
```go
client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
```
Same concern as above: this sets uploadHTTPClient to a client that retries by default. Retries are not safe with StartIngest’s streaming body; use a dedicated upload client for tests with DisableRetry: true to match production behavior and avoid flakiness on transient errors.
Suggested change:
```diff
- client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
- 	WithHTTPClient(httpClient), WithUploadHTTPClient(httpClient))
+ uploadClient := httpclient.NewClient(httpclient.Config{Timeout: 5 * time.Second, DisableRetry: true})
+ client, err := NewClient(server.URL, testutil.NewTestAuthProvider("token123"), false, 5*time.Second,
+ 	WithHTTPClient(httpClient), WithUploadHTTPClient(uploadClient))
```
- Fix Windows compilation by extracting errno checks into platform-specific files (errno_unix.go, errno_windows.go) with build tags. ENOTSUP is not defined on Windows.
- Change range-over-int syntax to a traditional for loop for broader compatibility with older linters.
- Use dedicated upload clients with DisableRetry: true in tests to match production behavior. Streaming bodies (io.Pipe) cannot be rewound for retries.
…ds [PPSC-181]

When the server returns an early error (e.g., 401), it closes the connection before reading the full body. This causes io.ErrClosedPipe in the writer goroutine. Previously, this pipe error surfaced instead of the clear HTTP error. Now check the HTTP status before write errors, since server rejection is the root cause and pipe closure is just a symptom.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
internal/api/client.go
Outdated
```go
resp, err := c.uploadHTTPClient.Do(req)

// Wait for the writer goroutine to finish and collect any error
writeErr := <-errChan
```
In StartIngest(), the code waits on writeErr := <-errChan immediately after uploadHTTPClient.Do(req). This can deadlock if the server/transport returns a non-2xx response early (common with HTTP/2 or proxies) and the transport stops reading from pr while the writer goroutine is still blocked writing to the pipe, so it never reaches the deferred send on errChan. Consider ensuring the pipe is closed on all non-success paths (e.g., close the pipe reader with an error / defer-close the reader), and avoid blocking on errChan until after you’ve handled the HTTP response/status (or wait with a context-aware select).
```go
// Use io.Pipe to stream multipart data directly to the HTTP request body.
// This avoids buffering the entire file in memory, preventing OOM on large uploads.
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
contentType := writer.FormDataContentType() // Capture before goroutine starts

// Channel to receive errors from the writer goroutine
errChan := make(chan error, 1)

// Goroutine writes multipart data to the pipe; HTTP request reads from it
go func() {
	var writeErr error
	defer func() {
```
The multipart writer goroutine is started before the request is created and before setAuthHeader succeeds. If request construction/auth fails, the goroutine may already be blocked in opts.Data.Read() and StartIngest then waits on <-errChan, delaying/possibly hanging the error return (since context cancellation won’t necessarily interrupt an arbitrary Reader). Consider only starting the writer goroutine after the request is successfully created/authenticated, or avoid blocking on errChan in these early-failure paths (close the pipe and return).
…ontext [PPSC-181]

Address a GitHub code scanning alert by:
- Using `nw > nr` instead of `nr < nw` to match Go stdlib style
- Adding clear comments distinguishing invalid writes (nw < 0 or nw > nr) from short writes (nw < nr)
- Using distinct error types: errInvalidWrite vs io.ErrShortWrite

The original logic was correct but the reversed comparison order confused static analysis tools. This change improves clarity without changing behavior.
Address two potential deadlock scenarios identified in PR review:
1. Defer goroutine start until after request/auth succeed, preventing hangs when early failures occur while the goroutine is blocked reading
2. Always close the pipe reader after Do(req) returns to unblock the writer goroutine if the server returns early (non-2xx, HTTP/2, proxies)

Co-authored-by: copilot-pull-request-reviewer
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
internal/api/client.go
Outdated
```go
resp, err := c.uploadHTTPClient.Do(req)

// Close the pipe reader to unblock the writer goroutine.
// This is critical: if the server returns early (non-2xx) or the transport
// stops reading (HTTP/2, proxies), the writer may be blocked on pw.Write().
// CloseWithError ensures the writer sees an error instead of blocking forever.
_ = pr.CloseWithError(io.ErrUnexpectedEOF)

// Wait for the writer goroutine to finish and collect any error
writeErr := <-errChan
```
pr.CloseWithError(io.ErrUnexpectedEOF) is executed unconditionally right after uploadHTTPClient.Do(req) returns. With HTTP/2 (or any server that responds before fully reading the request body), Do can return while the request body is still being streamed; closing the pipe reader here can prematurely abort the upload and then the resulting io.ErrUnexpectedEOF is later ignored. Consider only closing the pipe reader to abort the writer in error/non-2xx paths, and in the 2xx path wait for the writer goroutine to finish (and then close pr normally) so a successful response can’t be returned while the upload stream was truncated.
internal/api/client.go
Outdated
```go
// errInvalidWrite indicates a Write returned an impossible byte count.
// This matches Go's internal io package error for invalid writes.
var errInvalidWrite = fmt.Errorf("invalid write result")
```
errInvalidWrite is a static sentinel error but is created with fmt.Errorf. Prefer errors.New for non-formatted constant errors to avoid formatting overhead and to better communicate intent (this also matches how the stdlib defines sentinel errors).
- Use errors.New for sentinel errInvalidWrite (style/perf)
- Add zero-byte read protection to prevent infinite-loop DoS
- Fix premature pipe closure: only close the pipe in error paths, let the writer complete naturally on success (prevents HTTP/2 truncation)
Related Issue
Type of Change
Problem
Large repository/image uploads could cause out-of-memory (OOM) issues because the entire file was buffered in memory before being sent to the API. Additionally, users saw spurious warnings on macOS: "failed to flush stdout before exit: sync /dev/stdout: inappropriate ioctl for device".
Solution
- Streaming multipart uploads: Replaced in-memory buffering with `io.Pipe` streaming for uploads. The API client now streams data directly to the HTTP request body, preventing OOM on large files.
- Separate upload HTTP client: Added a dedicated upload client without retry logic (uploads are not idempotent) and without a timeout (large uploads can take a long time).
- Suppress stdout sync warnings: Silently ignore `ENOTTY`, `EINVAL`, and `ENOTSUP` errors from `os.Stdout.Sync()`, which occur when stdout is a pipe or `/dev/stdout` that doesn't support fsync.

Testing
Automated Tests
Manual Testing
- `make build`
- `make lint` (0 issues)
- `make test` (1233 tests pass, 77.8% coverage)

Checklist