-
Notifications
You must be signed in to change notification settings - Fork 0
/
option_nsq.go
172 lines (152 loc) · 6.04 KB
/
option_nsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
* @Author: lwnmengjing
* @Date: 2021/5/31 9:10 上午
* @Last Modified by: lwnmengjing
* @Last Modified time: 2021/5/31 9:10 上午
*/
package config
import (
"time"
"github.com/nsqio/go-nsq"
)
// NSQOptions holds the NSQ client settings of this package. GetNSQOptions
// copies them onto an *nsq.Config; the field names and `opt` tags mirror the
// corresponding nsq.Config options.
//
// GetNSQOptions multiplies every time.Duration field by a unit constant
// before assigning it, so the values stored here are treated as bare counts
// rather than as ready-made durations. See GetNSQOptions for the unit used.
//
// NOTE(review): whether the `opt`/`min`/`max`/`default` tags are honoured by
// the loader that fills this struct is not visible in this file - confirm.
type NSQOptions struct {
// Timeout for establishing a connection (copied to cfg.DialTimeout).
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
// Addresses is the local address to use when dialing an nsqd.
// NOTE(review): this field is not read by GetNSQOptions, and a slice of
// addresses does not obviously match "the local address"; presumably these
// are the nsqd/nsqlookupd addresses to connect to - confirm against callers.
Addresses []string `opt:"addresses"`
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
// Copied to cfg.DefaultRequeueDelay when positive.
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Duration to wait for a message from an nsqd when in a state where RDY
// counts are re-distributed (e.g. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`
// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
// TLS settings; GetNSQOptions passes this to getTLS to build cfg.TlsConfig.
Tls *Tls `yaml:"tls" json:"tls"`
// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`
// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
}
// GetNSQOptions builds an *nsq.Config from e.
//
// It starts from nsq.NewConfig() and overrides a field only when the
// corresponding option on e is set (positive number / non-empty string), so
// unset options keep whatever nsq.NewConfig provided. Deflate, Snappy and
// MaxAttempts are the exceptions: they are always copied.
//
// Every time.Duration option is interpreted as a whole number of seconds and
// multiplied by time.Second. Callers must therefore store bare counts (e.g.
// 60), not pre-scaled durations: a value such as time.Minute would be
// multiplied again.
//
// It returns a nil config and the error from getTLS when the TLS settings
// cannot be converted. The resulting config is not validated here.
func (e NSQOptions) GetNSQOptions() (*nsq.Config, error) {
	cfg := nsq.NewConfig()

	var err error
	cfg.TlsConfig, err = getTLS(e.Tls)
	if err != nil {
		return nil, err
	}

	// seconds stores src, read as a count of seconds, into dst when src is
	// positive; otherwise dst is left untouched.
	seconds := func(dst *time.Duration, src time.Duration) {
		if src > 0 {
			*dst = src * time.Second
		}
	}

	seconds(&cfg.DialTimeout, e.DialTimeout)
	seconds(&cfg.ReadTimeout, e.ReadTimeout)
	seconds(&cfg.WriteTimeout, e.WriteTimeout)
	seconds(&cfg.LookupdPollInterval, e.LookupdPollInterval)
	seconds(&cfg.MaxRequeueDelay, e.MaxRequeueDelay)
	seconds(&cfg.DefaultRequeueDelay, e.DefaultRequeueDelay)
	// Scaled by seconds like every other duration option; it was previously
	// the only one scaled by milliseconds, which made the same config number
	// mean a 1000x shorter backoff than its siblings.
	seconds(&cfg.MaxBackoffDuration, e.MaxBackoffDuration)
	seconds(&cfg.BackoffMultiplier, e.BackoffMultiplier)
	seconds(&cfg.LowRdyIdleTimeout, e.LowRdyIdleTimeout)
	seconds(&cfg.LowRdyTimeout, e.LowRdyTimeout)
	seconds(&cfg.RDYRedistributeInterval, e.RDYRedistributeInterval)
	seconds(&cfg.HeartbeatInterval, e.HeartbeatInterval)
	seconds(&cfg.OutputBufferTimeout, e.OutputBufferTimeout)
	seconds(&cfg.MsgTimeout, e.MsgTimeout)

	if e.LookupdPollJitter > 0 {
		cfg.LookupdPollJitter = e.LookupdPollJitter
	}

	// NOTE(review): unlike the guarded options, a zero MaxAttempts is copied
	// as-is and replaces the value set by nsq.NewConfig - confirm that an
	// unset option is really meant to do that.
	cfg.MaxAttempts = e.MaxAttempts

	if e.ClientID != "" {
		cfg.ClientID = e.ClientID
	}
	if e.Hostname != "" {
		cfg.Hostname = e.Hostname
	}
	if e.UserAgent != "" {
		cfg.UserAgent = e.UserAgent
	}
	if e.SampleRate > 0 {
		cfg.SampleRate = e.SampleRate
	}

	cfg.Deflate = e.Deflate
	// Accept the whole 1..9 range declared on NSQOptions.DeflateLevel; the
	// lower bound used to be 6, which silently ignored levels 1-5.
	if e.DeflateLevel >= 1 && e.DeflateLevel <= 9 {
		cfg.DeflateLevel = e.DeflateLevel
	}
	cfg.Snappy = e.Snappy

	if e.OutputBufferSize > 0 {
		cfg.OutputBufferSize = e.OutputBufferSize
	}
	if e.MaxInFlight > 0 {
		cfg.MaxInFlight = e.MaxInFlight
	}
	if e.AuthSecret != "" {
		cfg.AuthSecret = e.AuthSecret
	}

	return cfg, nil
}