NSQ golang MQ 3
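NSQ consists of three main modules: nsqd, nsqlookupd and nsqadmin, and choosing which one to start with matters.
When reading source code, I like to start with programs that have few features and are as self-contained as possible. Of the three modules, nsqlookupd is the one that ties everything together and bridges the others, but it still has a lot of functionality, and reading it directly before understanding what it does is not a good idea. So the first source I chose to read is nsq_to_file.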
Note: the local source path is github.com/nsqio/nsq/apps/nsq_to_file
nsq_to_file is an NSQ client; it is essentially a feature built on top of go-nsq.
It has two files, nsq_to_file.go and strftime.go, both in package main.
Start with the main function:
func main() {
	cfg := nsq.NewConfig()
	// TODO: remove, deprecated
	flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
	flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
	flag.Parse()
flag.Parse() is Go's standard-library command-line parser, so by following the flag definitions we can find out which command-line arguments are accepted.
The flags that nsq_to_file defines:
var (
	showVersion = flag.Bool("version", false, "print version string")
	channel = flag.String("channel", "nsq_to_file", "nsq channel")
	maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
	outputDir = flag.String("output-dir", "/tmp", "directory to write output files to")
	datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
	filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
	hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
	gzipLevel = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
	gzipEnabled = flag.Bool("gzip", false, "gzip output files.")
	skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")
	topicPollRate = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
	topicPattern = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")
	rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
	rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")
	nsqdTCPAddrs = app.StringArray{}
	lookupdHTTPAddrs = app.StringArray{}
	topics = app.StringArray{}
	// TODO: remove, deprecated
	gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)
The two flags registered at the top of main, reader-opt and consumer-opt, do exactly the same thing; reader-opt is deprecated, so use consumer-opt.
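These flags set options on the go-nsq Config, because nsq_to_file is built on top of go-nsq. As the help text says, the option may be given multiple times.
Here is the definition of the Config struct (file: github.com/nsqio/go-nsq/config.go):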
type Config struct {
	initialized bool
	// used to Initialize, Validate
	configHandlers []configHandler
	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
	// Deadlines for network reads and writes
	ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
	// LocalAddr is the local address to use when dialing an nsqd.
	// If empty, a local address is automatically chosen.
	LocalAddr net.Addr `opt:"local_addr"`
	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
	// restart at the same time
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
	LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
	// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algorithms.
	BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
	// Maximum amount of time to backoff when processing fails 0 == no backoff
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
	// Duration to wait for a message from a producer when in a state where RDY
	// counts are re-distributed (ie. max_in_flight < num_producers)
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
	// Duration between redistributing max-in-flight to connections
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
	// Identifiers sent to nsqd representing this client
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
	ClientID string `opt:"client_id"` // (defaults: short hostname)
	Hostname string `opt:"hostname"`
	UserAgent string `opt:"user_agent"`
	// Duration of time between heartbeats. This must be less than ReadTimeout
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
	// To set TLS config, use the following options:
	//
	// tls_v1 - Bool enable TLS negotiation
	// tls_root_ca_file - String path to file containing root CA
	// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
	// tls_cert - String path to file containing public key for certificate
	// tls_key - String path to file containing private key for certificate
	// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
	//
	TlsV1 bool `opt:"tls_v1"`
	TlsConfig *tls.Config `opt:"tls_config"`
	// Compression Settings
	Deflate bool `opt:"deflate"`
	DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
	Snappy bool `opt:"snappy"`
	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
	// secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
}
As you can see, Go struct tags already declare each option's key (the value of the opt tag) and its default value. Let's set aside what each option means for now (there are too many) and first look at how they are set.
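Struct tags are only metadata; some code has to read them with reflection. The sketch below shows that mechanism on a hypothetical, trimmed-down miniConfig struct that copies the tag style of go-nsq's Config. It is not go-nsq's own loader; ReadTimeout is a string here only to keep the example small.

```go
package main

import (
	"fmt"
	"reflect"
)

// miniConfig is a hypothetical struct that mimics the tag style of
// go-nsq's Config; it is not the real type.
type miniConfig struct {
	ReadTimeout string `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	MaxInFlight int    `opt:"max_in_flight" min:"0" default:"1"`
	ClientID    string `opt:"client_id"`
}

// optionTags maps each option key (the `opt` tag) to its `default` tag.
// Fields without an `opt` tag are skipped.
func optionTags(v interface{}) map[string]string {
	out := make(map[string]string)
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		key := f.Tag.Get("opt")
		if key == "" {
			continue
		}
		out[key] = f.Tag.Get("default") // empty string when no default is declared
	}
	return out
}

func main() {
	tags := optionTags(miniConfig{})
	fmt.Println(tags["read_timeout"], tags["max_in_flight"], tags["client_id"] == "") // 60s 1 true
}
```

A real loader would additionally convert each default string to the field's type (for example with time.ParseDuration) and check it against the min/max tags.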
The go-nsq documentation explains:
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).
Use Set(option string, value interface{}) as an alternate way to set parameters.
Here is ConfigFlag.Set, which flag.Parse calls once for every --consumer-opt (or --reader-opt) value:
// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
	parts := strings.SplitN(opt, ",", 2)
	key := parts[0]
	switch len(parts) {
	case 1:
		// default options specified without a value to boolean true
		err = c.Config.Set(key, true)
	case 2:
		err = c.Config.Set(key, parts[1])
	}
	return
}
So each option must be passed on the command line as "key,value", separated by a comma. A bare "key" with no value sets that option to true.
For example, to set the read timeout when starting the program:
./nsq_to_file --lookupd-http-address=127.0.0.1:4161 --consumer-opt="read_timeout,100s"
Continuing with main:
	if *showVersion { // print version information
		fmt.Printf("nsq_to_file v%s\n", version.Binary)
		return
	}
	if *channel == "" { // a channel name is required
		log.Fatal("--channel is required")
	}
showVersion needs no explanation; the important option here is channel.
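From the groundwork in the earlier posts, we know that NSQ distributes messages over a decentralized topology with no single point of failure, and that topics and channels are its key building blocks: every message published to a topic is delivered to each of that topic's channels, while among multiple consumers on the same channel, only one receives a given message.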
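The delivery rule can be modeled in a few lines. This is a toy, in-memory sketch with hypothetical topic and channel types, not nsqd code: a topic copies each message to every channel, and each channel hands it to exactly one consumer. Round-robin is my simplification; a real nsqd picks a client that has RDY capacity.

```go
package main

import "fmt"

// channel is a hypothetical model of an NSQ channel: it keeps its own
// consumers and gives each message to exactly one of them (round-robin here).
type channel struct {
	consumers []string
	next      int
}

// topic fans every published message out to all of its channels.
type topic struct {
	channels map[string]*channel
}

// publish returns, for each channel, the consumer that received the message.
// The message body is not needed for routing in this toy model.
func (t *topic) publish(msg string) map[string]string {
	delivered := make(map[string]string)
	for name, ch := range t.channels {
		if len(ch.consumers) == 0 {
			continue // a real nsqd would keep the message queued on the channel
		}
		delivered[name] = ch.consumers[ch.next%len(ch.consumers)]
		ch.next++
	}
	return delivered
}

func main() {
	t := &topic{channels: map[string]*channel{
		"nsq_to_file":  {consumers: []string{"c1", "c2", "c3"}},
		"test-channel": {consumers: []string{"c4"}},
	}}
	d := t.publish("hello world test5")
	// One copy per channel, one consumer per copy.
	fmt.Println(len(d), d["nsq_to_file"], d["test-channel"]) // 2 c1 c4
}
```

This mirrors the experiment that follows: three consumers on the default channel plus one on test-channel yield two deliveries of the same message.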
A quick test:
Open one more consumer terminal. Unlike the earlier ones, this one listens on the channel "test-channel":
nsq_to_file --lookupd-http-address=127.0.0.1:4161 --channel="test-channel" --output-dir=/tmp
Send a message to nsqd's HTTP port:
curl -d "hello world test5" "http://127.0.0.1:4151/put?topic=test"
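The output file now contains two lines of "hello world test5". This is because the message goes to both the nsq_to_file channel (the default) and test-channel. Although three consumers are waiting on the nsq_to_file channel, only one of them gets the message and the others keep waiting. The consumer on test-channel also writes the message it receives to a file under /tmp, so both copies are recorded.
Now that the role of the channel is clear, and before reading the code that uses it, let's finish the remaining option checks of the same kind in main: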
	var topicsFromNSQLookupd bool // whether topics are discovered via nsqlookupd
	if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
		log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
	}
	if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 {
		log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
	}
	// exactly one of --nsqd-tcp-address (nsqd TCP) and --lookupd-http-address (nsqlookupd HTTP) must be given: not both, and not neither
	if *gzipLevel < 1 || *gzipLevel > 9 {
		log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
	}
	// TODO: remove, deprecated
	if hasArg("gzip-compression") {
		log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
		switch *gzipCompression {
		case 1:
			*gzipLevel = gzip.BestSpeed
		case 2:
			*gzipLevel = gzip.BestCompression
		case 3:
			*gzipLevel = gzip.DefaultCompression
		default:
			log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
		}
	} // if the deprecated --gzip-compression was given, map it onto --gzip-level
	cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
	cfg.MaxInFlight = *maxInFlight // max number of messages delivered to this consumer but not yet finished