Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle context cancelation in LookupResources processing #1389

Merged
merged 4 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/dispatch/graph/lookupresources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ func TestLookupResourcesImmediateTimeout(t *testing.T) {
},
}, stream)

require.NoError(err)
require.Empty(stream.Results())
require.ErrorIs(err, context.DeadlineExceeded)
require.ErrorContains(err, "context deadline exceeded")
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
}

func TestLookupResourcesWithError(t *testing.T) {
Expand Down
41 changes: 35 additions & 6 deletions internal/graph/checkingresourcestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ func newCheckingResourceStream(
concurrencyLimit = 1
}

// Since one goroutine is used for publishing, allocate one less processing goroutine.
processingConcurrencyLimit := concurrencyLimit - 1
if processingConcurrencyLimit == 0 {
processingConcurrencyLimit = 1
}

cancelCtx, cancel := context.WithCancel(lookupContext)

crs := &checkingResourceStream{
Expand All @@ -206,7 +212,7 @@ func newCheckingResourceStream(
parentStream: parentStream,
limits: limits,

sem: make(chan token, concurrencyLimit),
sem: make(chan token, processingConcurrencyLimit),

rq: &resourceQueue{
ctx: lookupContext,
Expand Down Expand Up @@ -246,7 +252,14 @@ func (crs *checkingResourceStream) waitForPublishing() (uint64, *v1.Cursor, erro
crs.processingWaitGroup.Wait()

// Mark publishing as ready for final publishing.
crs.availableForPublishing <- false
select {
case crs.availableForPublishing <- false:
break

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
break
}

// Wait for any remaining publishing to complete.
crs.publishingWaitGroup.Wait()
Expand All @@ -262,6 +275,7 @@ func (crs *checkingResourceStream) resourcePublisher() {
for {
select {
case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return

case isStillRunning := <-crs.availableForPublishing:
Expand Down Expand Up @@ -342,6 +356,7 @@ func (crs *checkingResourceStream) process() {
for {
select {
case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return

case <-crs.reachableResourcesCompleted:
Expand Down Expand Up @@ -464,8 +479,14 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error)
crs.rq.updateToBePublished(rai)
}

crs.availableForPublishing <- true
return true, nil
select {
case crs.availableForPublishing <- true:
return true, nil

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false, nil
}
}

// spawnIfAvailable spawns a processing worker, if the concurrency limit has not been reached.
Expand All @@ -480,6 +501,7 @@ func (crs *checkingResourceStream) spawnIfAvailable() {
go crs.process()

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return

default:
Expand All @@ -496,8 +518,15 @@ func (crs *checkingResourceStream) queue(result *v1.DispatchReachableResourcesRe
})
crs.reachableResourcesCount++
crs.lastResourceCursor = result.AfterResponseCursor
crs.reachableResourceAvailable <- struct{}{}
return true

select {
case crs.reachableResourceAvailable <- struct{}{}:
return true

case <-crs.ctx.Done():
crs.setError(crs.ctx.Err())
return false
}
}

// Publish implements the Stream interface and is invoked by the ReachableResources call.
Expand Down
4 changes: 2 additions & 2 deletions internal/graph/lookupresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func (cl *CursoredLookupResources) LookupResources(
reachableContext, cancelReachable := context.WithCancel(newContextForReachable)
defer cancelReachable()

limits, lCtx := newLimitTracker(lookupContext, req.OptionalLimit)
limits, _ := newLimitTracker(lookupContext, req.OptionalLimit)
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
reachableResourcesCursor := req.OptionalCursor

// Loop until the limit has been exhausted or no additional reachable resources are found (see below)
for !limits.hasExhaustedLimit() {
// Create a new handling stream that consumes the reachable resources results and publishes them
// to the parent stream, as found resources if they are properly checked.
checkingStream := newCheckingResourceStream(lCtx, reachableContext, req, cl.c, parentStream, limits, cl.concurrencyLimit)
checkingStream := newCheckingResourceStream(lookupContext, reachableContext, req, cl.c, parentStream, limits, cl.concurrencyLimit)

err := cl.r.DispatchReachableResources(&v1.DispatchReachableResourcesRequest{
ResourceRelation: req.ObjectRelation,
Expand Down
6 changes: 6 additions & 0 deletions internal/graph/reachableresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ func (crr *CursoredReachableResources) chunkedRedispatch(
return nil, nil
}

// If the number of results returned was less than the limit specified, then this is
// the final iteration and no cursor should be returned for the next iteration.
if rsm.len() < int(queryLimit) {
lastTpl = nil
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
}

return lastTpl, handler(ctx, ci, rsm.asReadOnly())
})
}
Expand Down
19 changes: 12 additions & 7 deletions internal/middleware/streamtimeout/streamtimeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package streamtimeout

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"

"github.com/authzed/spicedb/pkg/spiceerrors"
)

// MustStreamServerInterceptor returns a new stream server interceptor that cancels the context
Expand All @@ -17,20 +21,21 @@ func MustStreamServerInterceptor(timeout time.Duration) grpc.StreamServerInterce

return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
withCancel, cancelFn := context.WithCancel(ctx)
timer := time.AfterFunc(timeout, cancelFn)
wrapper := &sendWrapper{stream, withCancel, cancelFn, timer, timeout}
withCancel, internalCancelFn := context.WithCancelCause(ctx)
timer := time.AfterFunc(timeout, func() {
internalCancelFn(spiceerrors.WithCodeAndDetailsAsError(fmt.Errorf("operation took longer than allowed %v to complete", timeout), codes.DeadlineExceeded))
})
wrapper := &sendWrapper{stream, withCancel, timer, timeout}
return handler(srv, wrapper)
}
}

type sendWrapper struct {
grpc.ServerStream

ctx context.Context
cancelFn func()
timer *time.Timer
timeout time.Duration
ctx context.Context
timer *time.Timer
timeout time.Duration
}

func (s *sendWrapper) Context() context.Context {
Expand Down
7 changes: 7 additions & 0 deletions internal/services/shared/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ func RewriteError(ctx context.Context, err error) error {
case errors.Is(err, context.DeadlineExceeded):
return status.Errorf(codes.DeadlineExceeded, "%s", err)
case errors.Is(err, context.Canceled):
err := context.Cause(ctx)
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if _, ok := status.FromError(err); ok {
return err
}
}

return status.Errorf(codes.Canceled, "%s", err)
default:
log.Ctx(ctx).Err(err).Msg("received unexpected error")
Expand Down
4 changes: 2 additions & 2 deletions internal/services/v1/relationships_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,8 +1344,8 @@ func TestReadRelationshipsWithTimeout(t *testing.T) {
continue
}

require.ErrorContains(err, "context canceled")
grpcutil.RequireStatus(t, codes.Canceled, err)
require.ErrorContains(err, "operation took longer than allowed 1ns to complete")
grpcutil.RequireStatus(t, codes.DeadlineExceeded, err)
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/spiceerrors/withstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func WithCodeAndDetails(err error, code codes.Code, details ...protoiface.Messag
return created
}

// WithCodeAndDetailsAsError builds a status from err, code and details via
// WithCodeAndDetails and returns it bundled together with the original err as
// a single error value, for callers that need an `error` rather than a status.
func WithCodeAndDetailsAsError(err error, code codes.Code, details ...protoiface.MessageV1) error {
	// Construct the wrapper in one step; the first field is the original
	// error and the second is the status derived from it.
	return errWithStatus{err, WithCodeAndDetails(err, code, details...)}
}

// ForReason returns an ErrorInfo block for a specific error reason as defined in the V1 API.
func ForReason(reason v1.ErrorReason, metadata map[string]string) *errdetails.ErrorInfo {
return &errdetails.ErrorInfo{
Expand Down