forked from knative/serving
/
main.go
339 lines (301 loc) · 9.82 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"context"
"encoding/gob"
"flag"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/knative/serving/cmd/util"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
"github.com/knative/serving/pkg/autoscaler"
h2cutil "github.com/knative/serving/pkg/h2c"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/logging/logkey"
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/system"
"github.com/knative/serving/third_party/h2c"
"go.uber.org/zap"
"github.com/gorilla/websocket"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const (
	// Add a little buffer space between request handling and stat
	// reporting so that latency in the stat pipeline doesn't
	// interfere with request handling.
	statReportingQueueLength = 10
	// Add enough buffer to not block request serving on stats collection
	requestCountingQueueLength = 100
	// Number of seconds the /quitquitquit handler should wait before
	// returning. The purpose is to keep the container alive a little
	// bit longer, so that it doesn't go away until the pod is truly
	// removed from service.
	quitSleepSecs = 20
	// Single concurrency queue depth. The maximum number of requests
	// to enqueue before returning 503 overload.
	singleConcurrencyQueueDepth = 10
)
var (
	// Pod/revision identity, populated from the environment by initEnv().
	podName              string
	servingNamespace     string
	servingConfiguration string
	// servingRevision is the revision name prepended with its namespace, e.g.
	// namespace/name.
	servingRevision       string
	servingAutoscaler     string
	servingAutoscalerPort string
	// statChan carries per-bucket stats from queue.NewStats to statReporter.
	statChan = make(chan *autoscaler.Stat, statReportingQueueLength)
	// reqChan carries request in/out events from handler to queue.NewStats.
	reqChan    = make(chan queue.ReqEvent, requestCountingQueueLength)
	kubeClient *kubernetes.Clientset
	// statSink is the websocket connection to the autoscaler.
	// NOTE(review): written by connectStatSink and read by statReporter
	// without synchronization — confirm whether this race is acceptable.
	statSink *websocket.Conn
	logger   *zap.SugaredLogger
	// Reverse proxies to the user container; h2cProxy serves HTTP/2 (h2c).
	h2cProxy  *httputil.ReverseProxy
	httpProxy *httputil.ReverseProxy
	// concurrencyQuantumOfTime is the stat-bucketing interval.
	concurrencyQuantumOfTime = flag.Duration("concurrencyQuantumOfTime", 100*time.Millisecond, "")
	concurrencyModel         = flag.String("concurrencyModel", string(v1alpha1.RevisionRequestConcurrencyModelMulti), "")
	// singleConcurrencyBreaker enforces one in-flight request in Single mode.
	singleConcurrencyBreaker = queue.NewBreaker(singleConcurrencyQueueDepth, 1)
)
// initEnv populates the package-level identity variables from the
// environment. Each lookup is fatal when the variable is unset, so once
// this returns every value is guaranteed to be present.
func initEnv() {
	mustEnv := func(key string) string {
		return util.GetRequiredEnvOrFatal(key, logger)
	}
	podName = mustEnv("SERVING_POD")
	servingNamespace = mustEnv("SERVING_NAMESPACE")
	servingConfiguration = mustEnv("SERVING_CONFIGURATION")
	servingRevision = mustEnv("SERVING_REVISION")
	servingAutoscaler = mustEnv("SERVING_AUTOSCALER")
	servingAutoscalerPort = mustEnv("SERVING_AUTOSCALER_PORT")
}
// connectStatSink dials the autoscaler's websocket endpoint in an
// endless retry loop. On success it publishes the connection via the
// package-level statSink, blocks until the connection dies, and then
// dials again.
func connectStatSink() {
	autoscalerEndpoint := fmt.Sprintf("ws://%s.%s.svc.cluster.local:%s",
		servingAutoscaler, system.Namespace, servingAutoscalerPort)
	logger.Infof("Connecting to autoscaler at %s.", autoscalerEndpoint)
	// The dialer is pure configuration, so one instance serves all attempts.
	dialer := &websocket.Dialer{
		HandshakeTimeout: 3 * time.Second,
	}
	for {
		// TODO: use exponential backoff here
		time.Sleep(time.Second)
		conn, _, err := dialer.Dial(autoscalerEndpoint, nil)
		if err != nil {
			logger.Error("Retrying connection to autoscaler.", zap.Error(err))
			continue
		}
		logger.Info("Connected to stat sink.")
		statSink = conn
		// Block here until the connection breaks, then loop to redial.
		waitForClose(conn)
	}
}
// waitForClose blocks until a read on the websocket connection fails,
// then logs the failure, closes the connection, and returns.
func waitForClose(c *websocket.Conn) {
	for {
		_, _, err := c.NextReader()
		if err == nil {
			continue
		}
		logger.Error("Error reading from websocket", zap.Error(err))
		c.Close()
		return
	}
}
// statReporter forwards every stat arriving on statChan to the
// autoscaler websocket, gob-encoded as an autoscaler.StatMessage.
// Stats that arrive while no sink connection exists are dropped with
// an error log.
func statReporter() {
	// statChan is never closed, so this ranges forever.
	for s := range statChan {
		if statSink == nil {
			logger.Error("Stat sink not connected.")
			continue
		}
		msg := autoscaler.StatMessage{
			Stat:        *s,
			RevisionKey: servingRevision,
		}
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(msg); err != nil {
			logger.Error("Failed to encode data from stats channel", zap.Error(err))
			continue
		}
		if err := statSink.WriteMessage(websocket.BinaryMessage, buf.Bytes()); err != nil {
			logger.Error("Failed to write to stat sink.", zap.Error(err))
		}
	}
}
func proxyForRequest(req *http.Request) *httputil.ReverseProxy {
if req.ProtoMajor == 2 {
return h2cProxy
}
return httpProxy
}
func isProbe(r *http.Request) bool {
// Since K8s 1.8, prober requests have
// User-Agent = "kube-probe/{major-version}.{minor-version}".
return strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/")
}
// handler proxies every request to the user container. Non-probe
// requests are counted for autoscaling, and in Single concurrency mode
// they additionally pass through the concurrency breaker, with 503
// returned on overload.
func handler(w http.ResponseWriter, r *http.Request) {
	proxy := proxyForRequest(r)
	// Health checks bypass concurrency accounting entirely.
	if isProbe(r) {
		proxy.ServeHTTP(w, r)
		return
	}
	// Report request start/end for autoscaling metrics.
	reqChan <- queue.ReqIn
	defer func() {
		reqChan <- queue.ReqOut
	}()
	if *concurrencyModel != string(v1alpha1.RevisionRequestConcurrencyModelSingle) {
		proxy.ServeHTTP(w, r)
		return
	}
	// Single concurrency: admit through the breaker or shed load.
	served := singleConcurrencyBreaker.Maybe(func() {
		proxy.ServeHTTP(w, r)
	})
	if !served {
		http.Error(w, "overload", http.StatusServiceUnavailable)
	}
}
// healthServer registers whether a PreStop hook has been called.
type healthServer struct {
	alive bool // true until kill() is called by the /quitquitquit handler
	mutex sync.RWMutex // guards alive
}
// isAlive returns true until a PreStop hook has been called (i.e. until
// kill() runs).
func (h *healthServer) isAlive() bool {
	h.mutex.RLock()
	alive := h.alive
	h.mutex.RUnlock()
	return alive
}
// kill marks that a PreStop hook has been called.
func (h *healthServer) kill() {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	h.alive = false
}
// healthHandler serves the readinessProbe/livenessCheck of queue-proxy:
// 200 "alive: true" while alive, 400 "alive: false" once a PreStop hook
// has fired.
func (h *healthServer) healthHandler(w http.ResponseWriter, r *http.Request) {
	status, body := http.StatusOK, "alive: true"
	if !h.isAlive() {
		status, body = http.StatusBadRequest, "alive: false"
	}
	w.WriteHeader(status)
	io.WriteString(w, body)
}
// quitHandler is used for the preStop hook of queue-proxy. It:
// - marks the service as not ready, so that requests will no longer
//   be routed to it,
// - adds a small delay, so that the container doesn't get killed at
//   the same time the pod is marked for removal.
func (h *healthServer) quitHandler(w http.ResponseWriter, r *http.Request) {
	// Mark the container not-ready first: even before the pod's removal
	// from the service takes effect, the readinessCheck already steers
	// traffic away from this pod.
	h.kill()
	// Both the readinessCheck and removal from the service are only
	// eventually consistent, so keep the container alive a little
	// longer. This gives no hard guarantee that termination happens
	// only after removal is effective, but it has been shown to
	// alleviate the issue.
	time.Sleep(quitSleepSecs * time.Second)
	w.WriteHeader(http.StatusOK)
	io.WriteString(w, "alive: false")
}
// setupAdminHandlers registers the /health and /quitquitquit endpoints
// on the given server and blocks serving them until the server stops.
func setupAdminHandlers(server *http.Server) {
	h := healthServer{
		alive: true,
	}
	mux := http.NewServeMux()
	mux.HandleFunc(fmt.Sprintf("/%s", queue.RequestQueueHealthPath), h.healthHandler)
	mux.HandleFunc(fmt.Sprintf("/%s", queue.RequestQueueQuitPath), h.quitHandler)
	server.Handler = mux
	// ListenAndServe always returns a non-nil error. ErrServerClosed is
	// the expected outcome of the graceful Shutdown() in main; anything
	// else (e.g. a bind failure) was previously swallowed silently.
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		logger.Error("Admin server failed.", zap.Error(err))
	}
}
// main wires up the queue-proxy: logging, env-derived identity, the
// HTTP/h2c reverse proxies to the user container, the autoscaler stat
// pipeline, the admin (health/quit) server, and graceful SIGTERM
// shutdown.
func main() {
	flag.Parse()
	logger, _ = logging.NewLogger(os.Getenv("SERVING_LOGGING_CONFIG"), os.Getenv("SERVING_LOGGING_LEVEL"))
	logger = logger.Named("queueproxy")
	defer logger.Sync()
	initEnv()
	logger = logger.With(
		zap.String(logkey.Namespace, servingNamespace),
		zap.String(logkey.Configuration, servingConfiguration),
		zap.String(logkey.Revision, servingRevision),
		zap.String(logkey.Pod, podName))
	// All traffic is forwarded to the user container on localhost:8080.
	target, err := url.Parse("http://localhost:8080")
	if err != nil {
		logger.Fatal("Failed to parse localhost url", zap.Error(err))
	}
	httpProxy = httputil.NewSingleHostReverseProxy(target)
	h2cProxy = httputil.NewSingleHostReverseProxy(target)
	h2cProxy.Transport = h2cutil.NewTransport()
	logger.Infof("Queue container is starting, concurrencyModel: %s", *concurrencyModel)
	config, err := rest.InClusterConfig()
	if err != nil {
		logger.Fatal("Error getting in cluster config", zap.Error(err))
	}
	kc, err := kubernetes.NewForConfig(config)
	if err != nil {
		logger.Fatal("Error creating new config", zap.Error(err))
	}
	kubeClient = kc
	go connectStatSink()
	go statReporter()
	// Stat pipeline: bucket concurrency every quantum, report once a second.
	bucketTicker := time.NewTicker(*concurrencyQuantumOfTime).C
	reportTicker := time.NewTicker(time.Second).C
	queue.NewStats(podName, queue.Channels{
		ReqChan:          reqChan,
		QuantizationChan: bucketTicker,
		ReportChan:       reportTicker,
		StatChan:         statChan,
	})
	defer func() {
		if statSink != nil {
			statSink.Close()
		}
	}()
	adminServer := &http.Server{
		Addr:    fmt.Sprintf(":%d", queue.RequestQueueAdminPort),
		Handler: nil,
	}
	h2cServer := h2c.Server{Server: &http.Server{
		Addr:    fmt.Sprintf(":%d", queue.RequestQueuePort),
		Handler: http.HandlerFunc(handler),
	}}
	// Add a SIGTERM handler to gracefully shutdown the servers during
	// pod termination. The channel must be buffered: signal.Notify does
	// not block when sending, so a signal delivered before the goroutine
	// below reaches its receive would be dropped on an unbuffered channel.
	sigTermChan := make(chan os.Signal, 1)
	signal.Notify(sigTermChan, syscall.SIGTERM)
	go func() {
		<-sigTermChan
		// Calling server.Shutdown() allows pending requests to
		// complete, while no new work is accepted.
		h2cServer.Shutdown(context.Background())
		adminServer.Shutdown(context.Background())
		os.Exit(0)
	}()
	go h2cServer.ListenAndServe()
	setupAdminHandlers(adminServer)
}