forked from openzipkin/zipkin-go
/
kafka.go
102 lines (86 loc) · 2.41 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/*
Package kafka implements a Kafka reporter to send spans to a Kafka server/cluster.
*/
package kafka
import (
"encoding/json"
"log"
"os"
"github.com/Shopify/sarama"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
)
// defaultKafkaTopic sets the standard Kafka topic our Reporter will publish
// on. The default topic for zipkin-receiver-kafka is "zipkin", see:
// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka
const defaultKafkaTopic = "zipkin"
// kafkaReporter implements Reporter by publishing spans to a Kafka
// broker.
type kafkaReporter struct {
producer sarama.AsyncProducer
logger *log.Logger
topic string
}
// ReporterOption sets a parameter for the kafkaReporter
type ReporterOption func(c *kafkaReporter)
// Logger sets the logger used to report errors in the collection
// process.
func Logger(logger *log.Logger) ReporterOption {
return func(c *kafkaReporter) {
c.logger = logger
}
}
// Producer sets the producer used to produce to Kafka.
func Producer(p sarama.AsyncProducer) ReporterOption {
return func(c *kafkaReporter) {
c.producer = p
}
}
// Topic sets the kafka topic to attach the reporter producer on.
func Topic(t string) ReporterOption {
return func(c *kafkaReporter) {
c.topic = t
}
}
// NewReporter returns a new Kafka-backed Reporter. address should be a slice of
// TCP endpoints of the form "host:port".
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
r := &kafkaReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
}
for _, option := range options {
option(r)
}
if r.producer == nil {
p, err := sarama.NewAsyncProducer(address, nil)
if err != nil {
return nil, err
}
r.producer = p
}
go r.logErrors()
return r, nil
}
func (r *kafkaReporter) logErrors() {
for pe := range r.producer.Errors() {
r.logger.Print("msg", pe.Msg, "err", pe.Err, "result", "failed to produce msg")
}
}
func (r *kafkaReporter) Send(s model.SpanModel) {
// Zipkin expects the message to be wrapped in an array
ss := []model.SpanModel{s}
m, err := json.Marshal(ss)
if err != nil {
r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
return
}
r.producer.Input() <- &sarama.ProducerMessage{
Topic: r.topic,
Key: nil,
Value: sarama.ByteEncoder(m),
}
}
func (r *kafkaReporter) Close() error {
return r.producer.Close()
}