Skip to content

Commit

Permalink
Add open tracing middleware
Browse files Browse the repository at this point in the history
This implementation use a few sync maps to store local defined opentracing
span's across several function scopes.
A nicer solution would be appreciated, but looking to the dotnet counterpart
the same behavior is implemented using thread static variables which isn't
an option in go.
  • Loading branch information
emilingerslev committed Feb 27, 2019
1 parent 0b59164 commit b6dca27
Show file tree
Hide file tree
Showing 16 changed files with 443 additions and 5 deletions.
13 changes: 9 additions & 4 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ type Context interface {
senderPart
receiverPart
spawnerPart
messagePart
}

type SenderContext interface {
infoPart
senderPart
messagePart
}

type ReceiverContext interface {
infoPart
receiverPart
messagePart
}

type SpawnerContext interface {
Expand Down Expand Up @@ -72,15 +75,17 @@ type basePart interface {
AwaitFuture(f *Future, continuation func(res interface{}, err error))
}

type senderPart interface {
// Sender returns the PID of actor that sent currently processed message
Sender() *PID

type messagePart interface {
// Message returns the current message to be processed
Message() interface{}

// MessageHeader returns the meta information for the currently processed message
MessageHeader() ReadonlyMessageHeader
}

type senderPart interface {
// Sender returns the PID of actor that sent currently processed message
Sender() *PID

// Send sends a message to the given PID
Send(pid *PID, message interface{})
Expand Down
35 changes: 35 additions & 0 deletions actor/middleware/opentracing/activespan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package opentracing

import (
"fmt"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/opentracing/opentracing-go"
"sync"
)

var activeSpan = sync.Map{}

func getActiveSpan(pid *actor.PID) opentracing.Span {
value, ok := activeSpan.Load(pid)
if !ok {
return nil
}
return value.(opentracing.Span)
}

func clearActiveSpan(pid *actor.PID) {
activeSpan.Delete(pid)
}

func setActiveSpan(pid *actor.PID, span opentracing.Span) {
activeSpan.Store(pid, span)
}

func GetActiveSpan(context actor.Context) opentracing.Span {
span := getActiveSpan(context.Self())
if span == nil {
// TODO: Fix finding the real span always or handle no-span better on receiving side
span = opentracing.StartSpan(fmt.Sprintf("%T/%T", context.Actor(), context.Message()))
}
return span
}
35 changes: 35 additions & 0 deletions actor/middleware/opentracing/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/opentracing/opentracing-go"
)

type messageHeaderReader struct {
ReadOnlyMessageHeader actor.ReadonlyMessageHeader
}

func (reader *messageHeaderReader) ForeachKey(handler func(key, val string) error) error {
if reader.ReadOnlyMessageHeader == nil {
return nil
}
for _, key := range reader.ReadOnlyMessageHeader.Keys() {
err := handler(key, reader.ReadOnlyMessageHeader.Get(key))
if err != nil {
return err
}
}
return nil
}

var _ opentracing.TextMapReader = &messageHeaderReader{}

type messageEnvelopeWriter struct {
MessageEnvelope *actor.MessageEnvelope
}

func (writer *messageEnvelopeWriter) Set(key, val string) {
writer.MessageEnvelope.SetHeader(key, val)
}

var _ opentracing.TextMapWriter = &messageEnvelopeWriter{}
5 changes: 5 additions & 0 deletions actor/middleware/opentracing/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package opentracing

import "github.com/AsynkronIT/protoactor-go/log"

var logger = log.New(log.ErrorLevel, "[TRACING]")
15 changes: 15 additions & 0 deletions actor/middleware/opentracing/middlewarepropagation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/actor/middleware/propagator"
)

func TracingMiddleware() actor.SpawnMiddleware {
return propagator.New().
WithItselfForwarded().
WithSpawnMiddleware(SpawnMiddleware()).
WithSenderMiddleware(SenderMiddleware()).
WithReceiverMiddleware(ReceiverMiddleware()).
SpawnMiddleware
}
22 changes: 22 additions & 0 deletions actor/middleware/opentracing/parentspan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/opentracing/opentracing-go"
"sync"
)

