-
Notifications
You must be signed in to change notification settings - Fork 420
/
transport.go
90 lines (83 loc) · 2.38 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package datastreams
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"net/http"
"net/url"
"runtime"
"strings"
"github.com/DataDog/dd-trace-go/v2/internal"
"github.com/tinylib/msgp/msgp"
)
// httpTransport bundles what is needed to deliver payloads over HTTP: the
// destination URL, the client that performs the request, and the headers that
// are set on every outgoing request.
type httpTransport struct {
	// url is the delivery URL for stats.
	url string
	// client is the HTTP client used in the POST.
	client *http.Client
	// headers are the transport headers applied to each request.
	headers map[string]string
}
// newHTTPTransport returns an httpTransport that POSTs to the
// /v0.1/pipeline_stats endpoint under agentURL using client.
//
// The header set is built once here and reused for every request: language
// metadata taken from the Go runtime, the msgpack content type and gzip
// content encoding, and, when the internal package reports non-empty values,
// the container ID and the entity ID.
func newHTTPTransport(agentURL *url.URL, client *http.Client) *httpTransport {
	headers := map[string]string{
		"Datadog-Meta-Lang":             "go",
		"Datadog-Meta-Lang-Version":     strings.TrimPrefix(runtime.Version(), "go"),
		"Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
		"Content-Type":                  "application/msgpack",
		"Content-Encoding":              "gzip",
	}
	if cid := internal.ContainerID(); cid != "" {
		headers["Datadog-Container-ID"] = cid
	}
	// The entity ID header used to be filled from internal.ContainerID(), which
	// made it an exact duplicate of Datadog-Container-ID. It now comes from
	// internal.EntityID().
	// NOTE(review): confirm internal.EntityID is the intended source for this header.
	if eid := internal.EntityID(); eid != "" {
		headers["Datadog-Entity-ID"] = eid
	}
	// The endpoint is built inline so that no local variable shadows the
	// imported net/url package.
	return &httpTransport{
		url:     fmt.Sprintf("%s/v0.1/pipeline_stats", agentURL.String()),
		client:  client,
		headers: headers,
	}
}
// sendPipelineStats encodes p with msgp into a gzip stream and POSTs the
// compressed bytes to t.url with the transport's headers.
//
// It returns any error from creating the gzip writer, encoding, finishing the
// gzip stream, building the request, or performing it. A response status of
// 400 or above is returned as an error containing the standard status text,
// preceded by up to the first 100 bytes of the response body when the body is
// not empty. It returns nil otherwise.
func (t *httpTransport) sendPipelineStats(p *StatsPayload) error {
	var buf bytes.Buffer
	gzipWriter, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
	if err != nil {
		return err
	}
	if err := msgp.Encode(gzipWriter, p); err != nil {
		return err
	}
	// Close flushes pending compressed data into buf; it must happen before
	// buf is handed to the request.
	if err := gzipWriter.Close(); err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, t.url, &buf)
	if err != nil {
		return err
	}
	for header, value := range t.headers {
		req.Header.Set(header, value)
	}
	resp, err := t.client.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Drain what is left of the RESPONSE body before closing it so the
		// underlying connection can be reused. The previous code drained
		// req.Body here, which left the response unread. The copy is
		// best-effort cleanup, so its result is intentionally ignored.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if code := resp.StatusCode; code >= 400 {
		// error, check the body for context information and
		// return a nice error.
		txt := http.StatusText(code)
		msg := make([]byte, 100)
		// A single Read may return fewer bytes than are available; ReadFull
		// keeps reading until msg is full or the body ends. n is the number
		// of bytes read even when an error is returned, so the error itself
		// is not needed.
		n, _ := io.ReadFull(resp.Body, msg)
		if n > 0 {
			return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
		}
		return fmt.Errorf("%s", txt)
	}
	return nil
}