forked from timescale/tsbs
-
Notifications
You must be signed in to change notification settings - Fork 2
/
http_client.go
133 lines (115 loc) · 3.22 KB
/
http_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/benchant/tsbs/pkg/query"
)
var bytesSlash = []byte("/") // heap optimization
// HTTPClient is a reusable HTTP Client.
type HTTPClient struct {
//client fasthttp.Client
client *http.Client
Host []byte
HostString string
uri []byte
}
// HTTPClientDoOptions wraps options uses when calling `Do`.
type HTTPClientDoOptions struct {
Debug int
PrettyPrintResponses bool
chunkSize uint64
database string
}
var httpClientOnce = sync.Once{}
var httpClient *http.Client
func getHttpClient() *http.Client {
httpClientOnce.Do(func() {
tr := &http.Transport{
MaxIdleConnsPerHost: 1024,
}
httpClient = &http.Client{Transport: tr}
})
return httpClient
}
// NewHTTPClient creates a new HTTPClient.
func NewHTTPClient(host string) *HTTPClient {
if strings.HasSuffix(host, "/") {
host = host[:len(host)-1]
}
return &HTTPClient{
client: getHttpClient(),
Host: []byte(host),
HostString: host,
uri: []byte{}, // heap optimization
}
}
// Do performs the action specified by the given Query. It uses fasthttp, and
// tries to minimize heap allocations.
func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) {
// populate uri from the reusable byte slice:
w.uri = w.uri[:0]
w.uri = append(w.uri, w.Host...)
w.uri = append(w.uri, q.Path...)
// populate a request with data from the Query:
req, err := http.NewRequest(string(q.Method), string(w.uri), nil)
if err != nil {
panic(err)
}
// Perform the request while tracking latency:
start := time.Now()
resp, err := w.client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
panic("http request did not return status 200 OK")
}
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
lag = float64(time.Since(start).Nanoseconds()) / 1e6 // milliseconds
if opts != nil {
// Print debug messages, if applicable:
switch opts.Debug {
case 1:
fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms\n", q.HumanLabel, lag)
case 2:
fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription)
case 3:
fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription)
fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String()))
case 4:
fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription)
fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String()))
fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(body))
default:
}
// Pretty print JSON responses, if applicable:
if opts.PrettyPrintResponses {
// Assumes the response is JSON! This holds for Influx
// and Elastic.
prefix := fmt.Sprintf("ID %d: ", q.GetID())
var v interface{}
var line []byte
full := make(map[string]interface{})
full["influxql"] = string(q.RawQuery)
json.Unmarshal(body, &v)
full["response"] = v
line, err = json.MarshalIndent(full, prefix, " ")
if err != nil {
return
}
fmt.Println(string(line) + "\n")
}
}
return lag, err
}