Skip to content

Commit

Permalink
add request hedging as an option to the serve command
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Moshenko <jacob.moshenko@gmail.com>
  • Loading branch information
jakedt committed Oct 14, 2021
1 parent 0bc713b commit 2756965
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 26 deletions.
24 changes: 24 additions & 0 deletions cmd/spicedb/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func registerServeCmd(rootCmd *cobra.Command) {
serveCmd.Flags().String("datastore-tx-overlap-strategy", "static", `strategy to generate transaction overlap keys ("prefix", "static", "insecure") (cockroach driver only)`)
serveCmd.Flags().String("datastore-tx-overlap-key", "key", "static key to touch when writing to ensure transactions overlap (only used if --datastore-tx-overlap-strategy=static is set; cockroach driver only)")

serveCmd.Flags().Bool("datastore-request-hedging", true, "enable request hedging")
serveCmd.Flags().Duration("datastore-request-hedging-initial-slow-value", 10*time.Millisecond, "initial value to use for slow datastore requests, before statistics have been collected")
serveCmd.Flags().Uint64("datastore-request-hedging-max-requests", 1_000_000, "maximum number of historical requests to consider")
serveCmd.Flags().Float64("datastore-request-hedging-quantile", 0.95, "quantile of historical datastore request time over which a request will be considered slow")

// Flags for the namespace manager
serveCmd.Flags().Duration("ns-cache-expiration", 1*time.Minute, "amount of time a namespace entry should remain cached")

Expand Down Expand Up @@ -203,6 +208,25 @@ func serveRun(cmd *cobra.Command, args []string) {
}
}

if cobrautil.MustGetBool(cmd, "datastore-request-hedging") {
initialSlowRequest := cobrautil.MustGetDuration(cmd, "datastore-request-hedging-initial-slow-value")
maxRequests := cobrautil.MustGetUint64(cmd, "datastore-request-hedging-max-requests")
hedgingQuantile := cobrautil.MustGetFloat64(cmd, "datastore-request-hedging-quantile")

log.Info().
Stringer("initialSlowRequest", initialSlowRequest).
Uint64("maxRequests", maxRequests).
Float64("hedgingQuantile", hedgingQuantile).
Msg("request hedging enabled")

ds = proxy.NewHedgingProxy(
ds,
initialSlowRequest,
maxRequests,
hedgingQuantile,
)
}

