/
async_group.go
119 lines (100 loc) · 2.38 KB
/
async_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package send
import (
"context"
"errors"
"fmt"
"strings"
"cdr.dev/grip/message"
)
type asyncGroupSender struct {
pipes []chan message.Composer
senders []Sender
cancel context.CancelFunc
*Base
}
// NewAsyncGroupSender builds a Sender that, like the MultiSender, delivers
// every message to each member of a group of underlying senders.
//
// Each underlying sender is served by its own goroutine, which reads from a
// channel buffered to bufferSize. This sender does not guarantee ordering of
// messages, and Send operations may block if the underlying senders fall
// behind and a buffer fills up.
//
// The worker goroutines return once ctx is done or the returned sender is
// closed. The returned sender takes ownership of the underlying senders, so
// closing it closes all of them; the resulting error lists every underlying
// Close failure as well as every buffer that still held messages.
func NewAsyncGroupSender(ctx context.Context, bufferSize int, senders ...Sender) Sender {
	group := &asyncGroupSender{
		senders: senders,
		Base:    NewBase(""),
	}
	ctx, group.cancel = context.WithCancel(ctx)

	// forward relays messages from one pipe to its sender until ctx is done.
	forward := func(pipe chan message.Composer, sender Sender) {
		for {
			select {
			case <-ctx.Done():
				return
			case m := <-pipe:
				// Nil messages are skipped rather than delivered.
				if m != nil {
					sender.Send(m)
				}
			}
		}
	}

	for _, sender := range senders {
		pipe := make(chan message.Composer, bufferSize)
		group.pipes = append(group.pipes, pipe)
		go forward(pipe, sender)
	}

	group.closer = func() error {
		// Stop the workers first, then release everything they used.
		group.cancel()

		var problems []string
		for _, sender := range group.senders {
			err := sender.Close()
			if err == nil {
				continue
			}
			problems = append(problems, err.Error())
		}

		for idx, pipe := range group.pipes {
			// Anything still buffered was never handed to its sender.
			if remaining := len(pipe); remaining > 0 {
				problems = append(problems,
					fmt.Sprintf("buffer for sender #%d has %d items remaining", idx, remaining))
			}
			close(pipe)
		}

		if len(problems) == 0 {
			return nil
		}
		return errors.New(strings.Join(problems, "\n"))
	}

	return group
}
// SetLevel applies l to this sender's Base and then to every underlying
// sender.
//
// When the Base level is not currently valid the call does nothing and
// returns nil: an invalid base level is treated as the sign that the
// constituent senders carry their own, overridden levels that must not be
// overwritten. An error from the Base is returned as-is and leaves the
// underlying senders untouched; errors from the underlying senders are
// ignored.
func (s *asyncGroupSender) SetLevel(l LevelInfo) error {
	if current := s.Base.Level(); !current.Valid() {
		return nil
	}

	err := s.Base.SetLevel(l)
	if err != nil {
		return err
	}

	for idx := range s.senders {
		// Best effort: a failure in one sender does not stop the others.
		_ = s.senders[idx].SetLevel(l)
	}

	return nil
}
// Send queues m on the channel of every underlying sender, where the worker
// goroutines started by NewAsyncGroupSender pick it up.
//
// When the Base level is valid and says m should not be logged, the message
// is dropped without being queued. The channel sends are plain blocking
// sends, so Send waits whenever a sender's buffer is full.
func (s *asyncGroupSender) Send(m message.Composer) {
	level := s.Base.Level()
	if !level.Valid() || level.ShouldLog(m) {
		for idx := range s.pipes {
			s.pipes[idx] <- m
		}
	}
}
// Flush calls Flush on every underlying sender, passing ctx through. A
// failing sender does not stop the loop, so each sender gets a chance to
// flush.
//
// It returns the error from the last sender that failed, or nil when every
// sender flushed successfully. Only the underlying senders are flushed; the
// per-sender channels are not drained by this method.
func (s *asyncGroupSender) Flush(ctx context.Context) error {
	var lastErr error
	for _, sender := range s.senders {
		if err := sender.Flush(ctx); err != nil {
			// Keep the failure. This previously assigned nil, which made
			// Flush report success even when senders failed.
			lastErr = err
		}
	}
	return lastErr
}