forked from sensu/sensu-go
/
agentd.go
183 lines (159 loc) · 4.42 KB
/
agentd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package agentd
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/sensu/sensu-go/backend/apid/middlewares"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/ringv2"
"github.com/sensu/sensu-go/backend/store"
"github.com/sensu/sensu-go/transport"
"github.com/sensu/sensu-go/types"
)
var (
// upgrader is safe for concurrent use, and we don't need any particularly
// specialized configurations for different uses.
upgrader = &websocket.Upgrader{}
)
// Store specifies storage requirements for Agentd.
type Store interface {
middlewares.AuthStore
SessionStore
}
// Agentd is the backend HTTP API.
type Agentd struct {
// Host is the hostname Agentd is running on.
Host string
// Port is the port Agentd is running on.
Port int
stopping chan struct{}
running *atomic.Value
wg *sync.WaitGroup
errChan chan error
httpServer *http.Server
store Store
bus messaging.MessageBus
tls *types.TLSOptions
ringPool *ringv2.Pool
}
// Config configures an Agentd.
type Config struct {
Host string
Port int
Bus messaging.MessageBus
Store store.Store
TLS *types.TLSOptions
RingPool *ringv2.Pool
}
// Option is a functional option.
type Option func(*Agentd) error
// New creates a new Agentd.
func New(c Config, opts ...Option) (*Agentd, error) {
a := &Agentd{
Host: c.Host,
Port: c.Port,
bus: c.Bus,
store: c.Store,
tls: c.TLS,
stopping: make(chan struct{}, 1),
running: &atomic.Value{},
wg: &sync.WaitGroup{},
errChan: make(chan error, 1),
ringPool: c.RingPool,
}
// prepare server TLS config
tlsServerConfig, err := c.TLS.ToServerTLSConfig()
if err != nil {
return nil, err
}
handler := middlewares.BasicAuthentication(http.HandlerFunc(a.webSocketHandler), a.store)
a.httpServer = &http.Server{
Addr: fmt.Sprintf("%s:%d", a.Host, a.Port),
Handler: handler,
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
TLSConfig: tlsServerConfig,
}
for _, o := range opts {
if err := o(a); err != nil {
return nil, err
}
}
return a, nil
}
// Start Agentd.
func (a *Agentd) Start() error {
logger.Info("starting agentd on address: ", a.httpServer.Addr)
a.wg.Add(1)
go func() {
defer a.wg.Done()
var err error
if a.tls != nil {
// TLS configuration comes from ToServerTLSConfig
err = a.httpServer.ListenAndServeTLS("", "")
} else {
err = a.httpServer.ListenAndServe()
}
if err != nil && err != http.ErrServerClosed {
logger.WithError(err).Error("failed to start http/https server")
}
}()
return nil
}
// Stop Agentd.
func (a *Agentd) Stop() error {
if err := a.httpServer.Shutdown(context.TODO()); err != nil {
// failure/timeout shutting down the server gracefully
logger.Error("failed to shutdown http server gracefully - forcing shutdown")
if closeErr := a.httpServer.Close(); closeErr != nil {
logger.Error("failed to shutdown http server forcefully")
}
}
a.running.Store(false)
close(a.stopping)
a.wg.Wait()
close(a.errChan)
return nil
}
// Err returns a channel to listen for terminal errors on.
func (a *Agentd) Err() <-chan error {
return a.errChan
}
// Name returns the daemon name
func (a *Agentd) Name() string {
return "agentd"
}
func (a *Agentd) webSocketHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.WithField("addr", r.RemoteAddr).WithError(err).Error("transport error on websocket upgrade")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
cfg := SessionConfig{
AgentAddr: r.RemoteAddr,
AgentName: r.Header.Get(transport.HeaderKeyAgentName),
Namespace: r.Header.Get(transport.HeaderKeyNamespace),
User: r.Header.Get(transport.HeaderKeyUser),
Subscriptions: strings.Split(r.Header.Get(transport.HeaderKeySubscriptions), ","),
RingPool: a.ringPool,
}
cfg.Subscriptions = addEntitySubscription(cfg.AgentName, cfg.Subscriptions)
session, err := NewSession(cfg, transport.NewTransport(conn), a.bus, a.store)
if err != nil {
logger.WithError(err).Error("failed to create session")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = session.Start()
if err != nil {
logger.WithError(err).Error("failed to start session")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}