-
Notifications
You must be signed in to change notification settings - Fork 13
/
zipkin.go
156 lines (126 loc) · 4.2 KB
/
zipkin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package zipkin
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"github.com/circonus-labs/circonus-unified-agent/cua"
"github.com/circonus-labs/circonus-unified-agent/plugins/inputs"
"github.com/circonus-labs/circonus-unified-agent/plugins/inputs/zipkin/trace"
"github.com/gorilla/mux"
)
const (
// DefaultPort is the default port zipkin listens on, which zipkin implementations
// expect.
DefaultPort = 9411
// DefaultRoute is the default route zipkin uses, and zipkin implementations
// expect.
DefaultRoute = "/api/v1/spans"
// DefaultShutdownTimeout is the max amount of time to wait
// for the plugin to shutdown
DefaultShutdownTimeout = 5
)
var (
// DefaultNetwork is the network to listen on; use only in tests.
DefaultNetwork = "tcp"
)
// Recorder represents a type which can record zipkin trace data as well as
// any accompanying errors, and process that data.
type Recorder interface {
Record(trace.Trace) error
Error(error)
}
// Handler represents a type which can register itself with a router for
// http routing, and a Recorder for trace data collection.
type Handler interface {
Register(router *mux.Router, recorder Recorder) error
}
const sampleConfig = `
instance_id = "" # unique instance identifier (REQUIRED)
# path = "/api/v1/spans" # URL path for span data
# port = 9411 # Port on which agent listens
`
// Zipkin is a configuration structure for the zipkin input plugin,
// but it also contains fields for the management of a separate, concurrent
// zipkin http server
type Zipkin struct {
ServiceAddress string
Port int
Path string
Log cua.Logger
address string
handler Handler
server *http.Server
waitGroup *sync.WaitGroup
}
// Description is a necessary method implementation from cua.ServiceInput
func (z Zipkin) Description() string {
return "This plugin implements the Zipkin http server to gather trace and timing data needed to troubleshoot latency problems in microservice architectures."
}
// SampleConfig is a necessary method implementation from cua.ServiceInput
func (z Zipkin) SampleConfig() string {
return sampleConfig
}
// Gather is empty for the zipkin plugin; all gathering is done through
// the separate goroutine launched in (*Zipkin).Start()
func (z *Zipkin) Gather(_ context.Context, _ cua.Accumulator) error { return nil }
// Start launches a separate goroutine for collecting zipkin client http requests,
// passing in a cua.Accumulator such that data can be collected.
func (z *Zipkin) Start(ctx context.Context, acc cua.Accumulator) error {
z.handler = NewSpanHandler(z.Path)
var wg sync.WaitGroup
z.waitGroup = &wg
router := mux.NewRouter()
converter := NewLineProtocolConverter(acc)
if err := z.handler.Register(router, converter); err != nil {
return fmt.Errorf("handler register: %w", err)
}
z.server = &http.Server{
Handler: router,
}
addr := ":" + strconv.Itoa(z.Port)
ln, err := net.Listen(DefaultNetwork, addr)
if err != nil {
return fmt.Errorf("net listen (%s): %w", addr, err)
}
z.address = ln.Addr().String()
z.Log.Infof("Started the zipkin listener on %s", z.address)
z.waitGroup.Add(1)
go func() {
defer z.waitGroup.Done()
z.Listen(ln, acc)
}()
return nil
}
// Stop shuts the internal http server down with via context.Context
func (z *Zipkin) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), DefaultShutdownTimeout)
defer z.waitGroup.Wait()
defer cancel()
_ = z.server.Shutdown(ctx)
}
// Listen creates an http server on the zipkin instance it is called with, and
// serves http until it is stopped by Zipkin's (*Zipkin).Stop() method.
func (z *Zipkin) Listen(ln net.Listener, acc cua.Accumulator) {
if err := z.server.Serve(ln); err != nil {
// Because of the clean shutdown in `(*Zipkin).Stop()`
// We're expecting a server closed error at some point
// So we don't want to display it as an error.
// This interferes with internal data collection,
// by making it appear as if a serious error occurred.
if !errors.Is(err, http.ErrServerClosed) {
acc.AddError(fmt.Errorf("E! Error listening: %w", err))
}
}
}
func init() {
inputs.Add("zipkin", func() cua.Input {
return &Zipkin{
Path: DefaultRoute,
Port: DefaultPort,
}
})
}