Skip to content

Commit 2544570

Browse files
committed
Fix rapid cache cleanup bugs: always run both index and bucket passes, check Redis index during bucket scan, evict prefix from Redis on bulk deletes
1 parent 7d161c4 commit 2544570

3 files changed

Lines changed: 79 additions & 5 deletions

File tree

packages/orchestrator/cmd/clean-rapid-cache/main.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,21 @@ func clean(ctx context.Context, bucket string, prefix string, cutoff time.Time,
7171
_ = client.Close()
7272
}()
7373

74-
if deleted, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index); err != nil {
74+
deletedIndex, err := cleanFromIndex(ctx, client, bucket, cutoff, maxDeletions, dryRun, index)
75+
if err != nil {
7576
return err
76-
} else if deleted > 0 {
77-
log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deleted)
77+
}
78+
79+
if deletedIndex > 0 {
80+
log.Printf("summary dry_run=%t deleted=%d source=redis", dryRun, deletedIndex)
81+
}
7882

83+
remaining := maxDeletions - deletedIndex
84+
if remaining <= 0 {
7985
return nil
8086
}
8187

82-
return cleanFromBucket(ctx, client, bucket, prefix, cutoff, maxDeletions, dryRun)
88+
return cleanFromBucket(ctx, client, bucket, prefix, cutoff, remaining, dryRun, index)
8389
}
8490

8591
func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) (int, error) {
@@ -117,7 +123,7 @@ func cleanFromIndex(ctx context.Context, client *gcs.Client, bucket string, cuto
117123
return deleted, nil
118124
}
119125

120-
func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool) error {
126+
func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, prefix string, cutoff time.Time, maxDeletions int, dryRun bool, index storage.RapidCacheIndex) error {
121127
var scanned, matched, deleted int
122128
objects := client.Bucket(bucket).Objects(ctx, &gcs.Query{Prefix: prefix})
123129
for {
@@ -133,6 +139,13 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre
133139
if !attrs.Updated.Before(cutoff) {
134140
continue
135141
}
142+
143+
if !dryRun {
144+
if recent, err := index.IsRecent(ctx, attrs.Name); err == nil && recent {
145+
continue
146+
}
147+
}
148+
136149
matched++
137150
if deleted >= maxDeletions {
138151
break
@@ -145,6 +158,7 @@ func cleanFromBucket(ctx context.Context, client *gcs.Client, bucket string, pre
145158
if err := client.Bucket(bucket).Object(attrs.Name).Delete(ctx); err != nil {
146159
return fmt.Errorf("delete cache object: %w", err)
147160
}
161+
_ = index.Evict(ctx, attrs.Name, attrs.Size)
148162
deleted++
149163
}
150164

packages/shared/pkg/storage/storage_cache_rapid.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func WrapInRapidBucketCache(_ context.Context, cache StorageProvider, inner Stor
2727
}
2828

2929
func (p *rapidCacheProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
30+
_ = p.index.EvictPrefix(ctx, prefix)
3031
_ = p.cache.DeleteObjectsWithPrefix(ctx, "rapid-cache/"+prefix)
3132

3233
return p.inner.DeleteObjectsWithPrefix(ctx, prefix)

packages/shared/pkg/storage/storage_cache_rapid_index.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type RapidCacheIndex interface {
1616
Admit(ctx context.Context, path string, size int64) error
1717
Evict(ctx context.Context, path string, size int64) error
1818
Candidates(ctx context.Context, before time.Time, limit int64) ([]string, error)
19+
IsRecent(ctx context.Context, path string) (bool, error)
20+
EvictPrefix(ctx context.Context, prefix string) error
1921
}
2022

2123
type noopRapidCacheIndex struct{}
@@ -28,6 +30,8 @@ func (noopRapidCacheIndex) Evict(context.Context, string, int64) error { return
2830
func (noopRapidCacheIndex) Candidates(context.Context, time.Time, int64) ([]string, error) {
2931
return nil, nil
3032
}
33+
func (noopRapidCacheIndex) IsRecent(context.Context, string) (bool, error) { return false, nil }
34+
func (noopRapidCacheIndex) EvictPrefix(context.Context, string) error { return nil }
3135

3236
type redisRapidCacheIndex struct {
3337
redis redis.UniversalClient
@@ -110,6 +114,61 @@ func (i *redisRapidCacheIndex) Candidates(ctx context.Context, before time.Time,
110114
}).Result()
111115
}
112116

117+
func (i *redisRapidCacheIndex) IsRecent(ctx context.Context, path string) (bool, error) {
118+
score := i.redis.ZScore(ctx, i.keys.chunks, path)
119+
if score.Err() == redis.Nil {
120+
return false, nil
121+
}
122+
if score.Err() != nil {
123+
return false, score.Err()
124+
}
125+
126+
return true, nil
127+
}
128+
129+
func (i *redisRapidCacheIndex) EvictPrefix(ctx context.Context, prefix string) error {
130+
script := redis.NewScript(`
131+
local cursor = "0"
132+
local prefix = ARGV[1]
133+
local chunks_key = KEYS[1]
134+
local build_bytes_key = KEYS[2]
135+
local build_chunks_key = KEYS[3]
136+
137+
repeat
138+
local scan_result = redis.call('ZSCAN', chunks_key, cursor, 'MATCH', prefix .. '*', 'COUNT', 100)
139+
cursor = scan_result[1]
140+
local members = scan_result[2]
141+
142+
for i = 1, #members, 2 do
143+
local path = members[i]
144+
redis.call('ZREM', chunks_key, path)
145+
146+
local path_without_prefix = string.gsub(path, '^rapid%-cache/', '')
147+
local build_id = string.match(path_without_prefix, '^([^/]+)')
148+
if build_id then
149+
local obj_result = redis.call('HGET', build_bytes_key, build_id)
150+
if obj_result then
151+
redis.call('HINCRBY', build_chunks_key, build_id, -1)
152+
local chunks_left = redis.call('HGET', build_chunks_key, build_id)
153+
if tonumber(chunks_left) <= 0 then
154+
redis.call('HDEL', build_bytes_key, build_id)
155+
redis.call('HDEL', build_chunks_key, build_id)
156+
end
157+
end
158+
end
159+
end
160+
until cursor == "0"
161+
162+
return 0
163+
`)
164+
165+
return script.Run(ctx, i.redis, []string{
166+
i.keys.chunks,
167+
i.keys.buildBytes,
168+
i.keys.buildChunks,
169+
}, "rapid-cache/"+prefix).Err()
170+
}
171+
113172
func rapidCacheBuildID(path string) string {
114173
path = strings.TrimPrefix(path, rapidCachePrefix)
115174
buildID, _ := SplitPath(path)

0 commit comments

Comments
 (0)