if cobrautil.MustGetBool(cmd, "datastore-readonly") {
log.Warn().Msg("setting the service to read-only")
ds = proxy.NewReadonlyDatastore(ds)
Expand Down
55 changes: 29 additions & 26 deletions internal/datastore/proxy/hedging.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,10 @@ func newHedger(

select {
case <-responseReady:
case <-ctx.Done():
case <-timer.C:
log.Debug().Dur("after", slowRequestThreshold).Msg("sending hedged datastore request")
go req(ctx, responseReady)
select {
case <-responseReady:
case <-ctx.Done():
}
<-responseReady
}

// Compute how long it took for us to get any answer
Expand All @@ -75,7 +71,7 @@ func newHedger(

// Swap the current active digest if it has too many samples
if digests[0].Count() >= float64(maxSampleCount) {
log.Debug().Float64("count", digests[0].Count()).Msg("switching to next hedging digest")
log.Trace().Float64("count", digests[0].Count()).Msg("switching to next hedging digest")
exhausted := digests[0]
digests = digests[1:]
exhausted.Reset()
Expand Down Expand Up @@ -231,17 +227,38 @@ type hedgingTupleQuery struct {
queryTuplesHedger hedger
}

func (htq hedgingTupleQuery) Execute(ctx context.Context) (delegateIterator datastore.TupleIterator, err error) {
type tupleExecutor func(ctx context.Context) (datastore.TupleIterator, error)

func executeQuery(ctx context.Context, exec tupleExecutor, queryHedger hedger) (delegateIterator datastore.TupleIterator, err error) {
doneLock := sync.Mutex{}
alreadyReturned := false

subreq := func(ctx context.Context, responseReady chan<- struct{}) {
delegateIterator, err = htq.delegate.Execute(ctx)
responseReady <- struct{}{}
tempIterator, tempErr := exec(ctx)
doneLock.Lock()
defer doneLock.Unlock()

if alreadyReturned {
if tempErr == nil {
tempIterator.Close()
}
} else {
alreadyReturned = true
delegateIterator = tempIterator
err = tempErr
responseReady <- struct{}{}
}
}

htq.queryTuplesHedger(ctx, subreq)
queryHedger(ctx, subreq)

return
}

func (htq hedgingTupleQuery) Execute(ctx context.Context) (delegateIterator datastore.TupleIterator, err error) {
return executeQuery(ctx, htq.delegate.Execute, htq.queryTuplesHedger)
}

func (htq hedgingTupleQuery) Limit(limit uint64) datastore.CommonTupleQuery {
return hedgingCommonTupleQuery{
htq.delegate.Limit(limit),
Expand All @@ -265,14 +282,7 @@ type hedgingReverseTupleQuery struct {
}

func (hrtq hedgingReverseTupleQuery) Execute(ctx context.Context) (delegateIterator datastore.TupleIterator, err error) {
subreq := func(ctx context.Context, responseReady chan<- struct{}) {
delegateIterator, err = hrtq.delegate.Execute(ctx)
responseReady <- struct{}{}
}

hrtq.queryTuplesHedger(ctx, subreq)

return
return executeQuery(ctx, hrtq.delegate.Execute, hrtq.queryTuplesHedger)
}

func (hrtq hedgingReverseTupleQuery) Limit(limit uint64) datastore.CommonTupleQuery {
Expand All @@ -298,12 +308,5 @@ func (hctq hedgingCommonTupleQuery) Limit(limit uint64) datastore.CommonTupleQue
}

func (hctq hedgingCommonTupleQuery) Execute(ctx context.Context) (delegateIterator datastore.TupleIterator, err error) {
subreq := func(ctx context.Context, responseReady chan<- struct{}) {
delegateIterator, err = hctq.delegate.Execute(ctx)
responseReady <- struct{}{}
}

hctq.queryTuplesHedger(ctx, subreq)

return
return executeQuery(ctx, hctq.delegate.Execute, hctq.queryTuplesHedger)
}
78 changes: 78 additions & 0 deletions internal/datastore/proxy/hedging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,81 @@ func TestBadArgs(t *testing.T) {
}
require.Panics(invalidQuantileTooLarge)
}

func TestDatastoreE2E(t *testing.T) {
require := require.New(t)

delegateDatastore := &test.MockedDatastore{}
delegateQuery := &test.MockedTupleQuery{}

proxy := NewHedgingProxy(delegateDatastore, slowQueryTime, maxSampleCount, quantile)

delegateDatastore.
On("QueryTuples", mock.Anything, mock.Anything).
Return(delegateQuery).
Once()

expectedTuples := []*v0.RelationTuple{
{
ObjectAndRelation: &v0.ObjectAndRelation{
Namespace: "test",
ObjectId: "test",
Relation: "test",
},
User: &v0.User{
UserOneof: &v0.User_Userset{
Userset: &v0.ObjectAndRelation{
Namespace: "test",
ObjectId: "test",
Relation: "test",
},
},
},
},
}
delegateQuery.
On("Execute", mock.Anything).
Return(datastore.NewSliceTupleIterator(expectedTuples), nil).
After(2 * slowQueryTime).
Once()
delegateQuery.
On("Execute", mock.Anything).
Return(datastore.NewSliceTupleIterator(expectedTuples), nil).
Once()

it, err := proxy.QueryTuples(datastore.TupleQueryResourceFilter{
ResourceType: "test",
}, revisionKnown).Execute(context.Background())
require.NoError(err)

only := it.Next()
require.Equal(expectedTuples[0], only)

require.Nil(it.Next())
require.NoError(it.Err())

delegateDatastore.AssertExpectations(t)
delegateQuery.AssertExpectations(t)
}

func TestContextCancellation(t *testing.T) {
require := require.New(t)

delegate := &test.MockedDatastore{}
proxy := NewHedgingProxy(delegate, slowQueryTime, maxSampleCount, quantile)

delegate.
On("SyncRevision", mock.Anything).
Return(decimal.Zero, errKnown).
After(500 * time.Microsecond).
Once()

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Microsecond)
cancel()
}()
_, err := proxy.SyncRevision(ctx)

require.Error(err)
}

0 comments on commit 2756965

Please sign in to comment.