/
sender.go
116 lines (104 loc) · 2.54 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package agent
import (
"context"
"fmt"
"github.com/clambin/go-common/set"
"github.com/clambin/uptime/internal/monitor"
"github.com/clambin/uptime/pkg/retry"
"io"
"log/slog"
"net/http"
"time"
)
// sender consumes events from a channel and forwards them as HTTP requests
// to the monitor endpoint named in its configuration.
type sender struct {
// in is the stream of events that Run reads and processes.
in <-chan event
// configuration supplies the monitor URL, the optional bearer token and
// the global / per-host request settings.
configuration Configuration
// metrics records status code and latency of each request; may be nil,
// in which case no metrics are recorded.
metrics *Metrics
// httpClient performs the outgoing HTTP requests.
httpClient *http.Client
// logger receives debug and warning output while events are processed.
logger *slog.Logger
}
// Run processes incoming events until ctx is cancelled or the event channel
// is closed. Events are handled one at a time, in the order received.
func (s sender) Run(ctx context.Context) {
	for {
		select {
		case ev, ok := <-s.in:
			if !ok {
				// A closed channel would otherwise yield zero-value events
				// forever; treat close as a request to stop.
				return
			}
			s.process(ctx, ev)
		case <-ctx.Done():
			return
		}
	}
}
// process sends one request per target host of ev to the monitor. Each
// request is retried with a growing delay until it succeeds; processing stops
// early only when waiting for the next retry fails (e.g. ctx is cancelled).
func (s sender) process(ctx context.Context, ev event) {
	l := s.logger.With("event", ev)
	l.Debug("sending request")
	method := getMethod(ev.eventType)
	for _, request := range s.makeRequests(ev) {
		// A fresh waiter per request so every target starts from the initial delay.
		// NOTE(review): MaxWait used to be time.Millisecond, i.e. smaller than
		// InitialWait; a one-minute ceiling is assumed to be the intent — confirm
		// against the retry package.
		waiter := retry.MultiplyingWaiter{InitialWait: time.Second, MaxWait: time.Minute, Factor: 2}
		for {
			err := s.send(ctx, method, request)
			if err == nil {
				// Done with this target only; continue with the remaining ones.
				break
			}
			l.Warn("request failed. waiting to retry", "err", err)
			if waiter.Wait(ctx) != nil {
				return
			}
		}
	}
}
// getMethod translates an event type into the HTTP method used for the
// monitor request: POST for addEvent, DELETE for deleteEvent. Any other
// event type is a programming error and panics.
func getMethod(ev eventType) string {
	if ev == addEvent {
		return http.MethodPost
	}
	if ev == deleteEvent {
		return http.MethodDelete
	}
	panic("invalid event type: " + ev)
}
// makeRequests builds one monitor request for every target host of ev,
// preserving the order in which the event reports its hosts.
func (s sender) makeRequests(ev event) []monitor.Request {
	hosts := ev.targetHosts()
	built := make([]monitor.Request, 0, len(hosts))
	for _, host := range hosts {
		built = append(built, s.makeRequest(host))
	}
	return built
}
// makeRequest assembles the monitor request for host. It starts from the
// global settings and, when a host-specific entry exists, replaces each
// setting that the entry defines (non-empty method, non-zero interval,
// non-nil status code list).
func (s sender) makeRequest(host string) monitor.Request {
	effective := s.configuration.Global
	override, found := s.configuration.Hosts[host]
	if found && override.Method != "" {
		effective.Method = override.Method
	}
	if found && override.Interval != 0 {
		effective.Interval = override.Interval
	}
	if found && override.ValidStatusCodes != nil {
		effective.ValidStatusCodes = override.ValidStatusCodes
	}

	var req monitor.Request
	req.Target = host
	req.Method = effective.Method
	req.ValidCodes = set.New(effective.ValidStatusCodes...)
	req.Interval = effective.Interval
	return req
}
// send issues a single HTTP request with the given method to the monitor's
// /target endpoint, passing the encoded request as the query string. A bearer
// token is attached when one is configured. When metrics are configured, the
// response status code and the elapsed time are recorded.
//
// It returns an error when the request cannot be built, when the HTTP call
// fails, or when the monitor answers with any status other than 200 OK.
func (s sender) send(ctx context.Context, method string, request monitor.Request) error {
	start := time.Now()
	r, err := http.NewRequestWithContext(ctx, method, s.configuration.Monitor+"/target?"+request.Encode(), nil)
	if err != nil {
		// Previously ignored: an invalid URL or method left r nil and the
		// header/Do calls below would dereference it.
		return fmt.Errorf("creating request: %w", err)
	}
	if s.configuration.Token != "" {
		r.Header.Set("Authorization", "Bearer "+s.configuration.Token)
	}
	resp, err := s.httpClient.Do(r)
	if err != nil {
		return err
	}
	defer func(body io.ReadCloser) {
		// Drain before closing so the transport can reuse the connection.
		// Errors here are not actionable: the status has already been read.
		_, _ = io.Copy(io.Discard, body)
		_ = body.Close()
	}(resp.Body)
	if s.metrics != nil {
		s.metrics.ObserveRequest(resp.StatusCode, time.Since(start))
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected http status: %s", resp.Status)
	}
	return nil
}