-
Notifications
You must be signed in to change notification settings - Fork 244
/
lookupresources.go
94 lines (79 loc) · 3.58 KB
/
lookupresources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package graph
import (
"context"
"errors"
"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
"github.com/authzed/spicedb/pkg/tuple"
)
// NewCursoredLookupResources creates and instance of CursoredLookupResources.
func NewCursoredLookupResources(c dispatch.Check, r dispatch.ReachableResources, concurrencyLimit uint16) *CursoredLookupResources {
return &CursoredLookupResources{c, r, concurrencyLimit}
}
// CursoredLookupResources exposes a method to perform LookupResources requests, and delegates subproblems to the
// provided dispatch.Lookup instance.
type CursoredLookupResources struct {
c dispatch.Check
r dispatch.ReachableResources
concurrencyLimit uint16
}
// ValidatedLookupResourcesRequest represents a request after it has been validated and parsed for internal
// consumption.
type ValidatedLookupResourcesRequest struct {
*v1.DispatchLookupResourcesRequest
Revision datastore.Revision
}
func (cl *CursoredLookupResources) LookupResources(
req ValidatedLookupResourcesRequest,
parentStream dispatch.LookupResourcesStream,
) error {
if req.Subject.ObjectId == tuple.PublicWildcard {
return NewErrInvalidArgument(errors.New("cannot perform lookup resources on wildcard"))
}
lookupContext := parentStream.Context()
limits := newLimitTracker(req.OptionalLimit)
reachableResourcesCursor := req.OptionalCursor
// Loop until the limit has been exhausted or no additional reachable resources are found (see below)
for !limits.hasExhaustedLimit() {
errCanceledBecauseNoAdditionalResourcesNeeded := errors.New("canceled because no additional reachable resources are needed")
// Create a new context for just the reachable resources. This is necessary because we don't want the cancelation
// of the reachable resources to cancel the lookup resources. The checking stream manually cancels the reachable
// resources context once the expected number of results has been reached.
reachableContext, cancelReachable := branchContext(lookupContext)
// Create a new handling stream that consumes the reachable resources results and publishes them
// to the parent stream, as found resources if they are properly checked.
checkingStream := newCheckingResourceStream(lookupContext, reachableContext, func() {
cancelReachable(errCanceledBecauseNoAdditionalResourcesNeeded)
}, req, cl.c, parentStream, limits, cl.concurrencyLimit)
err := cl.r.DispatchReachableResources(&v1.DispatchReachableResourcesRequest{
ResourceRelation: req.ObjectRelation,
SubjectRelation: &core.RelationReference{
Namespace: req.Subject.Namespace,
Relation: req.Subject.Relation,
},
SubjectIds: []string{req.Subject.ObjectId},
Metadata: req.Metadata,
OptionalCursor: reachableResourcesCursor,
}, checkingStream)
if err != nil {
// If the reachable resources was canceled explicitly by the checking stream because the limit has been
// reached, then this error can safely be ignored. Otherwise, it must be returned.
isAllowedCancelErr := errors.Is(context.Cause(reachableContext), errCanceledBecauseNoAdditionalResourcesNeeded)
if !isAllowedCancelErr {
return err
}
}
reachableCount, newCursor, err := checkingStream.waitForPublishing()
if err != nil {
return err
}
reachableResourcesCursor = newCursor
// If no additional reachable results were found or the request was unlimited, then we can stop.
if reachableCount == 0 || req.OptionalLimit == 0 {
return nil
}
}
return nil
}