/
sink_concurrent.go
80 lines (72 loc) · 1.76 KB
/
sink_concurrent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package pipeline
import (
"context"
"errors"
"fmt"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type sinkConcurrent struct {
id string
threads int
sink SinkWorker
srcChan chan interface{}
srcMetrics *metrics
sinkMetrics *metrics
logger *zap.Logger
}
func NewSinkConcurrent(id string, threads int, worker SinkWorker) (sink, error) {
if threads <= 1 {
return nil, errors.New("thread setting must be greater than 1")
}
return &sinkConcurrent{
id: id,
threads: threads,
sink: worker,
srcMetrics: newMetrics(true),
sinkMetrics: newMetrics(true),
}, nil
}
func (s *sinkConcurrent) getID() string {
return s.id
}
func (s *sinkConcurrent) setup(srcChan chan interface{}, logger *zap.Logger) {
s.srcChan = srcChan
s.logger = logger
}
func (s *sinkConcurrent) run(ctx context.Context) error {
g, gctx := errgroup.WithContext(ctx)
for i := 0; i < s.threads; i++ {
tid := fmt.Sprintf("%s:%d", s.id, i)
g.Go(func() error {
s.logger.Debug("sink concurrent starting", zap.String("id", tid))
defer s.logger.Debug("sink concurrent exiting", zap.String("id", tid))
for {
srcStartTime := time.Now()
select {
case <-gctx.Done():
return nil
case item, open := <-s.srcChan:
if !open {
return nil
}
s.srcMetrics.recordDuration(time.Now().Sub(srcStartTime))
sinkStartTime := time.Now()
err := s.sink.Sink(ctx, tid, item)
if err != nil {
return fmt.Errorf("sink '%s' error: %v", tid, err)
}
s.sinkMetrics.recordDuration(time.Now().Sub(sinkStartTime))
}
}
})
}
return g.Wait()
}
func (s *sinkConcurrent) metrics() *sinkMetrics {
return &sinkMetrics{
SrcWait: s.srcMetrics.results(),
Sink: s.sinkMetrics.results(),
}
}