forked from jaegertracing/jaeger
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tbuffered_server.go
129 lines (112 loc) · 3.77 KB
/
tbuffered_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package servers
import (
"sync"
"sync/atomic"
"github.com/apache/thrift/lib/go/thrift"
"github.com/uber/jaeger-lib/metrics"
)
// TBufferedServer is a custom thrift server that reads traffic using the transport provided
// and places messages into a buffered channel to be processed by the processor provided.
//
// Serve reads packets from transport into pooled ReadBuf values and queues them on
// dataChan; consumers receive from DataChan and hand every buffer back via DataRecd.
type TBufferedServer struct {
// NB. queueSize HAS to be at the top of the struct or it will SIGSEGV on certain architectures:
// it is modified with 64-bit atomic operations, which require 64-bit alignment there.
// See https://github.com/golang/go/issues/13868
queueSize int64
// dataChan is the buffered queue of received packets; it is created with capacity maxQueueSize.
dataChan chan *ReadBuf
// maxPacketSize is the length in bytes of every read buffer allocated by readBufPool.
maxPacketSize int
// maxQueueSize is the capacity dataChan is created with.
maxQueueSize int
// serving is 1 while Serve should keep reading and 0 otherwise; it is accessed atomically.
serving uint32
// transport is the source packets are read from; Stop closes it.
transport thrift.TTransport
// readBufPool recycles ReadBuf values between reads.
readBufPool *sync.Pool
metrics struct {
// Size of the current server queue
QueueSize metrics.Gauge `metric:"thrift.udp.server.queue_size"`
// Size (in bytes) of packets received by server
PacketSize metrics.Gauge `metric:"thrift.udp.server.packet_size"`
// Number of packets dropped by server because the queue was full
PacketsDropped metrics.Counter `metric:"thrift.udp.server.packets.dropped"`
// Number of packets the server accepted onto the queue
PacketsProcessed metrics.Counter `metric:"thrift.udp.server.packets.processed"`
// Number of reads from the transport that returned an error (presumably
// including malformed packets)
ReadError metrics.Counter `metric:"thrift.udp.server.read.errors"`
}
}
// NewTBufferedServer builds a TBufferedServer that reads from transport.
//
// maxQueueSize is the capacity of the packet queue exposed by DataChan and
// maxPacketSize is the length in bytes of each pooled read buffer. The server's
// metrics are registered through mFactory. The returned error is always nil.
func NewTBufferedServer(
	transport thrift.TTransport,
	maxQueueSize int,
	maxPacketSize int,
	mFactory metrics.Factory,
) (*TBufferedServer, error) {
	server := &TBufferedServer{
		dataChan:      make(chan *ReadBuf, maxQueueSize),
		maxPacketSize: maxPacketSize,
		maxQueueSize:  maxQueueSize,
		transport:     transport,
		readBufPool: &sync.Pool{
			// Every fresh buffer is sized to hold one packet of the maximum size.
			New: func() interface{} {
				return &ReadBuf{bytes: make([]byte, maxPacketSize)}
			},
		},
	}
	metrics.Init(&server.metrics, mFactory, nil)
	return server, nil
}
// Serve marks the server as serving and then reads packets from the transport
// until IsServing reports false, blocking the caller for that whole time.
//
// Each successful read is recorded in the PacketSize gauge and offered to the
// data channel without blocking: if the channel has room the packet is queued
// (PacketsProcessed, queue size +1), otherwise it is dropped (PacketsDropped).
// A failed read only increments ReadError.
//
// Buffers that are not handed to a consumer (dropped packets and failed reads)
// are returned to the pool here; queued buffers are returned by the consumer
// through DataRecd.
func (s *TBufferedServer) Serve() {
	atomic.StoreUint32(&s.serving, 1)
	for s.IsServing() {
		readBuf := s.readBufPool.Get().(*ReadBuf)
		n, err := s.transport.Read(readBuf.bytes)
		if err != nil {
			// Nobody else has seen this buffer, so recycle it right away.
			s.readBufPool.Put(readBuf)
			s.metrics.ReadError.Inc(1)
			continue
		}
		readBuf.n = n
		s.metrics.PacketSize.Update(int64(n))
		select {
		case s.dataChan <- readBuf:
			s.metrics.PacketsProcessed.Inc(1)
			s.updateQueueSize(1)
		default:
			// Queue is full: drop the packet, but keep the buffer in the pool
			// so that sustained overload does not allocate a buffer per packet.
			s.readBufPool.Put(readBuf)
			s.metrics.PacketsDropped.Inc(1)
		}
	}
}
// updateQueueSize atomically adds delta to the tracked queue size and then
// publishes the counter to the QueueSize gauge. The counter is re-read after
// the add, so the published value may already include concurrent updates.
func (s *TBufferedServer) updateQueueSize(delta int64) {
	atomic.AddInt64(&s.queueSize, delta)
	current := atomic.LoadInt64(&s.queueSize)
	s.metrics.QueueSize.Update(current)
}
// IsServing reports whether the serving flag is currently set, i.e. whether
// Serve has been started and Stop has not cleared the flag since. The flag is
// read atomically.
func (s *TBufferedServer) IsServing() bool {
	state := atomic.LoadUint32(&s.serving)
	return state == 1
}
// Stop clears the serving flag so that the Serve loop exits, closes the
// transport and then closes the data channel, which tells receivers on
// DataChan that no further packets will arrive. It returns immediately and
// does not wait for packets still in the queue to be consumed.
//
// Stop must be called at most once: a second call would close the already
// closed data channel and panic.
//
// NOTE(review): Serve sends on the data channel; if it is still between a
// read and that send when Stop closes the channel, the send would panic.
// Confirm that callers tolerate or guard against this.
func (s *TBufferedServer) Stop() {
	atomic.StoreUint32(&s.serving, 0)
	// Stop has no way to report an error; a failure to close the transport is
	// deliberately discarded because shutdown proceeds regardless.
	_ = s.transport.Close()
	close(s.dataChan)
}
// DataChan exposes the buffered channel on which Serve queues received
// packets. Consumers receive from it and must pass every buffer they take to
// DataRecd.
func (s *TBufferedServer) DataChan() chan *ReadBuf {
	queue := s.dataChan
	return queue
}
// DataRecd is to be called by a consumer for every buffer it has taken from
// DataChan: it lowers the tracked queue size by one and hands buf back to the
// read-buffer pool. The caller should not touch buf afterwards, since the pool
// may give it to a later read.
func (s *TBufferedServer) DataRecd(buf *ReadBuf) {
	const consumed = 1
	s.updateQueueSize(-consumed)
	s.readBufPool.Put(buf)
}