Fix garbage collector interval and backoff resets #1663
Changes from all commits: 8e4d9fd, 4eac203, 9c0b5b4, 1dbc173
```diff
@@ -3,42 +3,126 @@ package common
 import (
 	"context"
 	"fmt"
+	"slices"
+	"sync"
 	"testing"
 	"time"

 	"github.com/authzed/spicedb/pkg/datastore"
+	"github.com/authzed/spicedb/pkg/datastore/revision"

 	"github.com/prometheus/client_golang/prometheus"
 	promclient "github.com/prometheus/client_model/go"
+	"github.com/shopspring/decimal"
 	"github.com/stretchr/testify/require"
 )

-type testGC struct{}
+// Fake garbage collector that returns a new incremented revision each time
+// TxIDBefore is called.
+type fakeGC struct {
+	lastRevision int64
+	deleter      gcDeleter
+	metrics      gcMetrics
+	lock         sync.RWMutex
+}

-func (t testGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
-	return datastore.ReadyState{}, fmt.Errorf("hi")
+type gcMetrics struct {
+	deleteBeforeTxCount   int
+	markedCompleteCount   int
+	resetGCCompletedCount int
 }

-func (t testGC) Now(_ context.Context) (time.Time, error) {
-	return time.Now(), fmt.Errorf("hi")
+func newFakeGC(deleter gcDeleter) fakeGC {
+	return fakeGC{
+		lastRevision: 0,
+		deleter:      deleter,
+	}
 }

-func (t testGC) TxIDBefore(_ context.Context, _ time.Time) (datastore.Revision, error) {
-	return nil, fmt.Errorf("hi")
+func (*fakeGC) ReadyState(_ context.Context) (datastore.ReadyState, error) {
+	return datastore.ReadyState{
+		Message: "Ready",
+		IsReady: true,
+	}, nil
 }

-func (t testGC) DeleteBeforeTx(_ context.Context, _ datastore.Revision) (DeletionCounts, error) {
-	return DeletionCounts{}, fmt.Errorf("hi")
+func (*fakeGC) Now(_ context.Context) (time.Time, error) {
+	return time.Now(), nil
 }

-func (t testGC) HasGCRun() bool {
-	return true
+func (gc *fakeGC) TxIDBefore(_ context.Context, _ time.Time) (datastore.Revision, error) {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	gc.lastRevision++
+
+	rev := revision.NewFromDecimal(decimal.NewFromInt(gc.lastRevision))
+
+	return rev, nil
 }

-func (t testGC) MarkGCCompleted() {
+func (gc *fakeGC) DeleteBeforeTx(_ context.Context, rev datastore.Revision) (DeletionCounts, error) {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	gc.metrics.deleteBeforeTxCount++
+
+	revInt := rev.(revision.Decimal).Decimal.IntPart()
+
+	return gc.deleter.DeleteBeforeTx(revInt)
 }

-func (t testGC) ResetGCCompleted() {
+func (gc *fakeGC) HasGCRun() bool {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	return gc.metrics.markedCompleteCount > 0
+}
+
+func (gc *fakeGC) MarkGCCompleted() {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	gc.metrics.markedCompleteCount++
+}
+
+func (gc *fakeGC) ResetGCCompleted() {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	gc.metrics.resetGCCompletedCount++
+}
+
+func (gc *fakeGC) GetMetrics() gcMetrics {
+	gc.lock.Lock()
+	defer gc.lock.Unlock()
+
+	return gc.metrics
+}
+
+// Allows specifying different deletion behaviors for tests
+type gcDeleter interface {
+	DeleteBeforeTx(revision int64) (DeletionCounts, error)
+}
+
+// Always error trying to perform a delete
+type alwaysErrorDeleter struct{}
+
+func (alwaysErrorDeleter) DeleteBeforeTx(_ int64) (DeletionCounts, error) {
+	return DeletionCounts{}, fmt.Errorf("delete error")
+}
+
+// Only error on specific revisions
+type revisionErrorDeleter struct {
+	errorOnRevisions []int64
+}
+
+func (d revisionErrorDeleter) DeleteBeforeTx(revision int64) (DeletionCounts, error) {
+	if slices.Contains(d.errorOnRevisions, revision) {
+		return DeletionCounts{}, fmt.Errorf("delete error")
+	}
+
+	return DeletionCounts{}, nil
+}

 func TestGCFailureBackoff(t *testing.T) {
@@ -49,7 +133,8 @@ func TestGCFailureBackoff(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	go func() {
-		require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
+		gc := newFakeGC(alwaysErrorDeleter{})
+		require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, &gc, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter))
 	}()
 	time.Sleep(200 * time.Millisecond)
 	cancel()
@@ -70,7 +155,8 @@ func TestGCFailureBackoff(t *testing.T) {
 	ctx, cancel = context.WithCancel(context.Background())
 	defer cancel()
 	go func() {
-		require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, testGC{}, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
+		gc := newFakeGC(alwaysErrorDeleter{})
+		require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, &gc, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter))
 	}()
 	time.Sleep(200 * time.Millisecond)
 	cancel()
@@ -84,3 +170,34 @@ func TestGCFailureBackoff(t *testing.T) {
 	}
 	require.Less(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "MaxElapsedTime=0 should have not caused backoff to get ignored")
 }
+
+// Ensure the garbage collector interval is reset after recovering from an
+// error. The garbage collector should not continue to use the exponential
+// backoff interval that is activated on error.
+func TestGCFailureBackoffReset(t *testing.T) {
+	gc := newFakeGC(revisionErrorDeleter{
+		// Error on revisions 1 - 5, giving the exponential
+		// backoff enough time to fail the test if the interval
+		// is not reset properly.
+		errorOnRevisions: []int64{1, 2, 3, 4, 5},
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		interval := 10 * time.Millisecond
+		window := 10 * time.Second
+		timeout := 1 * time.Minute
+
+		require.Error(t, StartGarbageCollector(ctx, &gc, interval, window, timeout))
+	}()
+
+	time.Sleep(500 * time.Millisecond)
+	cancel()
+
+	// The next interval should have been reset after recovering from the error.
+	// If it is not reset, the last exponential backoff interval will not give
+	// the GC enough time to run.
+	require.Greater(t, gc.GetMetrics().markedCompleteCount, 20, "Next interval was not reset with backoff")
+}
```
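For readers following along, the property `TestGCFailureBackoffReset` exercises can be summarized with a loop like the one below. This is a minimal sketch, not SpiceDB's actual `StartGarbageCollector`: it assumes the `github.com/cenkalti/backoff/v4` package (consistent with the `backoffInterval.Reset()` call and 500ms default mentioned in the review comment below), and `runGCLoop` and its `collect` callback are hypothetical names standing in for one GC pass.

```go
package gcsketch

import (
	"context"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// runGCLoop is a hypothetical sketch of a GC loop with failure backoff.
// collect stands in for a single garbage-collection pass.
func runGCLoop(ctx context.Context, interval time.Duration, collect func(context.Context) error) error {
	// Exponential backoff governs the wait after a failed pass.
	backoffInterval := backoff.NewExponentialBackOff()
	backoffInterval.InitialInterval = interval
	// Reset so the overridden InitialInterval takes effect immediately
	// (see the review comment below on the 500ms default).
	backoffInterval.Reset()

	nextInterval := interval
	timer := time.NewTimer(nextInterval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if err := collect(ctx); err != nil {
				// On failure, grow the wait exponentially.
				nextInterval = backoffInterval.NextBackOff()
				if nextInterval == backoff.Stop {
					return fmt.Errorf("garbage collection failed for too long: %w", err)
				}
			} else {
				// On success, reset the backoff state and drop back to the
				// configured interval -- the behavior the new test asserts
				// on via markedCompleteCount.
				backoffInterval.Reset()
				nextInterval = interval
			}
			timer.Reset(nextInterval)
		}
	}
}
```

With the deleter erroring on revisions 1 through 5, a loop of this shape spends the first ~500ms of the test in backoff; once a pass succeeds, only a reset interval of 10ms lets it complete well over 20 passes before the test's deadline.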
Review comment:

This is the other issue I discovered during testing. Without this, the initial interval used by the backoff will be the default 500ms until `backoffInterval.Reset()` is called. This is an edge case, but it can cause the wrong initial value to be used if the first iteration of the GC has an error.
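The edge case described above matches how `cenkalti/backoff/v4` behaves: `NewExponentialBackOff` captures its current interval at construction time, so assigning `InitialInterval` afterward has no effect until `Reset()` is called. A small illustration, written as a sketch under that assumption rather than code from this PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	interval := 10 * time.Millisecond

	// The constructor initializes the current interval from the package
	// default (500ms) before we override InitialInterval below.
	backoffInterval := backoff.NewExponentialBackOff()
	backoffInterval.InitialInterval = interval

	// Without a Reset, a failure on the very first GC iteration would
	// back off from ~500ms rather than ~10ms.
	fmt.Println("before Reset:", backoffInterval.NextBackOff())

	// After Reset, the overridden InitialInterval takes effect.
	backoffInterval.Reset()
	fmt.Println("after Reset:", backoffInterval.NextBackOff())
}
```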