Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions packages/api/internal/sandbox/storage/redis/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (

"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/api/internal/sandbox"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
)

const cleanerInterval = time.Minute

// TODO: Remove once fully migrated to Redis
//
// Cleaner prunes stale entries from the two Redis sandbox indexes
// (`globalExpirationSet` and `globalTeamsSet`).
// Cleaner:
// - prunes stale entries from the two Redis sandbox indexes (`globalExpirationSet` and `globalTeamsSet`).
// - removes expired sandboxes
//
// Multi-pod safety: every operation the Cleaner triggers (ZREM/SREM of
// possibly-absent members) is idempotent. Concurrent Cleaners across pods
Expand Down Expand Up @@ -53,22 +55,20 @@ func (c *Cleaner) Start(ctx context.Context) {

// RunOnce performs one cleanup pass. Each sub-step is independent; a failure
// in one is logged but does not abort the other.
//
// Per-cycle work is bounded:
// - ExpiredItems caps internally at expiredItemsBatchSize (256) members.
// - TeamsWithSandboxCount is one ZRANGE + one pipelined SCARD batch.
func (c *Cleaner) RunOnce(ctx context.Context) error {
var errs []error

// 1. globalExpirationSet: ExpiredItems internally ZREMs members whose
// sandbox JSON is gone (items.go:131-135). Discard the returned
// sandbox list — actually evicting still-running sandboxes is the
// evictor's job, which in memory mode reads the memory backend.
if _, err := c.storage.ExpiredItems(ctx); err != nil {
// 1. globalExpirationSet: ExpiredItems internally ZREMs members whose sandbox JSON is gone.
// 2. evictExpired removes sandboxes whose EndTime is older than StaleCutoff;
// recently expired ones are left to the evictor to avoid racing it.
expired, err := c.storage.ExpiredItems(ctx)
if err != nil {
errs = append(errs, fmt.Errorf("expiration index sweep: %w", err))
} else {
c.evictExpired(ctx, expired)
Comment thread
jakubno marked this conversation as resolved.
}

// 2. globalTeamsSet: TeamsWithSandboxCount internally ZREMs teams whose
// 3. globalTeamsSet: TeamsWithSandboxCount internally ZREMs teams whose
// per-team SCARD is 0 AND whose score is older than StaleCutoff
// (operations.go:268-288). Discard the returned counts.
if _, err := c.storage.TeamsWithSandboxCount(ctx); err != nil {
Expand All @@ -77,3 +77,25 @@ func (c *Cleaner) RunOnce(ctx context.Context) error {

return errors.Join(errs...)
}

func (c *Cleaner) evictExpired(ctx context.Context, expired []sandbox.Sandbox) {
if len(expired) == 0 {
return
}

logger.L().Info(ctx, "Cleaner found expired sandboxes", zap.Int("count", len(expired)))

for _, sbx := range expired {
if time.Since(sbx.EndTime) < sandbox.StaleCutoff {
continue
}

if rmErr := c.storage.Remove(context.WithoutCancel(ctx), sbx.TeamID, sbx.SandboxID); rmErr != nil {
Comment thread
jakubno marked this conversation as resolved.
logger.L().Error(ctx, "Cleaner failed to remove stale expired sandbox",
zap.Error(rmErr),
logger.WithSandboxID(sbx.SandboxID),
logger.WithTeamID(sbx.TeamID.String()),
)
}
}
}
52 changes: 52 additions & 0 deletions packages/api/internal/sandbox/storage/redis/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,58 @@ func TestCleaner_PreservesFutureScoredExpirationEntry(t *testing.T) {
require.NoError(t, err, "future-scored entry must not be pruned")
}

// TestCleaner_EvictsStaleExpiredSandbox exercises the evictExpired path of
// RunOnce. It stores a sandbox whose EndTime is one minute older than
// sandbox.StaleCutoff, runs a single cleanup pass, and then checks that all
// three traces of the sandbox are gone: the stored sandbox itself, its
// globalExpirationSet member, and its entry in the per-team index.
func TestCleaner_EvictsStaleExpiredSandbox(t *testing.T) {
	t.Parallel()

	storage, client := setupTestStorage(t)
	ctx := t.Context()

	// Arrange: a sandbox that expired one minute beyond the stale cutoff.
	stale := createTestSandbox("stale-expired-" + uuid.NewString())
	stale.EndTime = time.Now().Add(-sandbox.StaleCutoff - time.Minute)
	require.NoError(t, storage.Add(ctx, stale))

	// Act: one cleanup pass.
	require.NoError(t, NewCleaner(storage).RunOnce(ctx))

	teamID := stale.TeamID.String()

	// Assert: the sandbox can no longer be fetched.
	_, getErr := storage.Get(ctx, stale.TeamID, stale.SandboxID)
	require.ErrorIs(t, getErr, sandbox.ErrNotFound, "stale expired sandbox JSON should be removed")

	// Assert: its member is gone from the expiration sorted set.
	member := expirationMember(teamID, stale.SandboxID)
	_, scoreErr := client.ZScore(ctx, globalExpirationSet, member).Result()
	require.ErrorIs(t, scoreErr, redis.Nil, "stale expired sandbox should be removed from globalExpirationSet")

	// Assert: it is no longer listed in the team's index set.
	teamIndexKey := GetSandboxStorageTeamIndexKey(teamID)
	inTeamIndex, memberErr := client.SIsMember(ctx, teamIndexKey, stale.SandboxID).Result()
	require.NoError(t, memberErr)
	require.False(t, inTeamIndex, "stale expired sandbox should be removed from per-team index")
}

// TestCleaner_PreservesRecentlyExpiredSandbox pins the StaleCutoff guard in
// evictExpired. A sandbox whose EndTime is only one second in the past is
// expired but (assuming sandbox.StaleCutoff exceeds one second) not yet
// stale, so it still belongs to the evictor. A cleanup pass must leave it in
// storage rather than race the evictor for it.
func TestCleaner_PreservesRecentlyExpiredSandbox(t *testing.T) {
	t.Parallel()

	storage, _ := setupTestStorage(t)
	ctx := t.Context()

	// Arrange: a sandbox that expired one second ago.
	recent := createTestSandbox("fresh-expired-" + uuid.NewString())
	recent.EndTime = time.Now().Add(-time.Second)
	require.NoError(t, storage.Add(ctx, recent))

	// Act: one cleanup pass.
	require.NoError(t, NewCleaner(storage).RunOnce(ctx))

	// Assert: the sandbox is still retrievable under the same ID.
	stored, getErr := storage.Get(ctx, recent.TeamID, recent.SandboxID)
	require.NoError(t, getErr, "recently expired sandbox must survive — eviction is the evictor's job")
	require.Equal(t, recent.SandboxID, stored.SandboxID)
}

// Compile-time guard so future refactors of sandbox.StaleCutoff get noticed
// here: the cleaner's correctness depends on it being > 0.
var _ = func() bool {
Expand Down
Loading