Skip to content

Commit

Permalink
[otel-tracing] Added Tracing to Base package (driver) (#4196)
Browse files Browse the repository at this point in the history
  • Loading branch information
milosgajdos committed Mar 4, 2024
2 parents 95077fd + f690b3e commit 51a72c2
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 25 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ require (
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/exporters/autoexport v0.46.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
golang.org/x/crypto v0.18.0
golang.org/x/net v0.20.0
golang.org/x/oauth2 v0.11.0
google.golang.org/api v0.126.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -73,21 +78,16 @@ require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/net v0.20.0
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
131 changes: 113 additions & 18 deletions registry/storage/driver/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,20 @@ import (
"github.com/distribution/distribution/v3/internal/dcontext"
prometheus "github.com/distribution/distribution/v3/metrics"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/tracing"
"github.com/docker/go-metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// storageAction is the metrics of blob related operations
var storageAction = prometheus.StorageNamespace.NewLabeledTimer("action", "The number of seconds that the storage action takes", "driver", "action")

// tracer is the OpenTelemetry tracer utilized for tracing operations within
// this package's code.
var tracer = otel.Tracer("github.com/distribution/distribution/v3/registry/storage/driver/base")

func init() {
metrics.Register(prometheus.StorageNamespace)
}
Expand Down Expand Up @@ -89,8 +97,16 @@ func (base *Base) setDriverName(e error) error {

// GetContent wraps GetContent of underlying storage driver.
func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.GetContent(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
ctx,
"GetContent",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -104,8 +120,17 @@ func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) {

// PutContent wraps PutContent of underlying storage driver.
func (base *Base) PutContent(ctx context.Context, path string, content []byte) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.PutContent(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
attribute.Int(tracing.AttributePrefix+"storage.content.length", len(content)),
}
ctx, span := tracer.Start(
ctx,
"PutContent",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -119,8 +144,17 @@ func (base *Base) PutContent(ctx context.Context, path string, content []byte) e

// Reader wraps Reader of underlying storage driver.
func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Reader(%q, %d)", base.Name(), path, offset)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
attribute.Int64(tracing.AttributePrefix+"storage.offset", offset),
}
ctx, span := tracer.Start(
ctx,
"Reader",
trace.WithAttributes(attrs...))

defer span.End()

if offset < 0 {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: base.StorageDriver.Name()}
Expand All @@ -136,8 +170,17 @@ func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.Rea

// Writer wraps Writer of underlying storage driver.
func (base *Base) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Writer(%q, %v)", base.Name(), path, append)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
attribute.Bool(tracing.AttributePrefix+"storage.append", append),
}
ctx, span := tracer.Start(
ctx,
"Writer",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -149,8 +192,16 @@ func (base *Base) Writer(ctx context.Context, path string, append bool) (storage

// Stat wraps Stat of underlying storage driver.
func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Stat(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
ctx,
"Stat",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -164,8 +215,16 @@ func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo

// List wraps List of underlying storage driver.
func (base *Base) List(ctx context.Context, path string) ([]string, error) {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.List(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
ctx,
"List",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -179,6 +238,18 @@ func (base *Base) List(ctx context.Context, path string) ([]string, error) {

// Move wraps Move of underlying storage driver.
func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) error {
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.source.path", sourcePath),
attribute.String(tracing.AttributePrefix+"storage.dest.path", destPath),
}
ctx, span := tracer.Start(
ctx,
"Move",
trace.WithAttributes(attrs...))

defer span.End()

ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath)

Expand All @@ -196,8 +267,16 @@ func (base *Base) Move(ctx context.Context, sourcePath string, destPath string)

// Delete wraps Delete of underlying storage driver.
func (base *Base) Delete(ctx context.Context, path string) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Delete(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
ctx,
"Delete",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -211,8 +290,16 @@ func (base *Base) Delete(ctx context.Context, path string) error {

// RedirectURL wraps RedirectURL of the underlying storage driver.
func (base *Base) RedirectURL(r *http.Request, path string) (string, error) {
ctx, done := dcontext.WithTrace(r.Context())
defer done("%s.RedirectURL(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
r.Context(),
"RedirectURL",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) {
return "", storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand All @@ -226,8 +313,16 @@ func (base *Base) RedirectURL(r *http.Request, path string) (string, error) {

// Walk wraps Walk of underlying storage driver.
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Walk(%q)", base.Name(), path)
attrs := []attribute.KeyValue{
attribute.String(tracing.AttributePrefix+"storage.driver.name", base.Name()),
attribute.String(tracing.AttributePrefix+"storage.path", path),
}
ctx, span := tracer.Start(
ctx,
"Walk",
trace.WithAttributes(attrs...))

defer span.End()

if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
Expand Down
42 changes: 42 additions & 0 deletions tracing/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tracing

import (
"context"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// compositeExporter is a custom exporter that wraps multiple SpanExporters.
// It allows you to export spans to multiple destinations, e.g., different telemetry backends.
type compositeExporter struct {
exporters []sdktrace.SpanExporter
}

func newCompositeExporter(exporters ...sdktrace.SpanExporter) *compositeExporter {
return &compositeExporter{exporters: exporters}
}

// ExportSpans iterates over each SpanExporter in the compositeExporter and
// exports the spans. If any exporter returns an error, the process is stopped
// and the error is returned. This ensures that span exporting behaves correctly
// and reports errors as expected.
func (ce *compositeExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
for _, exporter := range ce.exporters {
if err := exporter.ExportSpans(ctx, spans); err != nil {
return err
}
}
return nil
}

// Shutdown iterates over each SpanExporter in the compositeExporter and
// shuts them down. If any exporter returns an error during shutdown, the process
// is stopped and the error is returned. This ensures proper shutdown of all exporters.
func (ce *compositeExporter) Shutdown(ctx context.Context) error {
for _, exporter := range ce.exporters {
if err := exporter.Shutdown(ctx); err != nil {
return err
}
}
return nil
}
82 changes: 82 additions & 0 deletions tracing/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package tracing

import (
"context"
"errors"
"testing"

"go.opentelemetry.io/otel/sdk/trace"
)

type mockSpanExporter struct {
exportSpansCalled bool
shutdownCalled bool
returnError bool
}

func (m *mockSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
m.exportSpansCalled = true
if m.returnError {
return errors.New("export error")
}
return nil
}

func (m *mockSpanExporter) Shutdown(ctx context.Context) error {
m.shutdownCalled = true
if m.returnError {
return errors.New("shutdown error")
}
return nil
}
func TestCompositeExporterExportSpans(t *testing.T) {
mockExporter1 := &mockSpanExporter{}
mockExporter2 := &mockSpanExporter{}
composite := newCompositeExporter(mockExporter1, mockExporter2)

err := composite.ExportSpans(context.Background(), nil)
if err != nil {
t.Errorf("ExportSpans() error = %v", err)
}

if !mockExporter1.exportSpansCalled || !mockExporter2.exportSpansCalled {
t.Error("ExportSpans was not called on all exporters")
}
}

func TestCompositeExporterExportSpans_Error(t *testing.T) {
mockExporter1 := &mockSpanExporter{returnError: true}
mockExporter2 := &mockSpanExporter{}
composite := newCompositeExporter(mockExporter1, mockExporter2)

err := composite.ExportSpans(context.Background(), nil)
if err == nil {
t.Error("Expected error from ExportSpans, but got none")
}
}

func TestCompositeExporterShutdown(t *testing.T) {
mockExporter1 := &mockSpanExporter{}
mockExporter2 := &mockSpanExporter{}
composite := newCompositeExporter(mockExporter1, mockExporter2)

err := composite.Shutdown(context.Background())
if err != nil {
t.Errorf("Shutdown() error = %v", err)
}

if !mockExporter1.shutdownCalled || !mockExporter2.shutdownCalled {
t.Error("Shutdown was not called on all exporters")
}
}

func TestCompositeExporterShutdown_Error(t *testing.T) {
mockExporter1 := &mockSpanExporter{returnError: true}
mockExporter2 := &mockSpanExporter{}
composite := newCompositeExporter(mockExporter1, mockExporter2)

err := composite.Shutdown(context.Background())
if err == nil {
t.Error("Expected error from Shutdown, but got none")
}
}

0 comments on commit 51a72c2

Please sign in to comment.