Skip to content

Commit

Permalink
Merge pull request #1663 from chriskdon/fix-gc-next-interval-reset
Browse files Browse the repository at this point in the history
Fix garbage collector interval and backoff resets
  • Loading branch information
vroldanbet committed Nov 29, 2023
2 parents 6e1677c + 1dbc173 commit 59e2696
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 15 deletions.
4 changes: 4 additions & 0 deletions internal/datastore/common/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageColl
backoffInterval.InitialInterval = interval
backoffInterval.MaxInterval = max(MaxGCInterval, interval)
backoffInterval.MaxElapsedTime = maxElapsedTime
backoffInterval.Reset()

nextInterval := interval

Expand Down Expand Up @@ -141,7 +142,10 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, gc GarbageColl
Msg("error attempting to perform garbage collection")
continue
}

backoffInterval.Reset()
nextInterval = interval

log.Ctx(ctx).Debug().
Dur("next-run-in", interval).
Msg("datastore garbage collection scheduled for next run")
Expand Down
147 changes: 132 additions & 15 deletions internal/datastore/common/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,126 @@ package common
import (
"context"
"fmt"
"slices"
"sync"
"testing"
"time"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/revision"

"github.com/prometheus/client_golang/prometheus"
promclient "github.com/prometheus/client_model/go"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"
)

type testGC struct{}
// Fake garbage collector that returns a new incremented revision each time
// TxIDBefore is called.
type fakeGC struct {
	// lastRevision is incremented by every TxIDBefore call; guarded by lock.
	lastRevision int64
	// deleter supplies the DeleteBeforeTx behavior under test.
	deleter gcDeleter
	// metrics counts invocations of the GC lifecycle hooks; guarded by lock.
	metrics gcMetrics
	// lock guards the fields above against the collector goroutine and the
	// test goroutine reading metrics concurrently.
	lock sync.RWMutex
}

