-
Notifications
You must be signed in to change notification settings - Fork 246
/
cluster.go
83 lines (70 loc) · 2.51 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cluster
import (
"time"
"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/keys"
"github.com/authzed/spicedb/internal/dispatch/singleflight"
"github.com/authzed/spicedb/pkg/cache"
)
// Option is a function-style option for configuring a combined Dispatcher.
type Option func(*optionState)
type optionState struct {
metricsEnabled bool
prometheusSubsystem string
cache cache.Cache
concurrencyLimits graph.ConcurrencyLimits
remoteDispatchTimeout time.Duration
}
// MetricsEnabled enables issuing prometheus metrics
func MetricsEnabled(enabled bool) Option {
return func(state *optionState) {
state.metricsEnabled = enabled
}
}
// PrometheusSubsystem sets the subsystem name for the prometheus metrics
func PrometheusSubsystem(name string) Option {
return func(state *optionState) {
state.prometheusSubsystem = name
}
}
// Cache sets the cache for the remote dispatcher.
func Cache(c cache.Cache) Option {
return func(state *optionState) {
state.cache = c
}
}
// ConcurrencyLimits sets the max number of goroutines per operation
func ConcurrencyLimits(limits graph.ConcurrencyLimits) Option {
return func(state *optionState) {
state.concurrencyLimits = limits
}
}
// RemoteDispatchTimeout sets the maximum timeout for a remote dispatch.
// Defaults to 60s (as defined in the remote dispatcher).
func RemoteDispatchTimeout(remoteDispatchTimeout time.Duration) Option {
return func(state *optionState) {
state.remoteDispatchTimeout = remoteDispatchTimeout
}
}
// NewClusterDispatcher takes a dispatcher (such as one created by
// combined.NewDispatcher) and returns a cluster dispatcher suitable for use as
// the dispatcher for the dispatch grpc server.
func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (dispatch.Dispatcher, error) {
var opts optionState
for _, fn := range options {
fn(&opts)
}
clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits)
clusterDispatch = singleflight.New(clusterDispatch, &keys.CanonicalKeyHandler{})
if opts.prometheusSubsystem == "" {
opts.prometheusSubsystem = "dispatch"
}
cachingClusterDispatch, err := caching.NewCachingDispatcher(opts.cache, opts.metricsEnabled, opts.prometheusSubsystem, &keys.CanonicalKeyHandler{})
if err != nil {
return nil, err
}
cachingClusterDispatch.SetDelegate(clusterDispatch)
return cachingClusterDispatch, nil
}