/
wrapper.go
246 lines (230 loc) · 7.92 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package client
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"time"
ratelimit "github.com/MovieStoreGuy/detector-doctor/pkg/rate_limit"
"github.com/MovieStoreGuy/detector-doctor/pkg/types"
"github.com/gorilla/websocket"
)
const (
// DefaultRealm is the realm substituted when the caller passes an empty realm,
// as this is the default realm used by SignalFx.
DefaultRealm = "us0"
// DefaultAPIEndpoint is the format string for the REST API base URL;
// its single %s verb is filled in with the realm.
DefaultAPIEndpoint = `https://api.%s.signalfx.com/v2`
// DefaultStreamEndpoint is the format string for the streaming base URL;
// its single %s verb is filled in with the realm. The http(s) scheme is
// rewritten to ws(s) before the websocket connection is opened.
DefaultStreamEndpoint = `https://stream.%s.signalfx.com/v2`
)
// toUnixMilliseconds converts t into the number of milliseconds elapsed since
// the Unix epoch, dropping any sub-millisecond remainder.
func toUnixMilliseconds(t time.Time) int64 {
	const nanosPerMilli = int64(time.Millisecond)
	nanos := t.UnixNano()
	return nanos / nanosPerMilli
}
// SignalFx is the wrapper around the developer API.
// It bundles the realm-specific base URLs with the HTTP client, rate limiter
// and request/websocket factories used for every outbound call.
type SignalFx struct {
// api is the base URL of the REST API, built from DefaultAPIEndpoint and the realm.
api string
// stream is the base URL of the streaming API, built from DefaultStreamEndpoint and the realm.
stream string
// client executes the HTTP requests issued by makeRequest.
client *http.Client
// limiter is consumed once before each REST request and each stream job.
limiter ratelimit.Limiter
// requestFunc builds the outbound *http.Request for a REST call.
requestFunc func(ctx context.Context, method, url string, body io.Reader) (*http.Request, error)
// websocFunc opens the websocket connection used to read stream data.
websocFunc func(ctx context.Context, url string) (*websocket.Conn, error)
}
// NewSignalFxClient builds a client for the given realm that authenticates
// with accessToken. An empty realm falls back to DefaultRealm and a nil client
// is replaced by the one NewConfiguredClient returns. The resulting client
// performs no rate limiting (it uses ratelimit.NullLimiter).
func NewSignalFxClient(realm, accessToken string, client *http.Client) *SignalFx {
	buildRequest := NewConfiguredRequestFunc(accessToken)
	dialStream := NewConfiguredWebsocketFunc(accessToken)

	if realm == "" {
		realm = DefaultRealm
	}
	if client == nil {
		client = NewConfiguredClient()
	}

	return &SignalFx{
		api:         fmt.Sprintf(DefaultAPIEndpoint, realm),
		stream:      fmt.Sprintf(DefaultStreamEndpoint, realm),
		client:      client,
		limiter:     &ratelimit.NullLimiter{},
		requestFunc: buildRequest,
		websocFunc:  dialStream,
	}
}
// NewRateLimitedSignalFxClient builds a client exactly like NewSignalFxClient
// and then installs limit as the limiter consulted before each outbound request.
// A nil limit leaves the limiter chosen by NewSignalFxClient in place.
func NewRateLimitedSignalFxClient(realm, accessToken string, client *http.Client, limit ratelimit.Limiter) *SignalFx {
	sfx := NewSignalFxClient(realm, accessToken, client)
	if limit == nil {
		return sfx
	}
	sfx.limiter = limit
	return sfx
}
// makeRequest performs one rate-limited call against the SignalFx REST API
// and returns the raw response body.
//
// pathByParts is joined onto the API base path; every entry of queryParams is
// rendered with fmt.Sprint and URL-encoded into the query string. data, when
// non-nil, is handed to requestFunc as the request body.
//
// Errors returned:
//   - anything reported by the limiter, URL parsing, requestFunc or the HTTP client
//   - types.ErrFailedAuth for 401 Unauthorized
//   - types.ErrAPIIssue for 400 Bad Request
//   - types.ErrNoDetectorFound for 404 Not Found
//   - a descriptive error for any other non-2xx status
func (sfx *SignalFx) makeRequest(ctx context.Context, method string, data io.Reader, queryParams map[string]interface{}, pathByParts ...string) ([]byte, error) {
	if err := sfx.limiter.Consume(ctx); err != nil {
		return nil, err
	}

	endpoint, err := url.Parse(sfx.api)
	if err != nil {
		return nil, err
	}
	endpoint.Path = path.Join(endpoint.Path, path.Join(pathByParts...))

	query := endpoint.Query()
	for name, value := range queryParams {
		query.Set(name, fmt.Sprint(value))
	}
	endpoint.RawQuery = query.Encode()

	req, err := sfx.requestFunc(ctx, method, endpoint.String(), data)
	if err != nil {
		return nil, err
	}

	resp, err := sfx.client.Do(req)
	if err != nil {
		return nil, err
	}
	// The body must always be closed, otherwise the underlying connection leaks.
	// Draining what is left first lets the transport reuse the connection.
	defer func() {
		_, _ = io.Copy(ioutil.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	switch {
	case resp.StatusCode == http.StatusUnauthorized:
		return nil, types.ErrFailedAuth
	case resp.StatusCode == http.StatusBadRequest:
		return nil, types.ErrAPIIssue
	case resp.StatusCode == http.StatusNotFound:
		return nil, types.ErrNoDetectorFound
	case resp.StatusCode < 200 || resp.StatusCode > 299:
		// Do not hand an error payload (5xx, 429, ...) back as if it were data.
		return nil, fmt.Errorf("unexpected response %q for %s %s", resp.Status, method, endpoint.Path)
	}

	return ioutil.ReadAll(resp.Body)
}
// readStreamData executes programText as a job over the streaming websocket
// API and collects everything received until the stream finishes.
//
// opts supplies extra fields for the execute request. Entries with a nil value
// and entries that collide with the predefined fields ("type", "channel",
// "program", "timezone") are ignored. When no "start" is given the job starts
// 24 hours before now; when neither "stop" nor "immediate" is given the job
// stops at now.
//
// It returns the metadata and general text messages together with the decoded
// binary data points. Reading ends on an end-of-messages control message or
// when the server closes the connection normally. Any other read, decode or
// server-reported error discards the collected data and is returned.
func (sfx *SignalFx) readStreamData(ctx context.Context, programText string, opts map[string]interface{}) ([]types.Message, []*types.MetricDataPoint, error) {
	if err := sfx.limiter.Consume(ctx); err != nil {
		return nil, nil, err
	}

	endpoint, err := url.Parse(sfx.stream)
	if err != nil {
		return nil, nil, err
	}
	endpoint.Path = path.Join(endpoint.Path, "signalflow/connect")
	endpoint.Scheme = strings.Replace(endpoint.Scheme, "http", "ws", 1)

	conn, err := sfx.websocFunc(ctx, endpoint.String())
	if err != nil {
		return nil, nil, err
	}
	defer conn.Close()

	// Reads on the connection block independently of ctx, so carry the
	// deadline over to stop a stalled stream from hanging past it.
	if deadline, ok := ctx.Deadline(); ok {
		if err := conn.SetReadDeadline(deadline); err != nil {
			return nil, nil, err
		}
	}

	jobSpec := map[string]interface{}{
		"type":     "execute",
		"channel":  "detdoc-0",
		"program":  programText,
		"timezone": "UTC",
	}
	for field, value := range opts {
		if _, exist := jobSpec[field]; !exist && value != nil {
			jobSpec[field] = value
		}
	}
	now := time.Now().UTC()
	if _, exist := jobSpec["start"]; !exist {
		jobSpec["start"] = toUnixMilliseconds(now.Add(-1 * 24 * time.Hour))
	}
	if _, exist := jobSpec["stop"]; !exist {
		// An "immediate" job must not be given an explicit stop time.
		if _, set := jobSpec["immediate"]; !set {
			jobSpec["stop"] = toUnixMilliseconds(now)
		}
	}
	if err := conn.WriteJSON(jobSpec); err != nil {
		return nil, nil, err
	}

	messages := make([]types.Message, 0)
	datapoints := make([]*types.MetricDataPoint, 0)
	for {
		if err := ctx.Err(); err != nil {
			return nil, nil, err
		}

		msgType, message, err := conn.ReadMessage()
		if err != nil {
			// ReadMessage never yields a close frame as a message; a close is
			// reported as an error. A normal closure is a clean end of stream.
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				return messages, datapoints, nil
			}
			return nil, nil, err
		}
		if errRead := types.ReadWebsocketError(message); errRead != nil {
			return nil, nil, errRead
		}

		// Parse the control message once and reuse it below.
		control := types.ReadControlMessage(message)
		if types.IsEndofMessages(control) {
			break
		}

		switch msgType {
		case websocket.TextMessage:
			if control != nil {
				// Non-terminal control messages carry nothing to collect.
				continue
			}
			if meta := types.ReadMetadataMessage(message); meta != nil {
				messages = append(messages, meta)
			} else if gen := types.ReadGeneralMessage(message); gen != nil {
				messages = append(messages, gen)
			}
		case websocket.BinaryMessage:
			dp, err := types.ReadMetricDataPoint(message)
			if err != nil {
				return nil, nil, err
			}
			datapoints = append(datapoints, dp)
		}
	}
	return messages, datapoints, nil
}
// GetAllDetectors pages through the detector listing endpoint, asking for 100
// detectors per request, and returns every detector decoded from the responses.
// The total reported by each response decides whether another page is fetched.
// Any request or decoding failure aborts the walk and returns a nil slice
// together with the error.
func (sfx *SignalFx) GetAllDetectors(ctx context.Context) ([]*types.Detector, error) {
	const pageSize = 100

	detectors := []*types.Detector{}
	// Assume a single full page until the first response reports the real total.
	for offset, total := 0, pageSize; offset < total; offset += pageSize {
		query := map[string]interface{}{
			"limit":  pageSize,
			"offset": offset,
		}
		payload, err := sfx.makeRequest(ctx, http.MethodGet, nil, query, "detector")
		if err != nil {
			return nil, err
		}

		var page types.QueryResults
		if err = json.Unmarshal(payload, &page); err != nil {
			return nil, err
		}
		// The count is tied to the query, so it is expected to stay the same
		// across pages once the first response has provided it.
		total = int(page.Count)

		for _, raw := range page.Results {
			det := new(types.Detector)
			if err = json.Unmarshal(raw, det); err != nil {
				return nil, err
			}
			detectors = append(detectors, det)
		}
	}
	return detectors, nil
}
// GetDetectorByID retrieves the provided detector as defined by https://developers.signalfx.com/detectors_reference.html#tag/Retrieve-Detector-ID
//
// An empty detectorID returns types.ErrNoDetectorFound without issuing a
// request: it would otherwise collapse the path to the detector listing
// endpoint and decode that listing into an empty detector.
// On any error the returned detector is nil.
func (sfx *SignalFx) GetDetectorByID(ctx context.Context, detectorID string) (*types.Detector, error) {
	if detectorID == "" {
		return nil, types.ErrNoDetectorFound
	}

	buff, err := sfx.makeRequest(ctx, http.MethodGet, nil, nil, "detector", detectorID)
	if err != nil {
		return nil, err
	}

	det := new(types.Detector)
	if err := json.Unmarshal(buff, det); err != nil {
		// Never hand back a partially decoded detector alongside an error.
		return nil, err
	}
	return det, nil
}
// GetIncidentsByDetectorID is meant to retrieve the incidents of a detector as defined by https://developers.signalfx.com/detectors_reference.html#tag/Retrieve-Incidents-Single-Detector
// It is not implemented yet: the arguments are ignored and it always returns
// a nil slice together with types.ErrNotImplemented.
func (sfx *SignalFx) GetIncidentsByDetectorID(ctx context.Context, detectorID string, query map[string]interface{}) ([]*types.Incident, error) {
return nil, types.ErrNotImplemented
}
// GetMetricTimeSeries returns the messages and data provided by the websocket API.
// The list of allowed parameters is documented here: https://developers.signalfx.com/signalflow_analytics/websocket_request_messages.html#_syntax_2
// Some of the values are predefined for you to avoid causing issues with handling computation.
// All time values passed should be in UTC; every time.Time value in params is
// converted to int64 Unix milliseconds before the request is sent.
//
// params is never modified: the conversion is applied to a copy, so the
// caller's map keeps its original time.Time values and can be reused.
func (sfx *SignalFx) GetMetricTimeSeries(ctx context.Context, programText string, params map[string]interface{}) ([]types.Message, []*types.MetricDataPoint, error) {
	normalised := make(map[string]interface{}, len(params))
	for field, value := range params {
		if t, ok := value.(time.Time); ok {
			normalised[field] = toUnixMilliseconds(t)
			continue
		}
		normalised[field] = value
	}
	return sfx.readStreamData(ctx, programText, normalised)
}