/
client.go
136 lines (117 loc) · 2.8 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package client
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"os/signal"
"time"
"github.com/go-logr/glogr"
"github.com/go-logr/logr"
"github.com/aka-bo/loqu/pkg/util"
)
// Options is used to configure the client
type Options struct {
Protocol string
Host string
Path string
Port int
RequestID string
Verb string
TimeoutSeconds int
UseWebSocket bool
IntervalSeconds int
Data *string
ExitMode bool
}
func (o *Options) dataOrDefault(data fmt.Stringer) []byte {
if o.Data != nil {
return []byte(*o.Data)
}
return []byte(data.String())
}
// Run the client
func Run(o *Options) {
logger := glogr.New().WithName("Client")
logger.Info("Run called", "options", o)
if o.UseWebSocket {
o.dial(logger)
} else {
o.postContinuously(logger)
}
}
func (o *Options) postContinuously(logger logr.Logger) {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(o.TimeoutSeconds/2) * time.Second,
KeepAlive: 5 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 4,
MaxIdleConnsPerHost: 2,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 1 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// TODO: maybe expose with flag?
// DisableKeepAlives: true,
}
timeout := time.Duration(o.TimeoutSeconds) * time.Second
client := http.Client{
Timeout: timeout,
Transport: transport,
}
o.post(logger, &client)
if o.IntervalSeconds <= 0 {
return
}
ticker := time.NewTicker(time.Duration(o.IntervalSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
o.post(logger, &client)
case <-interrupt:
logger.Info("interupt")
return
}
}
}
func (o *Options) post(logger logr.Logger, client *http.Client) {
url := fmt.Sprintf("%s://%s:%d/%s", o.Protocol, o.Host, o.Port, o.Path)
id := o.RequestID
if len(id) == 0 {
id = util.NewRequestID()
}
logger = logger.WithValues("requestID", id, "url", url)
logger.Info("post")
req, err := http.NewRequest(o.Verb, url, bytes.NewBuffer(o.dataOrDefault(time.Now())))
if err != nil {
o.handleError(logger, err, "failed to create new request")
return
}
req.Header.Set(util.KeyRequestID, id)
resp, err := client.Do(req)
if err != nil {
o.handleError(logger, err, "error sending http request")
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
o.handleError(logger, err, "error reading response")
return
}
logger.Info("response received", "code", resp.StatusCode)
fmt.Println(string(body))
}
func (o *Options) handleError(logger logr.Logger, err error, msg string) {
logger.Error(err, msg)
if o.ExitMode {
os.Exit(1)
}
}