-
Notifications
You must be signed in to change notification settings - Fork 246
/
dispatch.go
113 lines (91 loc) · 3.83 KB
/
dispatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package dispatch
import (
"context"
"fmt"
"github.com/rs/zerolog"
log "github.com/authzed/spicedb/internal/logging"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)
// ReadyState represents the ready state of the dispatcher.
// It is the value returned by Dispatcher.ReadyState.
type ReadyState struct {
// Message is a human-readable status message for the current state.
Message string
// IsReady indicates whether the dispatcher is ready.
IsReady bool
}
// Dispatcher interface describes a method for passing subchecks off to additional machines.
// It combines the Check, Expand, ReachableResources, LookupResources and
// LookupSubjects interfaces with methods for closing the dispatcher and
// reporting its readiness.
type Dispatcher interface {
Check
Expand
ReachableResources
LookupResources
LookupSubjects
// Close closes the dispatcher.
Close() error
// ReadyState returns the current ready state of the dispatcher: whether it is
// able to respond to requests, plus a human-readable status message.
ReadyState() ReadyState
}
// Check interface describes just the methods required to dispatch check requests.
type Check interface {
// DispatchCheck submits a single check request and returns its response,
// or an error.
DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error)
}
// Expand interface describes just the methods required to dispatch expand requests.
type Expand interface {
// DispatchExpand submits a single expand request and returns its response,
// or an error.
DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}
// ReachableResourcesStream is an alias for the stream to which reachable resources will be written.
// It is Stream instantiated with *v1.DispatchReachableResourcesResponse.
type ReachableResourcesStream = Stream[*v1.DispatchReachableResourcesResponse]
// ReachableResources interface describes just the methods required to dispatch reachable resources requests.
type ReachableResources interface {
// DispatchReachableResources submits a single reachable resources request, writing its results to the specified stream.
// Results are delivered through stream; only an error is returned directly.
// NOTE(review): unlike DispatchCheck, no context.Context parameter is taken;
// presumably the stream supplies it — confirm against Stream.
DispatchReachableResources(
req *v1.DispatchReachableResourcesRequest,
stream ReachableResourcesStream,
) error
}
// LookupResourcesStream is an alias for the stream to which found resources will be written.
// It is Stream instantiated with *v1.DispatchLookupResourcesResponse.
type LookupResourcesStream = Stream[*v1.DispatchLookupResourcesResponse]
// LookupResources interface describes just the methods required to dispatch LookupResources requests.
type LookupResources interface {
// DispatchLookupResources submits a single lookup resources request, writing its results to the specified stream.
// Results are delivered through stream; only an error is returned directly.
// NOTE(review): unlike DispatchCheck, no context.Context parameter is taken;
// presumably the stream supplies it — confirm against Stream.
DispatchLookupResources(
req *v1.DispatchLookupResourcesRequest,
stream LookupResourcesStream,
) error
}
// LookupSubjectsStream is an alias for the stream to which found subjects will be written.
// It is Stream instantiated with *v1.DispatchLookupSubjectsResponse.
type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]
// LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.
type LookupSubjects interface {
// DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
// Results are delivered through stream; only an error is returned directly.
// NOTE(review): unlike DispatchCheck, no context.Context parameter is taken;
// presumably the stream supplies it — confirm against Stream.
DispatchLookupSubjects(
req *v1.DispatchLookupSubjectsRequest,
stream LookupSubjectsStream,
) error
}
// DispatchableRequest is the interface for requests accepted by CheckDepth.
// A request can be marshaled as a zerolog log object and exposes its
// resolver metadata.
type DispatchableRequest interface {
zerolog.LogObjectMarshaler
// GetMetadata returns the resolver metadata attached to the request.
// CheckDepth treats a nil result as an error.
GetMetadata() *v1.ResolverMeta
}
// CheckDepth verifies that req still has dispatch depth available.
//
// It returns a "request missing metadata" error (after logging a warning with
// the request attached) when req carries no resolver metadata, the error built
// by NewMaxDepthExceededError when the metadata's DepthRemaining is zero, and
// nil otherwise.
func CheckDepth(ctx context.Context, req DispatchableRequest) error {
	meta := req.GetMetadata()
	switch {
	case meta == nil:
		// Without metadata the remaining depth is unknown, so refuse to dispatch.
		log.Ctx(ctx).Warn().Object("request", req).Msg("request missing metadata")
		return fmt.Errorf("request missing metadata")
	case meta.DepthRemaining == 0:
		return NewMaxDepthExceededError(req)
	default:
		return nil
	}
}
// AddResponseMetadata folds the incoming metadata into the existing metadata,
// *modifying existing in place*: the dispatch and cached-dispatch counts are
// summed, and DepthRequired becomes the larger of the two values.
//
// Both pointers are dereferenced unconditionally, so neither may be nil.
func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta) {
	totalDispatches := existing.DispatchCount + incoming.DispatchCount
	totalCached := existing.CachedDispatchCount + incoming.CachedDispatchCount
	deepest := max(existing.DepthRequired, incoming.DepthRequired)

	existing.DispatchCount = totalDispatches
	existing.CachedDispatchCount = totalCached
	existing.DepthRequired = deepest
}