-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
143 lines (125 loc) · 4.76 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"crypto/md5"
"flag"
"fmt"
"github.com/bitly/nsq/nsq"
"github.com/bitly/nsq/util"
"hash/crc32"
"io"
"log"
"net"
"os"
"os/signal"
"regexp"
"strconv"
"strings"
"syscall"
"time"
)
// Command-line flags accepted by nsqd, grouped by the area they configure.
// Each value is a pointer that is only meaningful after flag.Parse has run.

// General process options.
var (
	showVersion = flag.Bool("version", false, "print version string")
	verbose     = flag.Bool("verbose", false, "enable verbose logging")
	workerId    = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
	dataPath    = flag.String("data-path", "", "path to store disk-backed messages")
)

// Listen addresses and lookupd registration.
var (
	tcpAddress       = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
	httpAddress      = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
	broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)")

	// lookupdTCPAddrs collects every --lookupd-tcp-address occurrence; the
	// flag itself is registered in init because it needs flag.Var.
	lookupdTCPAddrs = util.StringArray{}
)

// In-memory and on-disk queue sizing.
var (
	memQueueSize    = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
	maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
	syncEvery       = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")
)

// Per-message and per-client limits.
var (
	// msgTimeout is kept as a string so that main can accept both a bare
	// integer and a Go duration string.
	msgTimeout           = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
	maxMsgTimeout        = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
	maxMessageSize       = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
	maxBodySize          = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")
	maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum duration of time between heartbeats that a client can configure")
	maxRdyCount          = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a single client")
)

// statsd reporting.
var (
	statsdAddress  = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
	statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
)
// init registers the --lookupd-tcp-address flag. It goes through flag.Var
// rather than one of the typed flag helpers so that every occurrence on the
// command line is handed to lookupdTCPAddrs.
func init() {
	const (
		name  = "lookupd-tcp-address"
		usage = "lookupd TCP address (may be given multiple times)"
	)
	flag.Var(&lookupdTCPAddrs, name, usage)
}
var (
	// nsqd is the package-wide daemon instance. It is nil until main
	// assigns the value returned by NewNSQd.
	nsqd *NSQd

	// protocols maps a string key to an nsq.Protocol. It starts out empty;
	// nothing in this file adds entries to it.
	protocols = make(map[string]nsq.Protocol)
)
// msgTimeoutDigitsRe matches a --msg-timeout value made up solely of decimal
// digits, i.e. one supplied without a duration unit.
var msgTimeoutDigitsRe = regexp.MustCompile(`^[0-9]+$`)

// parseMsgTimeout converts the raw --msg-timeout flag value into a duration.
//
// For backwards compatibility a value containing only digits is interpreted
// as a number of milliseconds; any other value is handed to
// time.ParseDuration. On failure the returned error names the conversion
// that was attempted together with the offending value.
func parseMsgTimeout(raw string) (time.Duration, error) {
	if msgTimeoutDigitsRe.MatchString(raw) {
		ms, err := strconv.Atoi(raw)
		if err != nil {
			return 0, fmt.Errorf("failed to Atoi --msg-timeout %s - %s", raw, err.Error())
		}
		return time.Duration(ms) * time.Millisecond, nil
	}

	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("failed to ParseDuration --msg-timeout %s - %s", raw, err.Error())
	}
	return d, nil
}

// main parses the command-line flags, builds the global nsqd instance from
// them, starts it, and then blocks until SIGINT or SIGTERM is received, at
// which point nsqd.Exit is called.
//
// Any invalid address, unparsable --msg-timeout, hostname lookup failure or
// metadata persistence failure terminates the process via log.Fatal.
func main() {
	flag.Parse()

	// Answer --version before anything that can fail, so the version string
	// is still printed on a host whose hostname cannot be determined.
	if *showVersion {
		fmt.Println(util.Version("nsqd"))
		return
	}

	hostname, err := os.Hostname()
	if err != nil {
		log.Fatal(err)
	}

	// 0 means "not set": derive an id in [0, 1024) from the hostname by
	// taking the CRC-32 of its MD5 digest.
	if *workerId == 0 {
		h := md5.New()
		// hash.Hash documents that Write never returns an error.
		io.WriteString(h, hostname)
		*workerId = int64(crc32.ChecksumIEEE(h.Sum(nil)) % 1024)
	}

	tcpAddr, err := net.ResolveTCPAddr("tcp", *tcpAddress)
	if err != nil {
		log.Fatal(err)
	}

	httpAddr, err := net.ResolveTCPAddr("tcp", *httpAddress)
	if err != nil {
		log.Fatal(err)
	}

	if *broadcastAddress == "" {
		*broadcastAddress = hostname
	}

	log.Println(util.Version("nsqd"))
	log.Printf("worker id %d", *workerId)

	// The first SIGINT/SIGTERM is forwarded to exitChan, which main waits on
	// at the very end. signalChan is buffered so that a signal arriving
	// before the goroutine is scheduled is not dropped.
	exitChan := make(chan int)
	signalChan := make(chan os.Signal, 1)
	go func() {
		<-signalChan
		exitChan <- 1
	}()
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	// statsd reporting is optional. The metric prefix is
	// "nsq.<hostname with dots replaced by underscores>_<http port>.".
	if *statsdAddress != "" {
		underHostname := fmt.Sprintf("%s_%d", strings.Replace(hostname, ".", "_", -1), httpAddr.Port)
		prefix := fmt.Sprintf("nsq.%s.", underHostname)
		go statsdLoop(*statsdAddress, prefix, *statsdInterval)
	}

	// for backwards compatibility if --msg-timeout only
	// contains numbers then default to ms
	msgTimeoutDuration, err := parseMsgTimeout(*msgTimeout)
	if err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}

	options := NewNsqdOptions()
	options.maxRdyCount = *maxRdyCount
	options.maxMessageSize = *maxMessageSize
	options.maxBodySize = *maxBodySize
	options.memQueueSize = *memQueueSize
	options.dataPath = *dataPath
	options.maxBytesPerFile = *maxBytesPerFile
	options.syncEvery = *syncEvery
	options.msgTimeout = msgTimeoutDuration
	options.maxMsgTimeout = *maxMsgTimeout
	options.broadcastAddress = *broadcastAddress
	options.maxHeartbeatInterval = *maxHeartbeatInterval

	nsqd = NewNSQd(*workerId, options)
	nsqd.tcpAddr = tcpAddr
	nsqd.httpAddr = httpAddr
	nsqd.lookupdTCPAddrs = lookupdTCPAddrs

	// Metadata is loaded and then immediately written back; a failed write
	// is fatal.
	// NOTE(review): LoadMetadata's result, if any, is not inspected here —
	// confirm that is intentional.
	nsqd.LoadMetadata()
	err = nsqd.PersistMetadata()
	if err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}

	// NOTE(review): presumably Main starts the daemon's work in the
	// background and returns, since main goes on to wait for the exit
	// signal — confirm against NSQd.Main.
	nsqd.Main()
	<-exitChan
	nsqd.Exit()
}