
Make OCI image layer cache safe for concurrent use#3150

Open
robnester-rh wants to merge 2 commits into conforma:main from robnester-rh:EC-1669

Conversation

@robnester-rh
Contributor

Wrap go-containerregistry's FilesystemCache in a thread-safe wrapper that uses singleflight to serialize Put and layer stream reads. Prevents races when multiple goroutines validate the same image (e.g. parallel components sharing layers).

Add unit tests for the safe cache and an acceptance scenario that runs validation with EC_CACHE=true.

Ref: #1109
Ref: EC-1669

@qodo-code-review
Contributor

Review Summary by Qodo

Make OCI image layer cache safe for concurrent use

✨ Enhancement 🧪 Tests


Walkthroughs

Description
• Wraps the OCI image layer cache with thread-safe singleflight serialization
• Prevents race conditions when multiple goroutines validate shared layers
• Adds comprehensive unit tests for concurrent cache access patterns
• Adds an acceptance test validating parallel image validation with cache
Diagram
flowchart LR
  A["FilesystemCache"] -->|wrapped by| B["SafeCache"]
  B -->|serializes Put| C["singleflight.Group"]
  B -->|wraps Layer| D["SafeLayer"]
  D -->|serializes Compressed/Uncompressed| C
  E["Multiple Goroutines"] -->|concurrent access| B
  B -->|prevents races| F["Shared Layer Cache"]


File Changes

1. internal/utils/oci/client.go ✨ Enhancement +3/-1

Integrate SafeCache wrapper into OCI client initialization

• Modified initCache() to wrap FilesystemCache with NewSafeCache()
• Added comment referencing issue #1109 for concurrent cache safety
• Passes cache directory path to enable safe layer file access



2. internal/utils/oci/safe_cache.go ✨ Enhancement +173/-0

Implement thread-safe cache wrapper with singleflight

• Implement safeCache struct wrapping cache.Cache with singleflight serialization
• Implement safeLayer struct wrapping v1.Layer with serialized Compressed() and
 Uncompressed() methods
• Use singleflight.Group to serialize Put operations by digest and layer stream reads
• Add cachePath() helper function for cross-platform cache file path resolution



3. internal/utils/oci/safe_cache_test.go 🧪 Tests +230/-0

Add comprehensive unit tests for safe cache

• Test nil inner cache returns nil
• Test concurrent Put of same layer serializes correctly without races
• Test concurrent Put of different layers executes in parallel
• Test Put then Get returns same digest
• Test concurrent Compressed() and Uncompressed() reads on same layer
• Test Delete removes layer from cache



4. features/validate_image.feature 🧪 Tests +26/-0

Add acceptance test for concurrent cache validation

• Add new acceptance scenario "parallel validation with cache and shared image layers"
• Test validates two components sharing same image with EC_CACHE=true environment variable
• Scenario uses --json-input to validate multiple components in parallel
• Verifies exit status 0 and output matches snapshot




@qodo-code-review
Contributor

qodo-code-review bot commented Mar 2, 2026

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (0) 📎 Requirement gaps (0)



Action required

1. require in goroutines 🐞 Bug ⛯ Reliability
Description
New unit tests call require.NoError from spawned goroutines. On failure, require calls FailNow,
which is not safe to invoke outside the goroutine running the test function and can panic or yield
unreliable failures.
Code

internal/utils/oci/safe_cache_test.go[R103-111]

