forked from DataDog/datadog-agent
-
Notifications
You must be signed in to change notification settings - Fork 2
/
backend.go
158 lines (138 loc) · 3.69 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package test
import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"sync/atomic"
"time"
"github.com/StackVista/stackstate-agent/pkg/trace/pb"
"github.com/gogo/protobuf/proto"
"github.com/tinylib/msgp/msgp"
)
// defaultBackendAddress is the default listening address for the fake
// backend.
const defaultBackendAddress = "localhost:8888"
// defaultChannelSize is the default size of the buffered channel
// receiving any payloads sent by the trace-agent to the backend.
const defaultChannelSize = 100
type fakeBackend struct {
started uint64 // 0 if server is stopped
out chan interface{} // payload output
srv http.Server
}
func newFakeBackend(channelSize int) *fakeBackend {
size := defaultChannelSize
if channelSize != 0 {
size = channelSize
}
fb := fakeBackend{
out: make(chan interface{}, size),
}
mux := http.NewServeMux()
mux.HandleFunc("/api/v0.2/traces", fb.handleTraces)
mux.HandleFunc("/api/v0.2/stats", fb.handleStats)
mux.HandleFunc("/_health", fb.handleHealth)
fb.srv = http.Server{
Addr: defaultBackendAddress,
Handler: mux,
}
return &fb
}
func (s *fakeBackend) Start() error {
if atomic.LoadUint64(&s.started) > 0 {
// already running
return nil
}
go func() {
atomic.StoreUint64(&s.started, 1)
defer atomic.StoreUint64(&s.started, 0)
if err := s.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("server: %v", err)
}
}()
timeout := time.After(5 * time.Second)
for {
select {
case <-timeout:
return errors.New("server: timed out out waiting for start")
default:
resp, err := http.Get(fmt.Sprintf("http://%s/_health", s.srv.Addr))
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
time.Sleep(5 * time.Millisecond)
}
}
}
func (s *fakeBackend) Out() <-chan interface{} { return s.out }
// Shutdown shuts down the backend and stops any running agent.
func (s *fakeBackend) Shutdown(wait time.Duration) error {
defer close(s.out)
ctx, cancel := context.WithTimeout(context.Background(), wait)
defer cancel()
return s.srv.Shutdown(ctx)
}
func (s *fakeBackend) handleHealth(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (s *fakeBackend) handleStats(w http.ResponseWriter, req *http.Request) {
var payload pb.StatsPayload
if err := readMsgPRequest(req, &payload); err != nil {
log.Println("server: error reading stats: ", err)
}
s.out <- payload
}
func (s *fakeBackend) handleTraces(w http.ResponseWriter, req *http.Request) {
var payload pb.AgentPayload
if err := readProtoRequest(req, &payload); err != nil {
log.Println("server: error reading traces: ", err)
}
s.out <- payload
}
func readMsgPRequest(req *http.Request, msg msgp.Decodable) error {
rc, err := readCloserFromRequest(req)
if err != nil {
return err
}
defer rc.Close()
return msgp.Decode(rc, msg)
}
func readProtoRequest(req *http.Request, msg proto.Message) error {
rc, err := readCloserFromRequest(req)
if err != nil {
return err
}
slurp, err := ioutil.ReadAll(rc)
defer rc.Close()
if err != nil {
return err
}
return proto.Unmarshal(slurp, msg)
}
func readCloserFromRequest(req *http.Request) (io.ReadCloser, error) {
rc := struct {
io.Reader
io.Closer
}{
Reader: req.Body,
Closer: req.Body,
}
if req.Header.Get("Accept-Encoding") == "gzip" {
gz, err := gzip.NewReader(req.Body)
if err != nil {
return nil, err
}
defer gz.Close()
rc.Reader = gz
}
return rc, nil
}