forked from influxdata/influxdb
/
service.go
230 lines (197 loc) · 6.8 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// Package httpd implements the HTTP service and REST API for InfluxDB.
package httpd // import "github.com/influxdata/influxdb/services/httpd"
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"path"
"runtime"
"strings"
"syscall"
"time"
"github.com/influxdata/influxdb/models"
"go.uber.org/zap"
)
// statistics gathered by the httpd package.
const (
statRequest = "req" // Number of HTTP requests served.
statQueryRequest = "queryReq" // Number of query requests served.
statWriteRequest = "writeReq" // Number of write requests serverd.
statPingRequest = "pingReq" // Number of ping requests served.
statStatusRequest = "statusReq" // Number of status requests served.
statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests.
statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses.
statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK.
statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine.
statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written.
statAuthFail = "authFail" // Number of authentication failures.
statRequestDuration = "reqDurationNs" // Number of (wall-time) nanoseconds spent inside requests.
statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests.
statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests.
statRequestsActive = "reqActive" // Number of currently active requests.
statWriteRequestsActive = "writeReqActive" // Number of currently active write requests.
statClientError = "clientError" // Number of HTTP responses due to client error.
statServerError = "serverError" // Number of HTTP responses due to server error.
statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler.
// Prometheus stats
statPromWriteRequest = "promWriteReq" // Number of write requests to the promtheus endpoint
statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint
)
// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
ln net.Listener
addr string
https bool
cert string
key string
limit int
err chan error
unixSocket bool
bindSocket string
unixSocketListener net.Listener
Handler *Handler
Logger *zap.Logger
}
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
s := &Service{
addr: c.BindAddress,
https: c.HTTPSEnabled,
cert: c.HTTPSCertificate,
key: c.HTTPSPrivateKey,
limit: c.MaxConnectionLimit,
err: make(chan error),
unixSocket: c.UnixSocketEnabled,
bindSocket: c.BindSocket,
Handler: NewHandler(c),
Logger: zap.NewNop(),
}
if s.key == "" {
s.key = s.cert
}
s.Handler.Logger = s.Logger
return s
}
// Open starts the service.
func (s *Service) Open() error {
s.Logger.Info("Starting HTTP service", zap.Bool("authentication", s.Handler.Config.AuthEnabled))
s.Handler.Open()
// Open listener.
if s.https {
cert, err := tls.LoadX509KeyPair(s.cert, s.key)
if err != nil {
return err
}
listener, err := tls.Listen("tcp", s.addr, &tls.Config{
Certificates: []tls.Certificate{cert},
})
if err != nil {
return err
}
s.ln = listener
} else {
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.ln = listener
}
s.Logger.Info("Listening on HTTP",
zap.Stringer("addr", s.ln.Addr()),
zap.Bool("https", s.https))
// Open unix socket listener.
if s.unixSocket {
if runtime.GOOS == "windows" {
return fmt.Errorf("unable to use unix socket on windows")
}
if err := os.MkdirAll(path.Dir(s.bindSocket), 0777); err != nil {
return err
}
if err := syscall.Unlink(s.bindSocket); err != nil && !os.IsNotExist(err) {
return err
}
listener, err := net.Listen("unix", s.bindSocket)
if err != nil {
return err
}
s.Logger.Info("Listening on unix socket",
zap.Stringer("addr", listener.Addr()))
s.unixSocketListener = listener
go s.serveUnixSocket()
}
// Enforce a connection limit if one has been given.
if s.limit > 0 {
s.ln = LimitListener(s.ln, s.limit)
}
// wait for the listeners to start
timeout := time.Now().Add(time.Second)
for {
if s.ln.Addr() != nil {
break
}
if time.Now().After(timeout) {
return fmt.Errorf("unable to open without http listener running")
}
time.Sleep(10 * time.Millisecond)
}
// Begin listening for requests in a separate goroutine.
go s.serveTCP()
return nil
}
// Close closes the underlying listener.
func (s *Service) Close() error {
s.Handler.Close()
if s.ln != nil {
if err := s.ln.Close(); err != nil {
return err
}
}
if s.unixSocketListener != nil {
if err := s.unixSocketListener.Close(); err != nil {
return err
}
}
return nil
}
// WithLogger sets the logger for the service.
func (s *Service) WithLogger(log *zap.Logger) {
s.Logger = log.With(zap.String("service", "httpd"))
s.Handler.Logger = s.Logger
}
// Err returns a channel for fatal errors that occur on the listener.
func (s *Service) Err() <-chan error { return s.err }
// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
if s.ln != nil {
return s.ln.Addr()
}
return nil
}
// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map())
}
// BoundHTTPAddr returns the string version of the address that the HTTP server is listening on.
// This is useful if you start an ephemeral server in test with bind address localhost:0.
func (s *Service) BoundHTTPAddr() string {
return s.ln.Addr().String()
}
// serveTCP serves the handler from the TCP listener.
func (s *Service) serveTCP() {
s.serve(s.ln)
}
// serveUnixSocket serves the handler from the unix socket listener.
func (s *Service) serveUnixSocket() {
s.serve(s.unixSocketListener)
}
// serve serves the handler from the listener.
func (s *Service) serve(listener net.Listener) {
// The listener was closed so exit
// See https://github.com/golang/go/issues/4373
err := http.Serve(listener, s.Handler)
if err != nil && !strings.Contains(err.Error(), "closed") {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}
}