-
Notifications
You must be signed in to change notification settings - Fork 31
/
loadbalanced.go
154 lines (146 loc) · 4.38 KB
/
loadbalanced.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package loadbalanced
import (
	"bytes"
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
	httpsource "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/http"
	"github.com/go-logr/logr"
	"k8s.io/apimachinery/pkg/util/runtime"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/workqueue"
)
// loadBalanced is the source.HasPending value returned by New. It pairs the
// HTTP source created by httpsource.New with the queue of listed items that
// the lead replica's workers deliver to that source.
//
// NOTE: New constructs this with a positional composite literal
// (&loadBalanced{httpSource, jobs}), so the field order is significant.
type loadBalanced struct {
	// httpSource is the source created by httpsource.New; Close closes it.
	httpSource source.Interface
	// jobs holds items returned by NewReq.ListItems that are waiting to be
	// delivered; GetPending reports its length.
	jobs workqueue.Interface
}
// GetPending reports how many items are currently sitting in the job queue.
// It never returns an error.
//
// The figure is only an estimate:
//   - it is zero until the lead replica has polled for items (and it stays
//     zero on replicas that never poll);
//   - when a single listing is itself capped (for example S3 ListObjectsV2
//     returns at most 1000 results), the figure is capped at that amount too.
func (s *loadBalanced) GetPending(context.Context) (uint64, error) {
	queued := s.jobs.Len()
	return uint64(queued), nil
}
// NewReq holds the parameters for New.
type NewReq struct {
	// Logger is the base logger; New adds a "sourceName" value to it.
	Logger logr.Logger
	// PipelineName is passed to httpsource.New and is the first part of the
	// lead replica's target host: "https://<PipelineName>-<StepName>".
	PipelineName string
	// StepName is passed to httpsource.New and is the second part of the
	// lead replica's target host.
	StepName string
	// SourceName is passed to httpsource.New, is logged, and forms the POST
	// path "/sources/<SourceName>".
	SourceName string
	// SourceURN is passed through to httpsource.New.
	SourceURN string
	// LeadReplica enables the workers and the poller; other replicas only
	// create the HTTP source.
	LeadReplica bool
	// Concurrency is the number of worker goroutines started on the lead replica.
	Concurrency int
	// PollPeriod is the interval between polls after the initial one; a value
	// <= 0 disables periodic polling (the initial poll still runs).
	PollPeriod time.Duration
	// Process is passed through to httpsource.New.
	Process source.Process
	// RemoveItem is called for an item after the step accepted it (HTTP status < 300).
	RemoveItem func(item interface{}) error
	// ListItems is called on every poll; each returned item is enqueued. Workers
	// type-assert items to string, so the items are expected to be strings.
	ListItems func() ([]interface{}, error)
}
// New creates a load-balanced source. Every replica creates an HTTP source via
// httpsource.New. The lead replica (r.LeadReplica) additionally:
//   - waits until "https://<PipelineName>-<StepName>/ready" answers with a
//     status below 300 (retrying every 3s until ctx is cancelled);
//   - polls r.ListItems once, then every r.PollPeriod when it is positive,
//     and enqueues every listed item;
//   - runs r.Concurrency workers that POST each queued item (as the request
//     body, with the HTTP source's Authorization header) to
//     "https://<PipelineName>-<StepName>/sources/<SourceName>" and call
//     r.RemoveItem for items the step accepted (status < 300).
//
// The returned value reports the queue length as its pending count; its Close
// method shuts the queue down and closes the HTTP source. An error is returned
// only when httpsource.New fails.
func New(ctx context.Context, secretInterface corev1.SecretInterface, r NewReq) (source.HasPending, error) {
	logger := r.Logger.WithValues("sourceName", r.SourceName)
	// (a) in the future we could use a named queue to expose metrics
	// (b) it would be good to limit the size of this work queue, so that `Add`
	//     applies back-pressure to the poller instead of growing without bound
	jobs := workqueue.New()
	authorization, httpSource, err := httpsource.New(ctx, secretInterface, r.PipelineName, r.StepName, r.SourceURN, r.SourceName, r.Process)
	if err != nil {
		return nil, err
	}
	if r.LeadReplica {
		baseURL := "https://" + r.PipelineName + "-" + r.StepName
		endpoint := baseURL + "/sources/" + r.SourceName
		readyEndpoint := baseURL + "/ready"

		t := http.DefaultTransport.(*http.Transport).Clone()
		t.MaxIdleConns = 32
		t.MaxConnsPerHost = 32
		t.MaxIdleConnsPerHost = 32
		// Clone only copies TLSClientConfig when the default transport has one
		// (it may be nil, e.g. when HTTP/2 is disabled), so never dereference nil.
		if t.TLSClientConfig == nil {
			t.TLSClientConfig = &tls.Config{}
		}
		// NOTE(review): certificate verification is disabled, presumably because
		// the step serves a self-signed certificate — confirm.
		t.TLSClientConfig.InsecureSkipVerify = true
		httpClient := &http.Client{Timeout: 10 * time.Second, Transport: t}

		// processItem delivers one queued item to the step and removes it from
		// the backing store only when the step accepted it (status < 300).
		// Failures are logged; the item is not re-queued here.
		processItem := func(item interface{}) {
			itemS, ok := item.(string)
			if !ok {
				// a non-string item cannot be sent; log it rather than panicking
				logger.Error(fmt.Errorf("unexpected item type %T", item), "failed to process item", "item", item)
				return
			}
			req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBufferString(itemS))
			if err != nil {
				logger.Error(err, "failed to create request", "item", item)
				return
			}
			req.Header.Set("Authorization", authorization)
			resp, err := httpClient.Do(req)
			if err != nil {
				logger.Error(err, "failed to process item", "item", item)
				return
			}
			// the body only enriches the error message, so a read error is ignored
			body, _ := io.ReadAll(resp.Body)
			_ = resp.Body.Close()
			if resp.StatusCode >= 300 {
				err := fmt.Errorf("%q: %q", resp.Status, body)
				logger.Error(err, "failed to process item", "item", item)
				return
			}
			logger.Info("deleting item", "item", item)
			if err := r.RemoveItem(item); err != nil {
				logger.Error(err, "failed to delete item", "item", item)
			}
		}

		logger.Info("starting lead replica's workers", "source", r.SourceName, "endpoint", endpoint)
		for w := 0; w < r.Concurrency; w++ {
			go func() {
				defer runtime.HandleCrash()
				for {
					item, shutdown := jobs.Get()
					if shutdown {
						return
					}
					func() {
						// Done runs even if processItem panics, keeping the queue consistent
						defer jobs.Done(item)
						processItem(item)
					}()
				}
			}()
		}

		// probeReady performs one readiness request bound to ctx, always draining
		// and closing the response body so connections are not leaked.
		probeReady := func() bool {
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyEndpoint, nil)
			if err != nil {
				return false
			}
			resp, err := httpClient.Do(req)
			if err != nil {
				return false
			}
			_, _ = io.Copy(io.Discard, resp.Body)
			_ = resp.Body.Close()
			return resp.StatusCode < 300
		}

		// poll lists the outstanding items and enqueues each of them.
		poll := func() {
			list, err := r.ListItems()
			if err != nil {
				logger.Error(err, "failed to list items")
				return
			}
			for _, item := range list {
				jobs.Add(item)
			}
		}

		logger.Info("starting lead replica's change poller")
		go func() {
			defer runtime.HandleCrash()
			// wait for the HTTP service before enqueuing work; stop early on cancellation
			for {
				if ctx.Err() != nil {
					return
				}
				logger.Info("waiting for HTTP service to be ready", "endpoint", readyEndpoint)
				if probeReady() {
					break
				}
				select {
				case <-ctx.Done():
					return
				case <-time.After(3 * time.Second):
				}
			}
			logger.Info("executing initial poll")
			poll()
			if r.PollPeriod <= 0 {
				logger.Info("polling loop disabled", "pollPeriod", r.PollPeriod)
				return
			}
			logger.Info("starting polling loop", "pollPeriod", r.PollPeriod)
			ticker := time.NewTicker(r.PollPeriod)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					poll()
				}
			}
		}()
	}
	return &loadBalanced{httpSource, jobs}, nil
}
// Close shuts down the job queue first, so worker goroutines return once
// jobs.Get reports shutdown. It then closes the underlying HTTP source and
// returns that source's Close error.
func (s *loadBalanced) Close() error {
	queue, src := s.jobs, s.httpSource
	queue.ShutDown()
	return src.Close()
}