/
upstreamer_options.go
150 lines (134 loc) · 4.84 KB
/
upstreamer_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package push
import (
"time"
"golang.org/x/time/rate"
)
// An UpstreamerOption represents a configuration option
// for the Upstreamer.
type UpstreamerOption func(*upstreamConfig)
type upstreamConfig struct {
overrideEndpointAddress string
exposePrivateAPIs bool
eventsAPIs map[string]string
latencySampleSize int
requiredServices []string
serviceTimeout time.Duration
serviceTimeoutCheckInterval time.Duration
peerTimeout time.Duration
peerTimeoutCheckInterval time.Duration
peerPingInterval time.Duration
randomizer Randomizer
tokenLimitingBurst int
tokenLimitingRPS rate.Limit
globalServiceTopic string
}
func newUpstreamConfig() upstreamConfig {
return upstreamConfig{
eventsAPIs: map[string]string{},
latencySampleSize: 20,
serviceTimeout: 30 * time.Second,
serviceTimeoutCheckInterval: 5 * time.Second,
peerTimeout: 30 * time.Second,
peerTimeoutCheckInterval: 5 * time.Second,
peerPingInterval: 10 * time.Second,
randomizer: newRandomizer(),
tokenLimitingBurst: 2000,
tokenLimitingRPS: 500,
}
}
// OptionUpstreamerExposePrivateAPIs configures the Upstreamer to expose
// the private APIs.
func OptionUpstreamerExposePrivateAPIs(enabled bool) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.exposePrivateAPIs = enabled
}
}
// OptionUpstreamerOverrideEndpointsAddresses configures the Upstreamer
// to always ignore what IP address the services are reporting
// and always use the provided address.
func OptionUpstreamerOverrideEndpointsAddresses(override string) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.overrideEndpointAddress = override
}
}
// OptionUpstreamerRegisterEventAPI registers an event API for the given serviceName
// on the given endpoint.
// For instance is serviceA exposes an event API on /events, you can use
//
// OptionUpstreamerRegisterEventAPI("serviceA", "events")
func OptionUpstreamerRegisterEventAPI(serviceName string, eventEndpoint string) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.eventsAPIs[serviceName] = eventEndpoint
}
}
// OptionRequiredServices sets the list of services
// that must be ready before starting the upstreamer.
func OptionRequiredServices(required []string) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.requiredServices = required
}
}
// OptionUpstreamerServiceTimeout sets the time to wait for the upstream
// to consider a service that did not ping to be outdated and removed
// in the case no goodbye was sent. Default is 30s.
// The check interval parameters defines how often the upstream
// will check for outdated services. The default is 5s.
func OptionUpstreamerServiceTimeout(timeout time.Duration, checkInterval time.Duration) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.serviceTimeout = timeout
cfg.serviceTimeoutCheckInterval = checkInterval
}
}
// OptionUpstreamerRandomizer set a custom Randomizer
// that must implement the Randomizer interface
// and be safe for concurrent use by multiple goroutines.
func OptionUpstreamerRandomizer(randomizer Randomizer) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.randomizer = randomizer
}
}
// OptionUpstreamerPeersTimeout sets for how long a peer ping
// should stay valid after receiving it.
// The default is 30s.
func OptionUpstreamerPeersTimeout(t time.Duration) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.peerTimeout = t
}
}
// OptionUpstreamerPeersCheckInterval sets the frequency at which the upstreamer
// will check for outdated peers.
// The default is 5s.
func OptionUpstreamerPeersCheckInterval(t time.Duration) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.peerTimeoutCheckInterval = t
}
}
// OptionUpstreamerPeersPingInterval sets how often the upstreamer will
// ping its peers.
// The default is 10s.
func OptionUpstreamerPeersPingInterval(t time.Duration) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.peerPingInterval = t
}
}
// OptionUpstreamerTokenRateLimiting configures the per source rate limiting.
// The default is cps:500/burst:2000
func OptionUpstreamerTokenRateLimiting(rps rate.Limit, burst int) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.tokenLimitingRPS = rps
cfg.tokenLimitingBurst = burst
if cfg.tokenLimitingRPS <= 0 {
panic("rps cannot be <= 0")
}
if cfg.tokenLimitingBurst <= 0 {
panic("burst cannot be <= 0")
}
}
}
// OptionUpstreamerGlobalServiceTopic sets the global topic that the gateway
// will use to listen for service pings coming from global services.
func OptionUpstreamerGlobalServiceTopic(topic string) UpstreamerOption {
return func(cfg *upstreamConfig) {
cfg.globalServiceTopic = topic
}
}