Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions store/cafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (c *CAFS) PutWithResult(ctx context.Context, r io.Reader) (*PutResult, erro
return nil, fmt.Errorf("writing content: %w", err)
}

// Track metadata for new blob (best effort, don't fail the operation)
if c.metaDB != nil {
entry := &metadb.BlobEntry{
Hash: hash.String(),
Expand All @@ -133,7 +132,9 @@ func (c *CAFS) PutWithResult(ctx context.Context, r io.Reader) (*PutResult, erro
LastAccess: time.Now(),
RefCount: 0,
}
_ = c.metaDB.PutBlob(ctx, entry)
if err := c.metaDB.PutBlob(ctx, entry); err != nil {
return nil, fmt.Errorf("tracking blob metadata: %w", err)
}
} else if c.metadata != nil {
_ = c.metadata.Create(ctx, hash, size)
}
Expand Down Expand Up @@ -335,7 +336,9 @@ func (c *CAFS) PutFramed(ctx context.Context, header *backend.BlobHeader, body i
LastAccess: time.Now(),
RefCount: 0,
}
_ = c.metaDB.PutBlob(ctx, entry)
if err := c.metaDB.PutBlob(ctx, entry); err != nil {
return contentcache.Hash{}, fmt.Errorf("tracking blob metadata: %w", err)
}
}