func (t testGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
return datastore.ReadyState{}, fmt.Errorf("hi")
// gcMetrics tallies how often each garbage-collector hook was invoked.
type gcMetrics struct {
	// deleteBeforeTxCount counts calls to DeleteBeforeTx.
	deleteBeforeTxCount int
	// markedCompleteCount counts calls to MarkGCCompleted (successful passes).
	markedCompleteCount int
	// resetGCCompletedCount counts calls to ResetGCCompleted.
	resetGCCompletedCount int
}

func (t testGC) Now(_ context.Context) (time.Time, error) {
return time.Now(), fmt.Errorf("hi")
// newFakeGC builds a fakeGC whose DeleteBeforeTx behavior is driven by the
// supplied deleter. The revision counter starts at its zero value.
func newFakeGC(deleter gcDeleter) fakeGC {
	gc := fakeGC{deleter: deleter}
	return gc
}

func (t testGC) TxIDBefore(_ context.Context, _ time.Time) (datastore.Revision, error) {
return nil, fmt.Errorf("hi")
// ReadyState always reports the fake datastore as ready.
func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
	state := datastore.ReadyState{
		IsReady: true,
		Message: "Ready",
	}
	return state, nil
}

func (t testGC) DeleteBeforeTx(_ context.Context, _ datastore.Revision) (DeletionCounts, error) {
return DeletionCounts{}, fmt.Errorf("hi")
// Now returns the current wall-clock time and never fails.
func (*fakeGC) Now(_ context.Context) (time.Time, error) {
	now := time.Now()
	return now, nil
}

func (t testGC) HasGCRun() bool {
return true
// TxIDBefore ignores the supplied time and hands back the next revision in
// sequence, so every GC pass observes fresh work to collect.
func (gc *fakeGC) TxIDBefore(_ context.Context, _ time.Time) (datastore.Revision, error) {
	gc.lock.Lock()
	defer gc.lock.Unlock()

	gc.lastRevision++
	return revision.NewFromDecimal(decimal.NewFromInt(gc.lastRevision)), nil
}

func (t testGC) MarkGCCompleted() {
// DeleteBeforeTx records the invocation and delegates to the configured
// deleter, translating the revision into its integer part.
func (gc *fakeGC) DeleteBeforeTx(_ context.Context, rev datastore.Revision) (DeletionCounts, error) {
	gc.lock.Lock()
	defer gc.lock.Unlock()

	gc.metrics.deleteBeforeTxCount++

	// Panics if rev is not a revision.Decimal; acceptable in a test fake.
	revInt := rev.(revision.Decimal).Decimal.IntPart()
	return gc.deleter.DeleteBeforeTx(revInt)
}

func (t testGC) ResetGCCompleted() {
// HasGCRun reports whether at least one GC pass has been marked complete.
func (gc *fakeGC) HasGCRun() bool {
	// Read-only access: a read lock suffices on the RWMutex and lets
	// concurrent readers proceed without serializing on the write lock.
	gc.lock.RLock()
	defer gc.lock.RUnlock()

	return gc.metrics.markedCompleteCount > 0
}

// MarkGCCompleted counts a successfully completed GC pass.
func (gc *fakeGC) MarkGCCompleted() {
	gc.lock.Lock()
	gc.metrics.markedCompleteCount++
	gc.lock.Unlock()
}

// ResetGCCompleted counts a reset of the GC-completed flag.
func (gc *fakeGC) ResetGCCompleted() {
	gc.lock.Lock()
	gc.metrics.resetGCCompletedCount++
	gc.lock.Unlock()
}

// GetMetrics returns a snapshot copy of the current hook-invocation counters.
func (gc *fakeGC) GetMetrics() gcMetrics {
	// Read-only access: a read lock suffices on the RWMutex and lets
	// concurrent readers proceed without serializing on the write lock.
	gc.lock.RLock()
	defer gc.lock.RUnlock()

	return gc.metrics
}

// Allows specifying different deletion behaviors for tests
type gcDeleter interface {
	// DeleteBeforeTx returns the deletion counts (or an error) for a delete
	// of everything at or before the given transaction revision.
	DeleteBeforeTx(revision int64) (DeletionCounts, error)
}

// Always error trying to perform a delete
type alwaysErrorDeleter struct{}

// DeleteBeforeTx unconditionally fails, exercising the GC backoff path.
func (alwaysErrorDeleter) DeleteBeforeTx(_ int64) (DeletionCounts, error) {
	var none DeletionCounts
	return none, fmt.Errorf("delete error")
}

// Only error on specific revisions
type revisionErrorDeleter struct {
	// errorOnRevisions lists the revision integer parts that should fail.
	errorOnRevisions []int64
}

// DeleteBeforeTx fails for revisions listed in errorOnRevisions and succeeds
// (with zero counts) otherwise.
//
// The parameter is named rev rather than revision so it does not shadow the
// imported revision package.
func (d revisionErrorDeleter) DeleteBeforeTx(rev int64) (DeletionCounts, error) {
	if slices.Contains(d.errorOnRevisions, rev) {
		return DeletionCounts{}, fmt.Errorf("delete error")
	}

	return DeletionCounts{}, nil
}

func TestGCFailureBackoff(t *testing.T) {
Expand All @@ -49,7 +133,8 @@ func TestGCFailureBackoff(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
gc := newFakeGC(alwaysErrorDeleter{})
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, &gc, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
}()
time.Sleep(200 * time.Millisecond)
cancel()
Expand All @@ -70,7 +155,8 @@ func TestGCFailureBackoff(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
go func() {
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
gc := newFakeGC(alwaysErrorDeleter{})
require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, &gc, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
}()
time.Sleep(200 * time.Millisecond)
cancel()
Expand All @@ -84,3 +170,34 @@ func TestGCFailureBackoff(t *testing.T) {
}
require.Less(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "MaxElapsedTime=0 should have not caused backoff to get ignored")
}

// Ensure the garbage collector interval is reset after recovering from an
// error. The garbage collector should not continue to use the exponential
// backoff interval that is activated on error.
func TestGCFailureBackoffReset(t *testing.T) {
	gc := newFakeGC(revisionErrorDeleter{
		// Error on revisions 1 - 5, giving the exponential
		// backoff enough time to fail the test if the interval
		// is not reset properly.
		errorOnRevisions: []int64{1, 2, 3, 4, 5},
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Route the collector's terminal error through a channel: testify's
	// require must not be used from a non-test goroutine (FailNow stops the
	// calling goroutine via runtime.Goexit, not the test), so the assertion
	// happens on the test goroutine below.
	errCh := make(chan error, 1)
	go func() {
		interval := 10 * time.Millisecond
		window := 10 * time.Second
		timeout := 1 * time.Minute

		errCh <- StartGarbageCollector(ctx, &gc, interval, window, timeout)
	}()

	time.Sleep(500 * time.Millisecond)
	cancel()
	require.Error(t, <-errCh)

	// The next interval should have been reset after recovering from the error.
	// If it is not reset, the last exponential backoff interval will not give
	// the GC enough time to run.
	require.Greater(t, gc.GetMetrics().markedCompleteCount, 20, "Next interval was not reset with backoff")
}

0 comments on commit 59e2696

Please sign in to comment.