Commit 8c7e8ff

contrib/confluentinc/confluent-kafka-go: add confluent kafka integration (#310)
dd-caleb authored and gbbr committed Sep 11, 2018
1 parent ba8f967 commit 8c7e8ff
Showing 6 changed files with 514 additions and 0 deletions.
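
For orientation, here is a minimal sketch of how the wrappers added by this commit are meant to be used on the consuming side. The broker address, group id, and topic name ("localhost:9092", "gotest") are assumptions for illustration, not part of the commit:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// Wrap a consumer so that every polled message starts a span.
	c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // assumed broker address
		"group.id":          "gotest",         // assumed consumer group
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.Subscribe("gotest", nil); err != nil { // assumed topic name
		panic(err)
	}
	for i := 0; i < 10; i++ {
		// each message's span is finished on the next call to Poll
		if msg, ok := c.Poll(500).(*kafka.Message); ok {
			_ = msg.Value // process the message here
		}
	}
}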
31 changes: 31 additions & 0 deletions .circleci/config.yml
@@ -1,4 +1,5 @@
version: 2

jobs:
build:
working_directory: /go/src/gopkg.in/DataDog/dd-trace-go.v1
@@ -32,9 +33,39 @@ jobs:
DD_API_KEY: invalid_key_but_this_is_fine
- image: circleci/mongo:latest-ram
- image: memcached:1.5.9
- image: confluentinc/cp-zookeeper:5.0.0
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
- image: confluentinc/cp-kafka:5.0.0
environment:
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_CREATE_TOPICS: gotest:1:1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

steps:
- checkout

- restore_cache:
keys:
- v1-librdkafka-v0.11.5
- run:
name: Install rdkafka
command: |
if [ ! -d /tmp/librdkafka-v0.11.5 ] ; then
echo "building librdkafka"
git clone --branch v0.11.5 https://github.com/edenhill/librdkafka.git /tmp/librdkafka-v0.11.5
(cd /tmp/librdkafka-v0.11.5 && ./configure && make)
fi
echo "installing librdkafka"
(cd /tmp/librdkafka-v0.11.5 && sudo make install)
sudo ldconfig
- save_cache:
key: v1-librdkafka-v0.11.5
paths:
- /tmp/librdkafka-v0.11.5

- run:
name: Vendor gRPC v1.2.0
# This step vendors gRPC v1.2.0 inside our gRPC.v12 contrib
47 changes: 47 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/headers.go
@@ -0,0 +1,47 @@
package kafka

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// A MessageCarrier injects and extracts traces from a kafka.Message.
type MessageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*MessageCarrier)(nil)

// ForeachKey iterates over every header.
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(string(h.Key), string(h.Value))
if err != nil {
return err
}
}
return nil
}

// Set sets a header.
func (c MessageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}

// NewMessageCarrier creates a new MessageCarrier.
func NewMessageCarrier(msg *kafka.Message) MessageCarrier {
return MessageCarrier{msg}
}
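
To see the carrier in isolation, a minimal sketch of propagating a span context through message headers, written as if it lived in this package and using the same imports as the file above; the topic name is an assumption for illustration:

func examplePropagation() {
	topic := "gotest" // assumed topic name
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic},
	}

	// producer side: inject the active span's context into the headers
	span := tracer.StartSpan("kafka.produce")
	_ = tracer.Inject(span.Context(), NewMessageCarrier(msg))
	span.Finish()

	// consumer side: extract the propagated context and parent a new span on it
	if spanctx, err := tracer.Extract(NewMessageCarrier(msg)); err == nil {
		consumeSpan := tracer.StartSpan("kafka.consume", tracer.ChildOf(spanctx))
		consumeSpan.Finish()
	}
}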
223 changes: 223 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -0,0 +1,223 @@
// Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
c, err := kafka.NewConsumer(conf)
if err != nil {
return nil, err
}
return WrapConsumer(c, opts...), nil
}

// NewProducer calls kafka.NewProducer and wraps the resulting Producer.
func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
p, err := kafka.NewProducer(conf)
if err != nil {
return nil, err
}
return WrapProducer(p, opts...), nil
}

// A Consumer wraps a kafka.Consumer.
type Consumer struct {
*kafka.Consumer
cfg *config
events chan kafka.Event
prev ddtrace.Span
}

// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
wrapped := &Consumer{
Consumer: c,
cfg: newConfig(opts...),
}
wrapped.events = wrapped.traceEventsChannel(c.Events())
return wrapped
}

func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
// in will be nil when consuming via the events channel is not enabled
if in == nil {
return nil
}

out := make(chan kafka.Event, 1)
go func() {
defer close(out)
for evt := range in {
var next ddtrace.Span

// only trace messages
if msg, ok := evt.(*kafka.Message); ok {
next = c.startSpan(msg)
}

out <- evt

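// finishing the previous span only after the next event has been delivered
// means each span roughly covers the time spent processing its message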
if c.prev != nil {
c.prev.Finish()
}
c.prev = next
}
// finish any remaining span
if c.prev != nil {
c.prev.Finish()
c.prev = nil
}
}()

return out
}

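// startSpan starts a consumer span for msg, using any span context
// propagated in the message headers as the parent.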
func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(c.cfg.serviceName),
tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("partition", msg.TopicPartition.Partition),
tracer.Tag("offset", msg.TopicPartition.Offset),
}
// kafka supports headers, so try to extract a span context
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(c.cfg.ctx, "kafka.consume", opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(span.Context(), carrier)
return span
}

// Close calls the underlying Consumer.Close and, if polling is in use,
// finishes any remaining span.
func (c *Consumer) Close() error {
err := c.Consumer.Close()
// we only close the previous span if consuming via the events channel is
// not enabled, because otherwise there would be a data race from the
// consuming goroutine.
if c.events == nil && c.prev != nil {
c.prev.Finish()
c.prev = nil
}
return err
}

// Events returns the kafka Events channel (if enabled). Message events will be
// traced.
func (c *Consumer) Events() chan kafka.Event {
return c.events
}

// Poll polls the consumer for messages or events. Message events will be
// traced.
func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
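// the span from the previous Poll is finished first, so it covers the
// time spent processing that message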
if c.prev != nil {
c.prev.Finish()
c.prev = nil
}
evt := c.Consumer.Poll(timeoutMS)
if msg, ok := evt.(*kafka.Message); ok {
c.prev = c.startSpan(msg)
}
return evt
}

// A Producer wraps a kafka.Producer.
type Producer struct {
*kafka.Producer
cfg *config
produceChannel chan *kafka.Message
}

// WrapProducer wraps a kafka.Producer so requests are traced.
func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
wrapped := &Producer{
Producer: p,
cfg: newConfig(opts...),
}
wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
return wrapped
}

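// traceProduceChannel wraps the producer's ProduceChannel: each message
// received on the returned channel is traced and then forwarded to the
// underlying producer channel.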
func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
if out == nil {
return out
}

in := make(chan *kafka.Message, 1)
go func() {
for msg := range in {
span := p.startSpan(msg)
out <- msg
span.Finish()
}
}()

return in
}

func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(p.cfg.serviceName),
tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("partition", msg.TopicPartition.Partition),
}
carrier := NewMessageCarrier(msg)
span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...)
// inject the span context so consumers can pick it up
tracer.Inject(span.Context(), carrier)
return span
}

// Close calls the underlying Producer.Close and also closes the internal
// wrapping producer channel.
func (p *Producer) Close() {
close(p.produceChannel)
p.Producer.Close()
}

// Produce calls the underlying Producer.Produce and traces the request.
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
span := p.startSpan(msg)

// if the user has selected a delivery channel, we will wrap it and
// wait for the delivery event to finish the span
if deliveryChan != nil {
oldDeliveryChan := deliveryChan
deliveryChan = make(chan kafka.Event)
go func() {
var err error
evt := <-deliveryChan
if msg, ok := evt.(*kafka.Message); ok {
// delivery errors are returned via TopicPartition.Error
err = msg.TopicPartition.Error
}
span.Finish(tracer.WithError(err))
oldDeliveryChan <- evt
}()
}

err := p.Producer.Produce(msg, deliveryChan)
// with no delivery channel, finish immediately
if deliveryChan == nil {
span.Finish(tracer.WithError(err))
}

return err
}

// ProduceChannel returns a channel which can receive kafka Messages and will
// send them to the underlying producer channel.
func (p *Producer) ProduceChannel() chan *kafka.Message {
return p.produceChannel
}
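
Closing the loop, a minimal sketch of the traced producer with a delivery channel, which lets the wrapper finish the produce span with any delivery error. The broker address and topic name are assumptions for illustration:

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	p, err := kafkatrace.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // assumed broker address
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "gotest" // assumed topic name
	deliveryChan := make(chan kafka.Event, 1)
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, deliveryChan)
	if err != nil {
		panic(err)
	}

	// the wrapper finishes the produce span once this delivery event arrives
	if msg, ok := (<-deliveryChan).(*kafka.Message); ok && msg.TopicPartition.Error != nil {
		panic(msg.TopicPartition.Error)
	}
}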