/
egress_server.go
163 lines (145 loc) · 4.15 KB
/
egress_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package v2
import (
"log"
"time"
gendiode "code.cloudfoundry.org/go-diodes"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
"code.cloudfoundry.org/loggregator-release/src/diodes"
"code.cloudfoundry.org/loggregator-release/src/metricemitter"
"code.cloudfoundry.org/loggregator-release/src/plumbing/batching"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Subscriber registers stream DataSetters to accept reads.
//
// Subscribe is called once per BatchedReceiver stream with the client's
// request and the DataSetter for that stream. BatchedReceiver defers the
// returned unsubscribe function, so it runs when the stream handler returns.
// NOTE(review): presumably the implementation calls setter.Set for every
// envelope selected by req until unsubscribe is called — confirm against the
// implementation.
type Subscriber interface {
	Subscribe(req *loggregator_v2.EgressBatchRequest, setter DataSetter) (unsubscribe func())
}
// DataSetter accepts writes of v2.Envelopes. BatchedReceiver passes its
// per-stream diode to Subscriber.Subscribe as the DataSetter.
// TODO: This could be a named function. This will be a performance bump.
type DataSetter interface {
	// Set hands a single envelope to the receiver.
	Set(*loggregator_v2.Envelope)
}
// EgressServer implements the loggregator_v2.EgressServer interface.
type EgressServer struct {
	// The embedded interface is not populated by NewEgressServer, so it is
	// nil for servers built that way.
	// NOTE(review): presumably embedded so the type satisfies the whole
	// interface while defining only some RPCs; an RPC that is not defined on
	// *EgressServer would panic when called — confirm every RPC is covered.
	loggregator_v2.EgressServer
	// subscriber is where BatchedReceiver registers each stream's diode.
	subscriber Subscriber
	// egressMetric is handed to each stream's batchWriter, which increments
	// it by the size of every batch that is sent successfully.
	egressMetric *metricemitter.Counter
	// droppedMetric is incremented by Alert with the number of dropped
	// envelopes.
	droppedMetric *metricemitter.Counter
	// subscriptionsMetric is incremented when BatchedReceiver starts and
	// decremented when it returns.
	subscriptionsMetric *metricemitter.Gauge
	// batchInterval and batchSize are passed to
	// batching.NewV2EnvelopeBatcher for every stream.
	batchInterval time.Duration
	batchSize uint
}
// NewEgressServer builds an EgressServer that subscribes streams through s.
//
// The "egress" counter is created from m; droppedMetric and
// subscriptionsMetric are stored as given. batchInterval and batchSize are
// kept for configuring the envelope batcher of each BatchedReceiver stream.
func NewEgressServer(
	s Subscriber,
	m MetricClient,
	droppedMetric *metricemitter.Counter,
	subscriptionsMetric *metricemitter.Gauge,
	batchInterval time.Duration,
	batchSize uint,
) *EgressServer {
	srv := &EgressServer{
		subscriber:          s,
		droppedMetric:       droppedMetric,
		subscriptionsMetric: subscriptionsMetric,
		batchInterval:       batchInterval,
		batchSize:           batchSize,
	}

	// metric-documentation-v2: (loggregator.doppler.egress) Number of
	// envelopes read from a diode to be sent to subscriptions.
	srv.egressMetric = m.NewCounter("egress", metricemitter.WithVersion(2, 0))

	return srv
}
// Alert records dropped envelopes by adding missed to the dropped metric.
// It does not log anything itself; the diode alert callback set up in
// BatchedReceiver logs before calling Alert.
//
// Negative counts are ignored: converting a negative int to uint64 wraps
// around and would add an enormous bogus value to the counter.
func (s *EgressServer) Alert(missed int) {
	if missed < 0 {
		return
	}
	s.droppedMetric.Increment(uint64(missed))
}
// Receiver is the unbatched egress RPC of loggregator_v2.EgressServer. This
// server does not support it: both arguments are ignored and a gRPC
// Unimplemented status pointing callers at BatchedReceiver is returned.
func (s *EgressServer) Receiver(
	req *loggregator_v2.EgressRequest,
	sender loggregator_v2.Egress_ReceiverServer,
) error {
	const msg = "use BatchedReceiver instead"
	return status.Error(codes.Unimplemented, msg)
}
// BatchedReceiver implements loggregator_v2.EgressServer. It streams the
// envelopes selected by req to sender in batches until the stream's context
// is done or sending a batch fails.
//
// For the duration of the call the subscriptions gauge is raised by one. A
// per-stream diode is registered with s.subscriber and unregistered on
// return. A helper goroutine moves envelopes from the diode to the main loop,
// which feeds them to a batcher configured with s.batchSize and
// s.batchInterval. When no envelope arrives for flushInterval the batcher is
// force-flushed.
//
// It returns the context's error once the stream context is done, or the
// error reported by the batch writer when a send fails.
func (s *EgressServer) BatchedReceiver(
	req *loggregator_v2.EgressBatchRequest,
	sender loggregator_v2.Egress_BatchedReceiverServer,
) error {
	const (
		// diodeSize is the capacity of the per-stream diode buffer.
		diodeSize = 1000
		// flushInterval is how long the loop waits without a new envelope
		// before forcing the batcher to flush.
		flushInterval = 250 * time.Millisecond
	)

	s.subscriptionsMetric.Increment(1.0)
	defer s.subscriptionsMetric.Decrement(1.0)

	ctx := sender.Context()

	d := diodes.NewOneToOneWaiterEnvelopeV2(
		diodeSize,
		gendiode.AlertFunc(func(missed int) {
			log.Printf("Dropped %d envelopes (v2 buffer) ShardID: %s", missed, req.ShardId)
			s.Alert(missed)
		}),
		gendiode.WithWaiterContext(ctx),
	)

	cancel := s.subscriber.Subscribe(req, d)
	defer cancel()

	// Buffered so that the batch writer can report a failure without needing
	// a receiver to be ready at that instant.
	errStream := make(chan error, 1)
	batcher := batching.NewV2EnvelopeBatcher(
		int(s.batchSize),
		s.batchInterval,
		&batchWriter{
			sender:       sender,
			errStream:    errStream,
			egressMetric: s.egressMetric,
		},
	)

	// Pump envelopes out of the diode so that the main loop can select on
	// them alongside cancellation, errors and the flush timer.
	c := make(chan *loggregator_v2.Envelope)
	go func() {
		for {
			env := d.Next()
			if env == nil {
				// NOTE(review): presumably Next returns nil once the waiter
				// context is cancelled — confirm against the diodes package.
				return
			}

			select {
			case c <- env:
			case <-ctx.Done():
				return
			}
		}
	}()

	timer := time.NewTimer(flushInterval)
	// Release the timer on every return path instead of leaving it pending.
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errStream:
			return err
		case <-timer.C:
			batcher.ForcedFlush()
			// No Stop/drain is needed before Reset here: reaching this case
			// means the timer has in fact fired and its channel was just
			// drained.
			timer.Reset(flushInterval)
		case env := <-c:
			batcher.Write(env)
			// Restart the idle window. If the timer already fired, drain the
			// pending tick first so that it does not trigger a spurious
			// flush on the next iteration.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(flushInterval)
		}
	}
}
// batchWriter delivers envelope batches to a single BatchedReceiver stream.
// BatchedReceiver passes one to batching.NewV2EnvelopeBatcher as the
// destination for the batches it produces.
type batchWriter struct {
	// sender is the gRPC stream that batches are sent on.
	sender loggregator_v2.Egress_BatchedReceiverServer
	// errStream is the send-only channel on which a failed Send is reported.
	errStream chan<- error
	// egressMetric is incremented by the size of each batch that is sent
	// successfully.
	egressMetric *metricemitter.Counter
}
// Write sends batch to the subscriber as a single EnvelopeBatch message. On
// success the egress metric is incremented by the number of envelopes in the
// batch. On failure the send error is offered to errStream and the metric is
// left untouched.
//
// The error is offered without blocking. BatchedReceiver creates errStream
// with a buffer of one and reads it from the same loop that drives the
// batcher, so a blocking send on a full channel might never be received and
// would hang the stream. The first error is enough to end the stream; any
// further errors are dropped.
// NOTE(review): this assumes the batcher invokes Write synchronously from
// that loop — confirm against the batching package.
func (b *batchWriter) Write(batch []*loggregator_v2.Envelope) {
	msg := &loggregator_v2.EnvelopeBatch{Batch: batch}
	if err := b.sender.Send(msg); err != nil {
		select {
		case b.errStream <- err:
		default:
			// An earlier error is still waiting to be read.
		}
		return
	}
	b.egressMetric.Increment(uint64(len(batch)))
}