forked from heroku/hsup
/
shuttle.go
91 lines (81 loc) · 2.42 KB
/
shuttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package shuttle
import (
"io/ioutil"
"log"
"sync"
metrics "github.com/rcrowley/go-metrics"
)
// discardLogger is the default logger: it writes to ioutil.Discard with an
// empty prefix and no flags, so anything logged through it is thrown away.
// NewShuttle installs it as both Logger and ErrLogger.
var discardLogger = log.New(ioutil.Discard, "", 0)
// Shuttle is the main entry point into the library. It holds the channels,
// wait groups, counters, metrics registry and loggers used by the batcher and
// outlet goroutines. Construct one with NewShuttle, start it with Launch and
// shut it down with Land.
type Shuttle struct {
	// LogLineReader is embedded; NewShuttle builds it from the LogLines
	// channel and the metrics registry.
	LogLineReader

	// config is the Config handed to NewShuttle.
	config Config

	// LogLines is created with capacity config.FrontBuff and is closed by
	// Land.
	LogLines chan LogLine

	// Batches is created with capacity config.BackBuff and is closed by Land
	// after the batcher goroutines have finished.
	Batches chan Batch

	// MetricsRegistry is the registry created in NewShuttle and shared with
	// the embedded LogLineReader.
	MetricsRegistry metrics.Registry

	// bWaiter tracks the goroutines started by startBatchers.
	bWaiter *sync.WaitGroup
	// oWaiter tracks the goroutines started by startOutlets.
	oWaiter *sync.WaitGroup

	// Drops and Lost both start from NewCounter(0).
	// NOTE(review): presumably they count dropped and lost log lines;
	// confirm against the batcher/outlet code.
	Drops *Counter
	Lost  *Counter

	// NewFormatterFunc is initialised from config.FormatterFunc.
	NewFormatterFunc NewHTTPFormatterFunc

	// Logger and ErrLogger default to a logger that discards its output.
	Logger, ErrLogger *log.Logger
}
// NewShuttle returns a properly constructed Shuttle for the given config.
//
// The log line channel is buffered to config.FrontBuff and the batch channel
// to config.BackBuff. A fresh metrics registry is created and shared between
// the Shuttle and its embedded LogLineReader. Both counters start at zero and
// both loggers start out as the discarding logger. Nothing is started here;
// call Launch for that.
func NewShuttle(config Config) *Shuttle {
	lines := make(chan LogLine, config.FrontBuff)
	registry := metrics.NewRegistry()

	s := new(Shuttle)
	s.config = config
	s.LogLineReader = NewLogLineReader(lines, registry)
	s.LogLines = lines
	s.Batches = make(chan Batch, config.BackBuff)
	s.Drops = NewCounter(0)
	s.Lost = NewCounter(0)
	s.MetricsRegistry = registry
	s.NewFormatterFunc = config.FormatterFunc
	s.oWaiter = new(sync.WaitGroup)
	s.bWaiter = new(sync.WaitGroup)
	s.Logger = discardLogger
	s.ErrLogger = discardLogger
	return s
}
// Launch starts the shuttle by spawning its outlets and then its batchers.
// This is the reverse of the order in which Land shuts them down: Land waits
// for the batchers first and for the outlets second.
func (s *Shuttle) Launch() {
	// Outlets are started first.
	s.startOutlets()
	s.startBatchers()
}
// startOutlets launches s.config.NumOutlets goroutines. Each goroutine is
// registered with oWaiter, builds its own outlet with NewHTTPOutlet(s) and
// runs Outlet until that call returns. Land closes s.Batches and then waits
// on oWaiter for all of these goroutines to exit.
func (s *Shuttle) startOutlets() {
	// runOutlet is the body of a single outlet goroutine.
	runOutlet := func() {
		defer s.oWaiter.Done()
		o := NewHTTPOutlet(s)
		o.Outlet()
	}

	for started := 0; started < s.config.NumOutlets; started++ {
		// Register before spawning so a Wait cannot miss this goroutine.
		s.oWaiter.Add(1)
		go runOutlet()
	}
}
// startBatchers launches s.config.NumBatchers goroutines. Each goroutine is
// registered with bWaiter, builds its own batcher with NewBatcher(s) and runs
// Batch until that call returns. Land closes s.LogLines and then waits on
// bWaiter for all of these goroutines to exit.
func (s *Shuttle) startBatchers() {
	// runBatcher is the body of a single batcher goroutine.
	runBatcher := func() {
		defer s.bWaiter.Done()
		b := NewBatcher(s)
		b.Batch()
	}

	for started := 0; started < s.config.NumBatchers; started++ {
		// Register before spawning so a Wait cannot miss this goroutine.
		s.bWaiter.Add(1)
		go runBatcher()
	}
}
// Land gracefully terminates the shuttle instance, ensuring that anything
// read is batched and delivered. It shuts down in two stages, batchers first
// and outlets second, and blocks until both wait groups have drained.
func (s *Shuttle) Land() {
	// Close the log line channel; all of the batchers will stop once they
	// are done.
	close(s.LogLines)
	// Wait for the batchers to be done.
	s.bWaiter.Wait()
	// Close the batch channel; all of the outlets will stop once they are
	// done.
	close(s.Batches)
	// Wait for the outlets to be done.
	s.oWaiter.Wait()
}