var parentSpans = sync.Map{}

func getAndClearParentSpan(pid *actor.PID) opentracing.Span {
value, ok := parentSpans.Load(pid)
if !ok {
return nil
}
parentSpans.Delete(pid)
return value.(opentracing.Span)
}

func setParentSpan(pid *actor.PID, span opentracing.Span) {
parentSpans.Store(pid, span)
}
81 changes: 81 additions & 0 deletions actor/middleware/opentracing/receivermiddleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package opentracing

import (
"fmt"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
"github.com/opentracing/opentracing-go"
)

func ReceiverMiddleware() actor.ReceiverMiddleware {
return func(next actor.ReceiverFunc) actor.ReceiverFunc {
return func(c actor.ReceiverContext, envelope *actor.MessageEnvelope) {
spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapReader(&messageHeaderReader{ReadOnlyMessageHeader: envelope.Header}))
if err == opentracing.ErrSpanContextNotFound {
logger.Debug("INBOUND No spanContext found", log.Stringer("PID", c.Self()), log.Error(err))
//next(c)
} else if err != nil {
logger.Debug("INBOUND Error", log.Stringer("PID", c.Self()), log.Error(err))
next(c, envelope)
return
}
var span opentracing.Span
switch envelope.Message.(type) {
case *actor.Started:
parentSpan := getAndClearParentSpan(c.Self())
if parentSpan != nil {
span = opentracing.StartSpan(fmt.Sprintf("%T/%T", c.Actor(), envelope.Message), opentracing.ChildOf(parentSpan.Context()))
logger.Debug("INBOUND Found parent span", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
} else {
logger.Debug("INBOUND No parent span", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
}
case *actor.Stopping:
var parentSpan opentracing.Span
if c.Parent() != nil {
parentSpan = getStoppingSpan(c.Parent())
}
if parentSpan != nil {
span = opentracing.StartSpan(fmt.Sprintf("%T/stopping", c.Actor()), opentracing.ChildOf(parentSpan.Context()))
} else {
span = opentracing.StartSpan(fmt.Sprintf("%T/stopping", c.Actor()))
}
setStoppingSpan(c.Self(), span)
span.SetTag("ActorPID", c.Self())
span.SetTag("ActorType", fmt.Sprintf("%T", c.Actor()))
span.SetTag("MessageType", fmt.Sprintf("%T", envelope.Message))
stoppingHandlingSpan := opentracing.StartSpan("stopping-handling", opentracing.ChildOf(span.Context()))
next(c, envelope)
stoppingHandlingSpan.Finish()
return
case *actor.Stopped:
span = getAndClearStoppingSpan(c.Self())
next(c, envelope)
if span != nil {
span.Finish()
}
return
}
if span == nil && spanContext == nil {
logger.Debug("INBOUND No spanContext. Starting new span", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
span = opentracing.StartSpan(fmt.Sprintf("%T/%T", c.Actor(), envelope.Message))
}
if span == nil {
logger.Debug("INBOUND Starting span from parent", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
span = opentracing.StartSpan(fmt.Sprintf("%T/%T", c.Actor(), envelope.Message), opentracing.ChildOf(spanContext))
}

setActiveSpan(c.Self(), span)
span.SetTag("ActorPID", c.Self())
span.SetTag("ActorType", fmt.Sprintf("%T", c.Actor()))
span.SetTag("MessageType", fmt.Sprintf("%T", envelope.Message))

defer func() {
logger.Debug("INBOUND Finishing span", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
span.Finish()
clearActiveSpan(c.Self())
}()

next(c, envelope)
}
}
}
33 changes: 33 additions & 0 deletions actor/middleware/opentracing/sendermiddleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
"github.com/opentracing/opentracing-go"
)

func SenderMiddleware() actor.SenderMiddleware {

return func(next actor.SenderFunc) actor.SenderFunc {
return func(c actor.SenderContext, target *actor.PID, envelope *actor.MessageEnvelope) {

span := getActiveSpan(c.Self())

if span == nil {
logger.Debug("OUTBOUND No active span", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
next(c, target, envelope)
return
}

err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, opentracing.TextMapWriter(&messageEnvelopeWriter{MessageEnvelope: envelope}))
if err != nil {
logger.Debug("OUTBOUND Error injecting", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
next(c, target, envelope)
return
}

logger.Debug("OUTBOUND Successfully injected", log.Stringer("PID", c.Self()), log.TypeOf("ActorType", c.Actor()), log.TypeOf("MessageType", envelope.Message))
next(c, target, envelope)
}
}
}
33 changes: 33 additions & 0 deletions actor/middleware/opentracing/spawnmiddleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
olog "github.com/opentracing/opentracing-go/log"
)

func SpawnMiddleware() actor.SpawnMiddleware {
return func(next actor.SpawnFunc) actor.SpawnFunc {
return func(id string, props *actor.Props, parentContext actor.SpawnerContext) (pid *actor.PID, e error) {
self := parentContext.Self()
pid, err := next(id, props, parentContext)
if err != nil {
logger.Debug("SPAWN got error trying to spawn", log.Stringer("PID", self), log.TypeOf("ActorType", parentContext.Actor()), log.Error(err))
return pid, err
}
if self != nil {
span := getActiveSpan(self)
if span != nil {
setParentSpan(pid, span)
span.LogFields(olog.String("SpawnPID", pid.String()))
logger.Debug("SPAWN found active span", log.Stringer("PID", self), log.TypeOf("ActorType", parentContext.Actor()), log.Stringer("SpawnedPID", pid))
} else {
logger.Debug("SPAWN no active span on parent", log.Stringer("PID", self), log.TypeOf("ActorType", parentContext.Actor()), log.Stringer("SpawnedPID", pid))
}
} else {
logger.Debug("SPAWN no parent pid", log.Stringer("SpawnedPID", pid))
}
return pid, err
}
}
}
30 changes: 30 additions & 0 deletions actor/middleware/opentracing/stoppingspan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package opentracing

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/opentracing/opentracing-go"
"sync"
)

var stoppingSpans = sync.Map{}

func getAndClearStoppingSpan(pid *actor.PID) opentracing.Span {
value, ok := stoppingSpans.Load(pid)
if !ok {
return nil
}
stoppingSpans.Delete(pid)
return value.(opentracing.Span)
}

func getStoppingSpan(pid *actor.PID) opentracing.Span {
value, ok := stoppingSpans.Load(pid)
if !ok {
return nil
}
return value.(opentracing.Span)
}

func setStoppingSpan(pid *actor.PID, span opentracing.Span) {
stoppingSpans.Store(pid, span)
}
1 change: 1 addition & 0 deletions actor/middleware/opentracing/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package opentracing
17 changes: 17 additions & 0 deletions examples/jaegertracing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Jeager Tracing / OpenTracing example

To run the example an instance of Jaeger server is required running locally. The easiest way to run a jaeger server
instance is starting it using the included docker-compose file like this

```bash
docker-compose -f ./examples/jaegertracing/docker-compose.yaml up -d
```

And the just run the example:

```bash
go run ./examples/jaegertracing/main.go
```

After the test has run (and also during), traces can found using the Jaeger UI started at http://localhost:16686.

14 changes: 14 additions & 0 deletions examples/jaegertracing/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: '3'
services:
jaeger:
image: jaegertracing/all-in-one:1.7
environment:
COLLECTOR_ZIPKIN_HTTP_PORT: 9411
ports:
- '5775:5775/udp'
- '6831:6831/udp'
- '6832:6832/udp'
- '5778:5778'
- '16686:16686'
- '14268:14268'
- '9411:9411'
Loading

0 comments on commit b6dca27

Please sign in to comment.