/
main.go
135 lines (110 loc) · 3.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main
import (
"net"
"os"
"github.com/codegangsta/cli"
)
const (
connType = "tcp"
connHost = "localhost"
connPort = "5667"
defaultConfLocation = "/etc/nbad/conf.json"
errBinding = 1
errAccptIncomingConn = 2
)
func main() {
app := cli.NewApp()
app.Name = "nbad"
app.Usage = "NSCA Buffering Agent (daemon) - Emulates NSCA interface as local buffer/proxy"
var configFile string
var trace bool
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "config, c",
Value: defaultConfLocation,
Usage: "Location of config file on disk",
Destination: &configFile,
},
cli.BoolFlag{
Name: "trace, t",
EnvVar: "NBAD_TRACE",
Usage: "Turn on trace-logging",
Destination: &trace,
},
}
app.Version = "1.0"
app.Action = func(c *cli.Context) {
// load configuration
InitConfig(configFile, TempLogger("STARTUP"))
Config().TraceLogging = trace
startServer()
}
app.Run(os.Args)
}
func startServer() {
Logger().Info.Println("Starting up NBAd")
address := connHost + ":" + connPort
listener, err := net.Listen(connType, address)
if err != nil {
Logger().Error.Println("Could not bind to "+address, err.Error())
os.Exit(errBinding)
}
Logger().Info.Printf("Listening at %s\n", address)
// close listener on program exit
defer listener.Close()
// sping up message registry
messageChannel := startGateway()
// listen for incoming connections
for {
conn, err := listener.Accept()
if err != nil {
Logger().Error.Println("Error accepting connection", err.Error())
// FIXME should we do more than just print out an error here?
}
go handleIncomingConn(conn, messageChannel)
}
}
// handles incoming requests
func handleIncomingConn(conn net.Conn, messageChannel chan *GatewayEvent) {
defer conn.Close()
// TODO send an initialization message (see https://github.com/Syncbak-Git/nsca/blob/master/packet.go#L163)
// Make a buffer to hold incoming data.
buf := make([]byte, 1024)
// Read the incoming connection into the buffer.
n, err := conn.Read(buf)
if err != nil {
Logger().Warning.Println("Error reading incoming request", err.Error())
return
}
// attempt to parse the message
if n < 1024 {
buf = buf[:n]
}
message, err := parseMessage(buf)
// continue down processing pipeline
if err != nil {
Logger().Warning.Println("Failed to parse message", err.Error())
// TODO: determine how to send proper error response
conn.Write([]byte("Message could not be processed."))
} else {
Logger().Trace.Printf("Processing message: %v\n", message)
messageChannel <- newMessageEvent(message)
}
conn.Close()
}
// Starts a gateway process. Returns a channel to send new messages to the gateway.
func startGateway() chan *GatewayEvent {
// channel for sending new messages to the Gateway
gatewayChan := make(chan *GatewayEvent, Config().GatewayMessageBufferSize)
registry := &Registry{
cache: make(map[string]*MessageEntry),
ttlInSeconds: Config().MessageCacheTTLInSeconds,
initBufferTTLInSeconds: Config().MessageInitBufferTimeSeconds,
}
gateway := newGateway(registry, gatewayChan)
go gateway.run()
return gatewayChan
}
func newMessageEvent(m *Message) *GatewayEvent {
return &GatewayEvent{message: m}
}