-
Notifications
You must be signed in to change notification settings - Fork 31
/
http.go
76 lines (71 loc) · 2.23 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package http
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
"github.com/opentracing/opentracing-go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
type httpSink struct {
sinkName string
header http.Header
client *http.Client
url string
}
func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.HTTPSink) (sink.Interface, error) {
header := http.Header{}
for _, h := range x.Headers {
if h.Value != "" {
header.Add(h.Name, h.Value)
} else if h.ValueFrom != nil {
r := h.ValueFrom.SecretKeyRef
secret, err := secretInterface.Get(ctx, r.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get secret %q: %w", r.Name, err)
}
header.Add(h.Name, string(secret.Data[r.Key]))
}
}
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxIdleConns = 32
t.MaxConnsPerHost = 32
t.MaxIdleConnsPerHost = 32
t.TLSClientConfig.InsecureSkipVerify = x.InsecureSkipVerify
return httpSink{
sinkName,
header,
&http.Client{Timeout: 10 * time.Second, Transport: t},
x.URL,
}, nil
}
func (h httpSink) Sink(ctx context.Context, msg []byte) error {
span, ctx := opentracing.StartSpanFromContext(ctx, fmt.Sprintf("http-sink-%s", h.sinkName))
defer span.Finish()
req, err := http.NewRequestWithContext(ctx, "POST", h.url, bytes.NewBuffer(msg))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header = h.header.Clone() // must clone to prevent concurrency issues
if err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)); err != nil {
return fmt.Errorf("failed to inject tracing headers: %w", err)
}
if err := dfv1.MetaInject(ctx, req.Header); err != nil {
return err
}
if resp, err := h.client.Do(req); err != nil {
return fmt.Errorf("failed to send HTTP request: %w", err)
} else {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send HTTP request: %q", resp.Status)
}
}
return nil
}