From 8f0e340247d4694bffac7c8ef2239c415fc6c171 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 13 Oct 2021 18:37:56 -0400 Subject: [PATCH] use mocked time for testing request heding Signed-off-by: Jake Moshenko --- go.mod | 1 + go.sum | 2 + internal/datastore/proxy/hedging.go | 32 +++- internal/datastore/proxy/hedging_test.go | 218 +++++++++++------------ 4 files changed, 135 insertions(+), 118 deletions(-) diff --git a/go.mod b/go.mod index f2ce41eb95..781f36aefd 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/authzed/authzed-go v0.1.1-0.20210923172306-b4b512e4d359 github.com/authzed/grpcutil v0.0.0-20210914195113-c0d8369e7e1f github.com/aws/aws-sdk-go v1.40.53 + github.com/benbjohnson/clock v1.1.0 github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash v1.1.0 github.com/containerd/continuity v0.0.0-20210315143101-93e15499afd5 // indirect diff --git a/go.sum b/go.sum index d38e9856d2..566e0128f3 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,8 @@ github.com/authzed/grpcutil v0.0.0-20210914195113-c0d8369e7e1f/go.mod h1:HwO/KbR github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.40.53 h1:wi4UAslOQ1HfF2NjnIwI6st8n7sQg7shUUNLkaCgIpc= github.com/aws/aws-sdk-go v1.40.53/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/internal/datastore/proxy/hedging.go b/internal/datastore/proxy/hedging.go index 7497926e59..d7d155e737 100644 --- a/internal/datastore/proxy/hedging.go +++ b/internal/datastore/proxy/hedging.go @@ -8,6 +8,7 @@ import ( v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/benbjohnson/clock" "github.com/influxdata/tdigest" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" @@ -43,6 +44,7 @@ type subrequest func(ctx context.Context, responseReady chan<- struct{}) type hedger func(ctx context.Context, req subrequest) func newHedger( + timeSource clock.Clock, initialSlowRequestThreshold time.Duration, maxSampleCount uint64, quantile float64, @@ -68,8 +70,8 @@ func newHedger( digestLock.Unlock() slowRequestThreshold := time.Duration(slowRequestThresholdSeconds * float64(time.Second)) - timer := time.NewTimer(slowRequestThreshold) - start := time.Now() + timer := timeSource.Timer(slowRequestThreshold) + start := timeSource.Now() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -86,7 +88,7 @@ func newHedger( } // Compute how long it took for us to get any answer - duration := time.Since(start) + duration := timeSource.Since(start) digestLock.Lock() defer digestLock.Unlock() @@ -124,6 +126,22 @@ func NewHedgingProxy( initialSlowRequestThreshold time.Duration, maxSampleCount uint64, hedgingQuantile float64, +) datastore.Datastore { + return newHedgingProxyWithTimeSource( + delegate, + initialSlowRequestThreshold, + maxSampleCount, + hedgingQuantile, + clock.New(), + ) +} + +func newHedgingProxyWithTimeSource( + delegate datastore.Datastore, + initialSlowRequestThreshold time.Duration, + maxSampleCount uint64, + hedgingQuantile float64, + timeSource clock.Clock, ) datastore.Datastore { if initialSlowRequestThreshold < 0 { panic("initial slow request threshold negative") @@ -139,10 +157,10 @@ func NewHedgingProxy( return hedgingProxy{ delegate, - newHedger(initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), - newHedger(initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), - newHedger(initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), - newHedger(initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), + newHedger(timeSource, initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), + newHedger(timeSource, initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), + newHedger(timeSource, initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), + newHedger(timeSource, initialSlowRequestThreshold, maxSampleCount, hedgingQuantile), } } diff --git a/internal/datastore/proxy/hedging_test.go b/internal/datastore/proxy/hedging_test.go index 6ef4cae567..62cf31fc46 100644 --- a/internal/datastore/proxy/hedging_test.go +++ b/internal/datastore/proxy/hedging_test.go @@ -3,10 +3,12 @@ package proxy import ( "context" "errors" + "runtime" "testing" "time" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/benbjohnson/clock" "github.com/shopspring/decimal" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -89,8 +91,11 @@ func TestDatastoreRequestHedging(t *testing.T) { for _, tc := range testCases { t.Run(tc.methodName, func(t *testing.T) { + mockTime := clock.NewMock() delegate := &test.MockedDatastore{} - proxy := NewHedgingProxy(delegate, slowQueryTime, maxSampleCount, quantile) + proxy := newHedgingProxyWithTimeSource( + delegate, slowQueryTime, maxSampleCount, quantile, mockTime, + ) delegate. On(tc.methodName, tc.arguments...). @@ -102,7 +107,7 @@ func TestDatastoreRequestHedging(t *testing.T) { delegate. On(tc.methodName, tc.arguments...). - After(2 * slowQueryTime). + WaitUntil(mockTime.After(2 * slowQueryTime)). Return(tc.firstCallResults...). Once() delegate. @@ -110,47 +115,53 @@ func TestDatastoreRequestHedging(t *testing.T) { Return(tc.secondCallResults...). Once() + done := autoAdvance(mockTime, slowQueryTime, 3*slowQueryTime) + tc.f(t, proxy, false) delegate.AssertExpectations(t) + <-done + delegate. On(tc.methodName, tc.arguments...). - After(2 * slowQueryTime). + WaitUntil(mockTime.After(2 * slowQueryTime)). Return(tc.firstCallResults...). Once() delegate. On(tc.methodName, tc.arguments...). - After(2 * slowQueryTime). + WaitUntil(mockTime.After(3 * slowQueryTime)). Return(tc.secondCallResults...). Once() + autoAdvance(mockTime, slowQueryTime, 4*slowQueryTime) + tc.f(t, proxy, true) delegate.AssertExpectations(t) }) } } -func TestTupleQueryRequestHedging(t *testing.T) { - require := require.New(t) +type isMock interface { + On(methodName string, arguments ...interface{}) *mock.Call - delegate := &test.MockedTupleQuery{} - proxy := &hedgingTupleQuery{ - delegate, - newHedger(slowQueryTime, maxSampleCount, quantile), - } + AssertExpectations(t mock.TestingT) bool +} + +func runQueryTest(t *testing.T, delegate isMock, mockTime *clock.Mock, exec tupleExecutor) { + require := require.New(t) delegate. On("Execute", mock.Anything). Return(datastore.NewSliceTupleIterator(nil), errKnown). Once() - _, err := proxy.Execute(context.Background()) + _, err := exec(context.Background()) require.ErrorIs(errKnown, err) delegate.AssertExpectations(t) delegate. On("Execute", mock.Anything). - After(2*slowQueryTime). + WaitUntil(mockTime.After(2*slowQueryTime)). Return(datastore.NewSliceTupleIterator(nil), errKnown). Once() delegate. @@ -158,152 +169,104 @@ func TestTupleQueryRequestHedging(t *testing.T) { Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). Once() - _, err = proxy.Execute(context.Background()) + done := autoAdvance(mockTime, slowQueryTime, 3*slowQueryTime) + + _, err = exec(context.Background()) require.ErrorIs(errAnotherKnown, err) delegate.AssertExpectations(t) + <-done delegate. On("Execute", mock.Anything). - After(2*slowQueryTime). + WaitUntil(mockTime.After(2*slowQueryTime)). Return(datastore.NewSliceTupleIterator(nil), errKnown). Once() delegate. On("Execute", mock.Anything). - After(2*slowQueryTime). + WaitUntil(mockTime.After(3*slowQueryTime)). Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). Once() - _, err = proxy.Execute(context.Background()) + autoAdvance(mockTime, slowQueryTime, 4*slowQueryTime) + + _, err = exec(context.Background()) require.ErrorIs(errKnown, err) delegate.AssertExpectations(t) } -func TestReverseTupleQueryRequestHedging(t *testing.T) { - require := require.New(t) +func TestTupleQueryRequestHedging(t *testing.T) { + delegate := &test.MockedTupleQuery{} + mockTime := clock.NewMock() + proxy := &hedgingTupleQuery{ + delegate, + newHedger(mockTime, slowQueryTime, maxSampleCount, quantile), + } + + runQueryTest(t, delegate, mockTime, proxy.Execute) +} +func TestReverseTupleQueryRequestHedging(t *testing.T) { delegate := &test.MockedReverseTupleQuery{} + mockTime := clock.NewMock() proxy := &hedgingReverseTupleQuery{ delegate, - newHedger(slowQueryTime, maxSampleCount, quantile), + newHedger(mockTime, slowQueryTime, maxSampleCount, quantile), } - delegate. - On("Execute", mock.Anything). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - - _, err := proxy.Execute(context.Background()) - require.ErrorIs(errKnown, err) - delegate.AssertExpectations(t) - - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - delegate. - On("Execute", mock.Anything). - Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). - Once() - - _, err = proxy.Execute(context.Background()) - require.ErrorIs(errAnotherKnown, err) - delegate.AssertExpectations(t) - - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). - Once() - - _, err = proxy.Execute(context.Background()) - require.ErrorIs(errKnown, err) - delegate.AssertExpectations(t) + runQueryTest(t, delegate, mockTime, proxy.Execute) } func TestCommonTupleQueryRequestHedging(t *testing.T) { - require := require.New(t) - delegate := &test.MockedCommonTupleQuery{} + mockTime := clock.NewMock() proxy := &hedgingCommonTupleQuery{ delegate, - newHedger(slowQueryTime, maxSampleCount, quantile), + newHedger(mockTime, slowQueryTime, maxSampleCount, quantile), } - delegate. - On("Execute", mock.Anything). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - - _, err := proxy.Execute(context.Background()) - require.ErrorIs(errKnown, err) - delegate.AssertExpectations(t) - - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - delegate. - On("Execute", mock.Anything). - Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). - Once() - - _, err = proxy.Execute(context.Background()) - require.ErrorIs(errAnotherKnown, err) - delegate.AssertExpectations(t) - - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errKnown). - Once() - delegate. - On("Execute", mock.Anything). - After(2*slowQueryTime). - Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). - Once() - - _, err = proxy.Execute(context.Background()) - require.ErrorIs(errKnown, err) - delegate.AssertExpectations(t) + runQueryTest(t, delegate, mockTime, proxy.Execute) } func TestDigestRollover(t *testing.T) { require := require.New(t) delegate := &test.MockedCommonTupleQuery{} + mockTime := clock.NewMock() proxy := &hedgingCommonTupleQuery{ delegate, - newHedger(slowQueryTime, 100, 0.9999999999), + newHedger(mockTime, slowQueryTime, 100, 0.9999999999), } // Simulate a request that starts off fast enough delegate. On("Execute", mock.Anything). - After(slowQueryTime/2). + WaitUntil(mockTime.After(slowQueryTime/2)). Return(datastore.NewSliceTupleIterator(nil), errKnown). Once() + done := autoAdvance(mockTime, slowQueryTime/4, slowQueryTime*2) + _, err := proxy.Execute(context.Background()) require.ErrorIs(err, errKnown) delegate.AssertExpectations(t) - delegate. - On("Execute", mock.Anything). - After(100*time.Microsecond). - Return(datastore.NewSliceTupleIterator(nil), errKnown) + <-done + + for i := time.Duration(0); i < 205; i++ { + delegate. + On("Execute", mock.Anything). + WaitUntil(mockTime.After(i*100*time.Microsecond)). + Return(datastore.NewSliceTupleIterator(nil), errKnown). + Once() + } + + done = autoAdvance(mockTime, 100*time.Microsecond, 205*100*time.Microsecond) for i := 0; i < 200; i++ { _, err := proxy.Execute(context.Background()) require.ErrorIs(err, errKnown) } - delegate.AssertExpectations(t) + <-done delegate.ExpectedCalls = nil @@ -311,15 +274,17 @@ func TestDigestRollover(t *testing.T) { // request is hedged delegate. On("Execute", mock.Anything). - After(slowQueryTime/2). + WaitUntil(mockTime.After(slowQueryTime/2)). Return(datastore.NewSliceTupleIterator(nil), errKnown). Once() delegate. On("Execute", mock.Anything). - After(100*time.Microsecond). + WaitUntil(mockTime.After(100*time.Microsecond)). Return(datastore.NewSliceTupleIterator(nil), errAnotherKnown). Once() + autoAdvance(mockTime, 100*time.Microsecond, slowQueryTime) + _, err = proxy.Execute(context.Background()) require.ErrorIs(errAnotherKnown, err) delegate.AssertExpectations(t) @@ -355,8 +320,11 @@ func TestDatastoreE2E(t *testing.T) { delegateDatastore := &test.MockedDatastore{} delegateQuery := &test.MockedTupleQuery{} + mockTime := clock.NewMock() - proxy := NewHedgingProxy(delegateDatastore, slowQueryTime, maxSampleCount, quantile) + proxy := newHedgingProxyWithTimeSource( + delegateDatastore, slowQueryTime, maxSampleCount, quantile, mockTime, + ) delegateDatastore. On("QueryTuples", mock.Anything, mock.Anything). @@ -384,13 +352,15 @@ func TestDatastoreE2E(t *testing.T) { delegateQuery. On("Execute", mock.Anything). Return(datastore.NewSliceTupleIterator(expectedTuples), nil). - After(2 * slowQueryTime). + WaitUntil(mockTime.After(2 * slowQueryTime)). Once() delegateQuery. On("Execute", mock.Anything). Return(datastore.NewSliceTupleIterator(expectedTuples), nil). Once() + autoAdvance(mockTime, slowQueryTime/2, 2*slowQueryTime) + it, err := proxy.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: "test", }, revisionKnown).Execute(context.Background()) @@ -410,20 +380,46 @@ func TestContextCancellation(t *testing.T) { require := require.New(t) delegate := &test.MockedDatastore{} - proxy := NewHedgingProxy(delegate, slowQueryTime, maxSampleCount, quantile) + mockTime := clock.NewMock() + proxy := newHedgingProxyWithTimeSource( + delegate, slowQueryTime, maxSampleCount, quantile, mockTime, + ) delegate. On("SyncRevision", mock.Anything). Return(decimal.Zero, errKnown). - After(500 * time.Microsecond). + WaitUntil(mockTime.After(500 * time.Microsecond)). Once() ctx, cancel := context.WithCancel(context.Background()) go func() { - time.Sleep(100 * time.Microsecond) + mockTime.Sleep(100 * time.Microsecond) cancel() }() + + autoAdvance(mockTime, 150*time.Microsecond, 1*time.Millisecond) + _, err := proxy.SyncRevision(ctx) require.Error(err) } + +func autoAdvance(timeSource *clock.Mock, step time.Duration, totalTime time.Duration) <-chan time.Time { + done := make(chan time.Time) + + go func() { + defer close(done) + + endTime := timeSource.Now().Add(totalTime) + + runtime.Gosched() + for now := timeSource.Now(); now.Before(endTime); now = timeSource.Now() { + timeSource.Add(step) + runtime.Gosched() + } + + done <- timeSource.Now() + }() + + return done +}