+		go func(size int) {
+			defer wg.Done()
+			layer, err := random.Layer(int64(size+1)*128, "application/vnd.oci.image.layer.v1.tar+gzip")
+			require.NoError(t, err)
+			cached, err := wrapped.Put(layer)
+			require.NoError(t, err)
+			rc, err := cached.Compressed()
+			require.NoError(t, err)
+			_, _ = io.Copy(io.Discard, rc)
Evidence
These tests spawn goroutines and then call require.NoError(t, err) inside them; on failure this
triggers FailNow from a non-test goroutine. The pattern appears in multiple tests.

internal/utils/oci/safe_cache_test.go[103-113]
internal/utils/oci/safe_cache_test.go[163-170]
internal/utils/oci/safe_cache_test.go[193-200]
Best Practice: Go testing semantics

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
Several unit tests call `require.NoError` from inside spawned goroutines. If an assertion fails, `require` calls `t.FailNow()` which is not safe to call from goroutines other than the one running the test function.

### Issue Context
These are concurrency stress tests; failures should be reported deterministically and safely without panics.

### Fix Focus Areas
- internal/utils/oci/safe_cache_test.go[103-203]

### Suggested approach
- Replace `require.NoError` calls inside goroutines with either:
 - `assert.NoError` plus sending an error to a channel for main goroutine aggregation, or
 - manual `if err != nil { errCh <- err; return }` checks.
- After `wg.Wait()`, in the main goroutine, `require.Empty(t, errs)` (or iterate and `require.NoError`).




Remediation recommended

2. Layer reads fully buffered 🐞 Bug ➹ Performance
Description
safeLayer.Compressed()/Uncompressed() drain the entire underlying stream into io.Discard and
only then return a reader by reopening the cached file. This breaks streaming semantics and adds
extra I/O/latency per call (download/write then re-read).
Code

internal/utils/oci/safe_cache.go[R115-142]

+func (l *safeLayer) Compressed() (io.ReadCloser, error) {
+	digest, err := l.inner.Digest()
+	if err != nil {
+		return nil, err
+	}
+	key := "compressed:" + digest.String()
+	v, err, _ := l.flight.Do(key, func() (any, error) {
+		rc, err := l.inner.Compressed()
+		if err != nil {
+			return nil, err
+		}
+		if _, err := io.Copy(io.Discard, rc); err != nil {
+			_ = rc.Close()
+			return nil, err
+		}
+		if err := rc.Close(); err != nil {
+			return nil, err
+		}
+		return cachePath(l.path, digest), nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	f, err := os.Open(v.(string))
+	if err != nil {
+		return nil, err
+	}
+	return f, nil
Evidence
The implementation explicitly reads the whole layer stream to completion (`io.Copy(io.Discard,
rc)`), closes it, then opens a file and returns the file reader. Since the image cache is applied to
all fetched images, this affects normal validation/fetch flows.

internal/utils/oci/safe_cache.go[115-142]
internal/utils/oci/safe_cache.go[145-172]
internal/utils/oci/client.go[222-231]
cmd/validate/image.go[342-408]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`safeLayer.Compressed()` and `safeLayer.Uncompressed()` currently force full materialization of the stream before returning a reader by draining into `io.Discard` and reopening the cached file. This breaks streaming semantics and adds extra I/O/latency.

### Issue Context
The cache is applied globally to `client.Image()` via `cache.Image(img, c)`, so these methods can be hit in normal validation flows.

### Fix Focus Areas
- internal/utils/oci/safe_cache.go[115-173]
- internal/utils/oci/client.go[222-231]

### Suggested approach
- Rework `safeLayer.Compressed/Uncompressed` so the first caller can stream while also populating the cache (e.g., `io.TeeReader` to a temp file + atomic rename on close).
- For concurrent callers, block until the cache file is finalized, then `os.Open` it.
- If a full redesign is too large, at least add a per-layer `sync.Once`/memoized path so subsequent sequential calls do not re-drain the stream.



3. Put ignores Get errors 🐞 Bug ⛯ Reliability
Description
safeCache.Put treats any inner.Get(digest) error as a cache miss and proceeds to inner.Put.
This can mask underlying I/O/corruption errors and may overwrite or compound failures instead of
returning the original error.
Code

internal/utils/oci/safe_cache.go[R79-87]

+	v, err, _ := s.putFlight.Do(digest.String(), func() (any, error) {
+		if layer, err := s.inner.Get(digest); err == nil {
+			return layer, nil
+		}
+		layer, err := s.inner.Put(l)
+		if err != nil {
+			return nil, err
+		}
+		return &safeLayer{inner: layer, path: s.path, flight: &s.putFlight}, nil
Evidence
On cache miss, FilesystemCache.Get returns cache.ErrNotFound (as asserted by tests).
safeCache.Put currently doesn’t distinguish ErrNotFound from other errors, treating all errors
identically.

internal/utils/oci/safe_cache.go[79-87]
internal/utils/oci/safe_cache_test.go[47-50]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`safeCache.Put` currently treats any error from `inner.Get(digest)` as a cache miss. This can hide real errors (e.g., permission, corruption) that should fail fast.

### Issue Context
Your unit tests already assert a normal miss is `cache.ErrNotFound`.

### Fix Focus Areas
- internal/utils/oci/safe_cache.go[79-87]
- internal/utils/oci/safe_cache_test.go[36-53]

### Suggested change
- Import `errors`.
- Change the `inner.Get` branch to:
 - `layer, err := s.inner.Get(digest)`
 - `if err == nil { return wrap(layer), nil }`
 - `if !errors.Is(err, cache.ErrNotFound) { return nil, err }`
 - otherwise proceed to `inner.Put`.
- Consider wrapping the returned `layer` on cache-hit too if you want read singleflight consistently.



4. Cache hits bypass safeLayer 🐞 Bug ⛯ Reliability
Description
On cache-hit, safeCache.Put returns the inner.Get layer directly, and safeCache.Get always
returns inner.Get directly. Those returned layers are not wrapped with safeLayer, so their
Compressed()/Uncompressed() calls are not singleflight-protected by this wrapper, potentially
undermining the stated goal of serializing layer stream reads under concurrency.
Code

internal/utils/oci/safe_cache.go[R65-83]

+// Get implements cache.Cache. No per-key serialization is needed for reads.
+func (s *safeCache) Get(h v1.Hash) (v1.Layer, error) {
+	return s.inner.Get(h)
+}
+
+// Put implements cache.Cache. Only one goroutine runs inner.Put for a given
+// digest; others wait and receive the same result. The returned layer is
+// wrapped so that Compressed() and Uncompressed() are also singleflighted,
+// ensuring only one writer fills each cache file.
+func (s *safeCache) Put(l v1.Layer) (v1.Layer, error) {
+	digest, err := l.Digest()
+	if err != nil {
+		return nil, err
+	}
+	v, err, _ := s.putFlight.Do(digest.String(), func() (any, error) {
+		if layer, err := s.inner.Get(digest); err == nil {
+			return layer, nil
+		}
+		layer, err := s.inner.Put(l)
Evidence
The wrapper’s own comments describe singleflighting layer stream reads via safeLayer, but Get
never returns a safeLayer, and Put returns an unwrapped layer on cache-hit. Under concurrent
validation workers, it’s plausible to have concurrent access to the same cached layer object/path;
returning unwrapped layers makes read-side behavior depend on underlying cache implementation
details rather than this safety wrapper.

internal/utils/oci/safe_cache.go[65-68]
internal/utils/oci/safe_cache.go[70-83]
internal/utils/oci/safe_cache.go[100-103]
cmd/validate/image.go[342-408]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`safeLayer` is the mechanism that singleflights `Compressed()`/`Uncompressed()` to avoid concurrent cache writes, but `safeCache.Get` never returns a `safeLayer`, and `safeCache.Put` returns unwrapped layers on cache hits.

### Issue Context
The validation command runs multiple workers concurrently, and the OCI image cache is global. To make concurrency-safety independent of timing windows (hit/miss), stream-read serialization should be applied uniformly.

### Fix Focus Areas
- internal/utils/oci/safe_cache.go[65-92]
- cmd/validate/image.go[342-408]

### Suggested approach
- Wrap successful `inner.Get` results with `safeLayer{flight: &s.putFlight}`.
- Similarly, in `Put`, when `inner.Get` succeeds, return a wrapped layer (or a wrapper that only singleflights when a cache artifact is missing).
- If you implement wrapping-on-hit, also implement an efficient fast-path/memoization in `safeLayer` to avoid the current full-drain behavior on every call.




@robnester-rh robnester-rh force-pushed the EC-1669 branch 2 times, most recently from f1b01de to 5be317b on March 2, 2026 at 21:50
Wrap go-containerregistry's FilesystemCache in a thread-safe wrapper that
uses singleflight to serialize Put and layer stream reads. Prevents races
when multiple goroutines validate the same image (e.g. parallel components
sharing layers).

Add unit tests for the safe cache and an acceptance scenario that runs
validation with EC_CACHE=true.

Ref: conforma#1109
Ref: EC-1669
Co-authored-by: Claude Code <noreply@anthropic.com>
Signed-off-by: Rob Nester <rnester@redhat.com>
@robnester-rh
Contributor Author

/review

@qodo-code-review
Contributor

qodo-code-review bot commented Mar 3, 2026

PR Reviewer Guide 🔍

(Review updated until commit 37c6631)

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

1109 - Partially compliant

Compliant requirements:

  • Address that the EC image layer cache is not thread safe under concurrent validation with shared layers
  • Prevent cache races that can lead to spurious redhat_manifests.redhat_manifests_missing violations
  • Make the cache safe for concurrent use (rather than disabling it)

Non-compliant requirements:

  • None
Requires further human verification:

  • Confirm the originally reported violation no longer reproduces in a real parallel validation workload (shared layers) with EC_CACHE=true
  • Confirm behavior on Windows (cache filename layout / pathing) in an environment matching user reports
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Error handling

Compressed()/Uncompressed() wait for a background drain to finish and then os.Open() the expected cache file. If the underlying cache write fails (or writes to a different path than cachePath computes), waiters may block until drain completes and then still fail with a filesystem error that hides the real root cause. Consider propagating the stream/caching failure explicitly to waiters and/or validating file existence after drain with a clearer error.

func (l *safeLayer) Compressed() (io.ReadCloser, error) {
	digest, err := l.inner.Digest()
	if err != nil {
		return nil, err
	}
	path := cachePath(l.path, digest)
	if _, err := os.Stat(path); err == nil {
		return os.Open(path)
	}
	key := "compressed:" + digest.String()
	v, err, _ := l.flight.Do(key, func() (any, error) {
		rc, err := l.inner.Compressed()
		if err != nil {
			return nil, err
		}
		ready := make(chan struct{})
		go func() {
			_, _ = io.Copy(io.Discard, rc)
			_ = rc.Close()
			close(ready)
		}()
		return ready, nil
	})
	if err != nil {
		return nil, err
	}
	<-v.(chan struct{})
	return os.Open(path)
}

func (l *safeLayer) Uncompressed() (io.ReadCloser, error) {
	diffID, err := l.inner.DiffID()
	if err != nil {
		return nil, err
	}
	path := cachePath(l.path, diffID)
	if _, err := os.Stat(path); err == nil {
		return os.Open(path)
	}
	key := "uncompressed:" + diffID.String()
	v, err, _ := l.flight.Do(key, func() (any, error) {
		rc, err := l.inner.Uncompressed()
		if err != nil {
			return nil, err
		}
		ready := make(chan struct{})
		go func() {
			_, _ = io.Copy(io.Discard, rc)
			_ = rc.Close()
			close(ready)
		}()
		return ready, nil
	})
	if err != nil {
		return nil, err
	}
	<-v.(chan struct{})
	return os.Open(path)
}
Goroutine lifecycle

Each first-miss Compressed()/Uncompressed() spawns a goroutine to drain the returned reader. If callers frequently request many unique layers, this can create a burst of goroutines; also, if inner.Compressed() returns a reader that can block indefinitely, the goroutine can hang and all waiters block. Consider bounding concurrency (semaphore) or avoiding per-call goroutines by draining in the singleflight function synchronously and returning an error if draining fails.

	v, err, _ := l.flight.Do(key, func() (any, error) {
		rc, err := l.inner.Compressed()
		if err != nil {
			return nil, err
		}
		ready := make(chan struct{})
		go func() {
			_, _ = io.Copy(io.Discard, rc)
			_ = rc.Close()
			close(ready)
		}()
		return ready, nil
	})
	if err != nil {
		return nil, err
	}
	<-v.(chan struct{})
	return os.Open(path)
}

func (l *safeLayer) Uncompressed() (io.ReadCloser, error) {
	diffID, err := l.inner.DiffID()
	if err != nil {
		return nil, err
	}
	path := cachePath(l.path, diffID)
	if _, err := os.Stat(path); err == nil {
		return os.Open(path)
	}
	key := "uncompressed:" + diffID.String()
	v, err, _ := l.flight.Do(key, func() (any, error) {
		rc, err := l.inner.Uncompressed()
		if err != nil {
			return nil, err
		}
		ready := make(chan struct{})
		go func() {
			_, _ = io.Copy(io.Discard, rc)
			_ = rc.Close()
			close(ready)
		}()
		return ready, nil
	})
Command quoting

The --json-input {...} argument is embedded directly inside a quoted step string. It’s worth double-checking that the step implementation/runner handles nested quotes/braces consistently across shells and operating systems (and doesn’t require escaping); otherwise this scenario can be flaky even if the cache fix is correct.

# Ensures the image layer cache is safe under concurrent use when validating multiple
# components that share the same image (same layers). See https://github.com/conforma/cli/issues/1109.
# Step uses same JSON-in-quotes pattern as "json-input single component" scenario (line ~649).
Scenario: parallel validation with cache and shared image layers
  Given a key pair named "known"
  Given an image named "acceptance/ec-happy-day"
  Given a valid image signature of "acceptance/ec-happy-day" image signed by the "known" key
  Given a valid attestation of "acceptance/ec-happy-day" signed by the "known" key
  Given a git repository named "happy-day-policy" with
    | main.rego | examples/happy_day.rego |
  Given policy configuration named "ec-policy" with specification
  """
  {
    "sources": [
      {
        "policy": [
          "git::https://${GITHOST}/git/happy-day-policy.git"
        ]
      }
    ]
  }
  """
  And the environment variable is set "EC_CACHE=true"
  When ec command is run with "validate image --json-input {"components":[{"name":"A","containerImage":"${REGISTRY}/acceptance/ec-happy-day"},{"name":"B","containerImage":"${REGISTRY}/acceptance/ec-happy-day"}]} --policy acceptance/ec-policy --rekor-url ${REKOR} --public-key ${known_PUBLIC_KEY} --show-successes --output json"
  Then the exit status should be 0
  Then the output should match the snapshot

@codecov

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 96.55172% with 3 lines in your changes missing coverage. Please review.

| Files with missing lines         | Patch % | Lines        |
| internal/utils/oci/safe_cache.go | 96.47%  | 3 Missing ⚠️ |

| Flag        | Coverage Δ                   |
| acceptance  | 54.84% <55.17%> (+<0.01%) ⬆️ |
| generative  | 18.02% <0.00%> (-0.14%) ⬇️   |
| integration | 26.81% <0.00%> (-0.20%) ⬇️   |
| unit        | 68.85% <96.55%> (+0.20%) ⬆️  |

Flags with carried forward coverage won't be shown.

| Files with missing lines         | Coverage Δ                    |
| internal/utils/oci/client.go     | 57.25% <100.00%> (+0.34%) ⬆️  |
| internal/utils/oci/safe_cache.go | 96.47% <96.47%> (ø)           |

Use digest- and diffID-scoped singleflight inside Compressed() and
Uncompressed() so only one goroutine runs the inner stream per digest
across all safeLayer instances (e.g. concurrent Get(digest) callers).
Waiters no longer depend on the first caller closing a reader, avoiding
deadlock. Remove per-layer ready channels and streamingCloseReader.

Add unit tests for error paths, cache-hit paths, and concurrent Get
callers calling Compressed(); add a short comment in the acceptance
scenario for the parallel-validation-with-cache step.

Ref: conforma#1109
Ref: EC-1669
Co-authored-by: Claude Code <noreply@anthropic.com>
Signed-off-by: Rob Nester <rnester@redhat.com>
@robnester-rh
Contributor Author

/review

@qodo-code-review
Contributor

Persistent review updated to latest commit 37c6631