return hash, nil
Expand Down
17 changes: 10 additions & 7 deletions store/gc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ func DefaultConfig() Config {

// Result contains the results of a GC run.
type Result struct {
StartedAt time.Time `json:"started_at"`
Duration time.Duration `json:"duration"`
OrphanBlobsDeleted int `json:"orphan_blobs_deleted"`
ExpiredMetaDeleted int `json:"expired_meta_deleted"`
LRUBlobsEvicted int `json:"lru_blobs_evicted"`
BytesReclaimed int64 `json:"bytes_reclaimed"`
Errors []string `json:"errors,omitempty"`
StartedAt time.Time `json:"started_at"`
Duration time.Duration `json:"duration"`
UnreferencedBlobsDeleted int `json:"unreferenced_blobs_deleted"`
OrphanBlobsDeleted int `json:"orphan_blobs_deleted"`
ExpiredMetaDeleted int `json:"expired_meta_deleted"`
LRUBlobsEvicted int `json:"lru_blobs_evicted"`
BytesReclaimed int64 `json:"bytes_reclaimed"`
Errors []string `json:"errors,omitempty"`
}

// Manager manages garbage collection for the content cache.
Expand Down Expand Up @@ -202,6 +203,7 @@ func (m *Manager) runGC(ctx context.Context) *Result {

m.logger.Info("gc run completed",
"duration", result.Duration,
"unreferenced_blobs_deleted", result.UnreferencedBlobsDeleted,
"orphan_blobs_deleted", result.OrphanBlobsDeleted,
"expired_meta_deleted", result.ExpiredMetaDeleted,
"lru_blobs_evicted", result.LRUBlobsEvicted,
Expand All @@ -219,6 +221,7 @@ func (m *Manager) recordMetrics(ctx context.Context, result *Result) {

m.metrics.runsTotal.Add(ctx, 1)
m.metrics.runDuration.Record(ctx, result.Duration.Seconds())
m.metrics.unreferencedBlobsDeleted.Add(ctx, int64(result.UnreferencedBlobsDeleted))
m.metrics.orphanBlobsDeleted.Add(ctx, int64(result.OrphanBlobsDeleted))
m.metrics.expiredMetaDeleted.Add(ctx, int64(result.ExpiredMetaDeleted))
m.metrics.lruBlobsEvicted.Add(ctx, int64(result.LRUBlobsEvicted))
Expand Down
32 changes: 27 additions & 5 deletions store/gc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestManager_RunNow(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, result)
assert.Equal(t, 1, result.ExpiredMetaDeleted, "should delete 1 expired metadata")
assert.Equal(t, 1, result.OrphanBlobsDeleted, "should delete 1 orphan blob")
assert.Equal(t, 1, result.UnreferencedBlobsDeleted, "should delete 1 unreferenced blob")
assert.Greater(t, result.Duration, time.Duration(0))
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestManager_PhaseDeleteUnreferenced(t *testing.T) {
result, err := mgr.RunNow(ctx)

require.NoError(t, err)
assert.Equal(t, 2, result.OrphanBlobsDeleted)
assert.Equal(t, 2, result.UnreferencedBlobsDeleted)
assert.Equal(t, int64(300), result.BytesReclaimed)

_, err = db.GetBlob(ctx, "unreferenced1")
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestManager_PhaseLRUEviction(t *testing.T) {
result, err := mgr.RunNow(ctx)

require.NoError(t, err)
assert.GreaterOrEqual(t, result.LRUBlobsEvicted+result.OrphanBlobsDeleted, 3, "should delete at least 3 blobs to get under 200 bytes")
assert.GreaterOrEqual(t, result.LRUBlobsEvicted+result.UnreferencedBlobsDeleted, 3, "should delete at least 3 blobs to get under 200 bytes")

total, err = db.TotalBlobSize(ctx)
require.NoError(t, err)
Expand All @@ -250,6 +250,7 @@ func TestManager_Status(t *testing.T) {
require.NotNil(t, status)
assert.Equal(t, result.StartedAt, status.StartedAt)
assert.Equal(t, result.Duration, status.Duration)
assert.Equal(t, result.UnreferencedBlobsDeleted, status.UnreferencedBlobsDeleted)
assert.Equal(t, result.OrphanBlobsDeleted, status.OrphanBlobsDeleted)
assert.Equal(t, result.ExpiredMetaDeleted, status.ExpiredMetaDeleted)
assert.Equal(t, result.LRUBlobsEvicted, status.LRUBlobsEvicted)
Expand All @@ -261,8 +262,8 @@ func TestManager_PhaseDeleteOrphans(t *testing.T) {
db := newTestDB(t)
fs := newTestBackend(t)

hash1 := "orphanblob123"
hash2 := "trackedblob456"
hash1 := "aa00112233445566778899aabbccddeeff"
hash2 := "bb00112233445566778899aabbccddeeff"
key1 := blobKey(hash1)
key2 := blobKey(hash2)
require.NoError(t, fs.Write(ctx, key1, strings.NewReader("orphan data")))
Expand Down Expand Up @@ -330,3 +331,24 @@ func TestManager_DoubleStart(t *testing.T) {
err := mgr.Stop(stopCtx)
require.NoError(t, err)
}

func TestExtractHashFromKey(t *testing.T) {
tests := []struct {
name string
key string
want string
}{
{"valid key", "blobs/ab/ab0123456789abcdef0123456789abcdef", "ab0123456789abcdef0123456789abcdef"},
{"empty string", "", ""},
{"too short", "blobs/ab/abc123", ""},
{"non-hex chars", "blobs/ab/tempfile.tmp.000000000000000000000000000000000", ""},
{"temp file", "blobs/.tmp-upload-123", ""},
{"directory-like", "blobs/ab/", ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := extractHashFromKey(tt.key)
assert.Equal(t, tt.want, got)
})
}
}
49 changes: 30 additions & 19 deletions store/gc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (

// Metrics holds GC-related OpenTelemetry metric instruments.
type Metrics struct {
runsTotal metric.Int64Counter
runDuration metric.Float64Histogram
orphanBlobsDeleted metric.Int64Counter
expiredMetaDeleted metric.Int64Counter
lruBlobsEvicted metric.Int64Counter
bytesReclaimed metric.Int64Counter
errorsTotal metric.Int64Counter
lastRunTimestamp metric.Float64Gauge
lastRunSuccess metric.Float64Gauge
runsTotal metric.Int64Counter
runDuration metric.Float64Histogram
unreferencedBlobsDeleted metric.Int64Counter
orphanBlobsDeleted metric.Int64Counter
expiredMetaDeleted metric.Int64Counter
lruBlobsEvicted metric.Int64Counter
bytesReclaimed metric.Int64Counter
errorsTotal metric.Int64Counter
lastRunTimestamp metric.Float64Gauge
lastRunSuccess metric.Float64Gauge
}

// NewMetrics creates a new Metrics instance with the given meter.
Expand All @@ -38,9 +39,18 @@ func NewMetrics(meter metric.Meter) (*Metrics, error) {
return nil, err
}

unreferencedBlobsDeleted, err := meter.Int64Counter(
"content_cache_gc_unreferenced_blobs_deleted_total",
metric.WithDescription("Total number of unreferenced blobs deleted (RefCount==0)"),
metric.WithUnit("{blob}"),
)
if err != nil {
return nil, err
}

orphanBlobsDeleted, err := meter.Int64Counter(
"content_cache_gc_orphan_blobs_deleted_total",
metric.WithDescription("Total number of orphan blobs deleted"),
metric.WithDescription("Total number of orphan blobs deleted (on disk but missing from DB)"),
metric.WithUnit("{blob}"),
)
if err != nil {
Expand Down Expand Up @@ -102,14 +112,15 @@ func NewMetrics(meter metric.Meter) (*Metrics, error) {
}

return &Metrics{
runsTotal: runsTotal,
runDuration: runDuration,
orphanBlobsDeleted: orphanBlobsDeleted,
expiredMetaDeleted: expiredMetaDeleted,
lruBlobsEvicted: lruBlobsEvicted,
bytesReclaimed: bytesReclaimed,
errorsTotal: errorsTotal,
lastRunTimestamp: lastRunTimestamp,
lastRunSuccess: lastRunSuccess,
runsTotal: runsTotal,
runDuration: runDuration,
unreferencedBlobsDeleted: unreferencedBlobsDeleted,
orphanBlobsDeleted: orphanBlobsDeleted,
expiredMetaDeleted: expiredMetaDeleted,
lruBlobsEvicted: lruBlobsEvicted,
bytesReclaimed: bytesReclaimed,
errorsTotal: errorsTotal,
lastRunTimestamp: lastRunTimestamp,
lastRunSuccess: lastRunSuccess,
}, nil
}
26 changes: 23 additions & 3 deletions store/gc/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package gc

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/wolfeidau/content-cache/backend"
"github.com/wolfeidau/content-cache/store/metadb"
)

Expand Down Expand Up @@ -73,7 +75,7 @@ func (m *Manager) phaseDeleteUnreferenced(ctx context.Context, result *Result) {
continue
}

result.OrphanBlobsDeleted++
result.UnreferencedBlobsDeleted++
result.BytesReclaimed += bytesReclaimed

m.logger.Debug("deleted unreferenced blob", "hash", hash, "size", bytesReclaimed)
Expand Down Expand Up @@ -230,7 +232,7 @@ func (m *Manager) deleteBlob(ctx context.Context, hash string) (int64, error) {
}

key := blobKey(hash)
if err := m.backend.Delete(ctx, key); err != nil {
if err := m.backend.Delete(ctx, key); err != nil && !errors.Is(err, backend.ErrNotFound) {
return 0, fmt.Errorf("delete from backend: %w", err)
}

Expand All @@ -255,5 +257,23 @@ func extractHashFromKey(key string) string {
if len(parts) == 0 {
return ""
}
return parts[len(parts)-1]
hash := parts[len(parts)-1]
if !isValidHexHash(hash) {
return ""
}
return hash
}

// minHashLength is the minimum length, in bytes, a string must have before
// isValidHexHash will accept it.
const minHashLength = 32

// isValidHexHash reports whether s is at least minHashLength bytes long and
// made up exclusively of hexadecimal digits (0-9, a-f, A-F).
func isValidHexHash(s string) bool {
	if len(s) < minHashLength {
		return false
	}
	for i := 0; i < len(s); i++ {
		b := s[i]
		switch {
		case '0' <= b && b <= '9':
			// decimal digit
		case 'a' <= b && b <= 'f':
			// lower-case hex digit
		case 'A' <= b && b <= 'F':
			// upper-case hex digit
		default:
			// Anything else, including any byte of a non-ASCII sequence,
			// disqualifies the string.
			return false
		}
	}
	return true
}