forked from deislabs/osiris
-
Notifications
You must be signed in to change notification settings - Fork 8
/
proxy.go
113 lines (101 loc) · 2.84 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package proxy
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/golang/glog"
uuid "github.com/satori/go.uuid"
"github.com/dailymotion-oss/osiris/pkg/healthz"
"github.com/dailymotion-oss/osiris/pkg/metrics"
)
// Proxy is the interface returned by NewProxy. It exposes a single
// operation: running the proxy.
type Proxy interface {
// Run runs the proxy using ctx. In the implementation in this file, Run
// blocks while the healthz and metrics server is serving, and a canceled
// ctx triggers shutdown of that server.
Run(ctx context.Context)
}
// proxy is the Proxy implementation built by NewProxy. It groups a set of
// per-port proxies with one HTTP server that serves /metrics and /healthz.
type proxy struct {
// proxyID is a UUID string generated in NewProxy and reported in the
// /metrics response.
proxyID string
// requestCount points at a counter that is handed to every
// singlePortProxy and read atomically when serving /metrics.
requestCount *uint64
// singlePortProxies holds one entry per port mapping in the Config.
singlePortProxies []*singlePortProxy
// healthzAndMetricsSvr is the HTTP server for the /metrics and /healthz
// endpoints; it is started and shut down by Run.
healthzAndMetricsSvr *http.Server
// ignoredPaths is copied from Config.IgnoredPaths and passed to every
// singlePortProxy.
// NOTE(review): presumably these are request paths excluded from
// counting — confirm against singlePortProxy.
ignoredPaths map[string]struct{}
}
// NewProxy builds a Proxy from cfg.
//
// It creates one singlePortProxy for every entry in cfg.PortMappings (proxy
// port -> application port), all sharing a single request counter and the
// same cfg.IgnoredPaths, and prepares an HTTP server bound to
// cfg.MetricsAndHealthPort that serves /metrics and /healthz. The healthz and
// metrics server is only configured here; it starts listening when Run is
// called.
//
// An error returned by newSinglePortProxy is returned unchanged, in which
// case the returned Proxy is nil.
func NewProxy(cfg Config) (Proxy, error) {
	// Bound how long a client may take to send its request headers so that a
	// stalled or malicious connection cannot hold the healthz and metrics
	// server's resources indefinitely.
	const readHeaderTimeout = 5 * time.Second

	var requestCount uint64
	healthzAndMetricsMux := http.NewServeMux()
	p := &proxy{
		proxyID:           uuid.NewV4().String(),
		requestCount:      &requestCount,
		singlePortProxies: make([]*singlePortProxy, 0, len(cfg.PortMappings)),
		healthzAndMetricsSvr: &http.Server{
			Addr:              fmt.Sprintf(":%d", cfg.MetricsAndHealthPort),
			Handler:           healthzAndMetricsMux,
			ReadHeaderTimeout: readHeaderTimeout,
		},
		ignoredPaths: cfg.IgnoredPaths,
	}

	for proxyPort, appPort := range cfg.PortMappings {
		// Named spp (not singlePortProxy) so the variable does not shadow the
		// singlePortProxy type inside the loop.
		spp, err := newSinglePortProxy(
			proxyPort,
			appPort,
			p.requestCount,
			p.ignoredPaths,
		)
		if err != nil {
			return nil, err
		}
		p.singlePortProxies = append(p.singlePortProxies, spp)
	}

	healthzAndMetricsMux.HandleFunc("/metrics", p.handleMetricsRequest)
	healthzAndMetricsMux.HandleFunc("/healthz", healthz.HandleHealthCheckRequest)

	return p, nil
}
// Run starts every single-port proxy in its own goroutine and then serves the
// healthz and metrics endpoints on the calling goroutine.
//
// All components share a context derived from ctx. That context is canceled
// when ctx is canceled, when any single-port proxy's run method returns, or
// when the healthz and metrics server stops, so one component stopping brings
// the others down with it.
//
// Run returns once the healthz and metrics server has stopped AND any
// graceful shutdown of it has finished. http.Server.ListenAndServe returns
// immediately when Shutdown is called, so Run explicitly waits for the
// shutdown goroutine; otherwise in-flight requests could be cut off by the
// caller exiting. Run does not wait for the single-port proxy goroutines.
func (p *proxy) Run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	// Guarantee the derived context is released on every return path.
	defer cancel()

	// Start proxies for each port
	for _, spp := range p.singlePortProxies {
		go func(spp *singlePortProxy) {
			spp.run(ctx)
			// A proxy that stops takes everything else down with it.
			cancel()
		}(spp)
	}

	// serverStopped is closed after ListenAndServe returns; shutdownDone is
	// closed when the watcher goroutine below has completely finished.
	serverStopped := make(chan struct{})
	shutdownDone := make(chan struct{})
	go func() {
		// Deferred calls run last-in-first-out: cancel fires before
		// shutdownDone is closed.
		defer close(shutdownDone)
		defer cancel()
		select {
		case <-ctx.Done(): // Context was canceled or expired
			glog.Info("Healthz and metrics server is shutting down")
			// Allow up to five seconds for requests in progress to be completed
			shutdownCtx, shutdownCancel := context.WithTimeout(
				context.Background(),
				time.Second*5,
			)
			defer shutdownCancel()
			if err := p.healthzAndMetricsSvr.Shutdown(shutdownCtx); err != nil {
				// Best effort: report the failure but keep shutting down.
				glog.Errorf(
					"Error shutting down healthz and metrics server: %s",
					err,
				)
			}
		case <-serverStopped: // The server shut down on its own, perhaps due to error
		}
	}()

	glog.Infof(
		"Healthz and metrics server is listening on %s",
		p.healthzAndMetricsSvr.Addr,
	)
	err := p.healthzAndMetricsSvr.ListenAndServe()
	if err != http.ErrServerClosed {
		glog.Errorf("Error from healthz and metrics server: %s", err)
	}
	close(serverStopped)

	// Wait for the watcher goroutine. It is either finishing Shutdown (bounded
	// by the five second timeout) or about to observe serverStopped, so this
	// cannot block indefinitely.
	<-shutdownDone
}
// handleMetricsRequest serves the /metrics endpoint. It writes a JSON-encoded
// metrics.ProxyRequestCount containing this proxy's ID and the current value
// of the shared request counter. If encoding fails it responds with
// 500 Internal Server Error and no body; a failure while writing the body is
// only logged.
func (p *proxy) handleMetricsRequest(w http.ResponseWriter, _ *http.Request) {
	// Read the counter atomically: it is shared through a pointer with the
	// single-port proxies.
	body, err := json.Marshal(metrics.ProxyRequestCount{
		ProxyID:      p.proxyID,
		RequestCount: atomic.LoadUint64(p.requestCount),
	})
	if err != nil {
		glog.Errorf("Error marshaling metrics request response: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	_, writeErr := w.Write(body)
	if writeErr == nil {
		return
	}
	glog.Errorf("Error writing metrics request response body: %s", writeErr)
}