Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Better OTEL traces, add metrics #1751

Merged
merged 10 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/simple_plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ require (
github.com/yosssi/ace v0.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/arch v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions examples/simple_plugin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 h1:CIHWikMsN3wO+wq1Tp5VGdVRTcON+DmOJSfDjXypKOc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0/go.mod h1:TNupZ6cxqyFEpLXAZW7On+mLFL0/g0TE3unIYL91xWc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 h1:QY7/0NeRPKlzusf40ZE4t1VlMKbqSNT7cJRYzWuja0s=
Expand All @@ -278,6 +280,8 @@ go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI=
go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ require (
github.com/stretchr/testify v1.9.0
github.com/thoas/go-funk v0.9.3
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/sync v0.7.0
golang.org/x/text v0.15.0
Expand Down Expand Up @@ -106,8 +110,6 @@ require (
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/yosssi/ace v0.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0 h1:CIHWikMsN3wO+wq1Tp5VGdVRTcON+DmOJSfDjXypKOc=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.27.0/go.mod h1:TNupZ6cxqyFEpLXAZW7On+mLFL0/g0TE3unIYL91xWc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0 h1:R9DE4kQ4k+YtfLI2ULwX82VtNQ2J8yZmA7ZIF/D+7Mc=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.27.0/go.mod h1:OQFyQVrDlbe+R7xrEyDr/2Wr67Ol0hRUgsfA+V5A95s=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.27.0 h1:QY7/0NeRPKlzusf40ZE4t1VlMKbqSNT7cJRYzWuja0s=
Expand All @@ -280,6 +282,8 @@ go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI=
go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
Expand Down
15 changes: 15 additions & 0 deletions internal/servers/plugin/v3/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -147,6 +150,17 @@ func (s *Server) Read(req *pb.Read_Request, stream pb.Plugin_ReadServer) error {
return readErr
}

// flushMetrics asks the globally registered OpenTelemetry providers to export
// whatever telemetry they currently hold.
//
// A provider is flushed only when the global instance is the SDK
// implementation (*trace.TracerProvider / *metric.MeterProvider); any other
// implementation is left alone. The flush runs with context.Background() and
// the error returned by ForceFlush is discarded.
func flushMetrics() {
	if tp, isSDK := otel.GetTracerProvider().(*trace.TracerProvider); isSDK && tp != nil {
		_ = tp.ForceFlush(context.Background())
	}
	if mp, isSDK := otel.GetMeterProvider().(*metric.MeterProvider); isSDK && mp != nil {
		_ = mp.ForceFlush(context.Background())
	}
}

func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
msgs := make(chan message.SyncMessage)
var syncErr error
Expand All @@ -166,6 +180,7 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
}

go func() {
defer flushMetrics()
defer close(msgs)
err := s.Plugin.Sync(ctx, syncOptions, msgs)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions plugin/plugin_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,10 @@ func (p *Plugin) Sync(ctx context.Context, options SyncOptions, res chan<- messa
if p.client == nil {
return fmt.Errorf("plugin not initialized. call Init() first")
}
// startTime := time.Now()

if err := p.client.Sync(ctx, options, res); err != nil {
return fmt.Errorf("failed to sync unmanaged client: %w", err)
}

// p.logger.Info().Uint64("resources", p.metrics.TotalResources()).Uint64("errors", p.metrics.TotalErrors()).Uint64("panics", p.metrics.TotalPanics()).TimeDiff("duration", time.Now(), startTime).Msg("sync finished")
erezrokah marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
120 changes: 117 additions & 3 deletions scheduler/metrics.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
package scheduler

import (
"context"
"sync/atomic"
"time"

"github.com/cloudquery/plugin-sdk/v4/schema"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Metrics holds the in-process sync counters for every table/client pair.
// Metrics is deprecated as we move toward OpenTelemetry for tracing and metrics.
type Metrics struct {
// TableClient is indexed first by table name and then by client ID.
TableClient map[string]map[string]*TableClientMetrics
}

// OtelMeters groups the OpenTelemetry instruments used to report progress for
// one table/client pair, together with the attributes attached to every
// recording made through them.
type OtelMeters struct {
// resources is the "sync.table.resources" counter.
resources metric.Int64Counter
// errors is the "sync.table.errors" counter.
errors metric.Int64Counter
// panics is the "sync.table.panics" counter.
panics metric.Int64Counter
// startTime is the "sync.table.start_time" instrument (unit "ns").
startTime metric.Int64Counter
// endTime is the "sync.table.end_time" instrument (unit "ns").
endTime metric.Int64Counter
// attributes carries the "sync.client.id" and "sync.table.name" key/values.
attributes []attribute.KeyValue
}

// TableClientMetrics holds the sync counters for a single table/client pair.
type TableClientMetrics struct {
// Resources counts resolved resources; it is updated with atomic.AddUint64.
Resources uint64
// Errors counts errors; it is updated with atomic.AddUint64.
Errors uint64
// Panics counts panics.
// NOTE(review): presumably updated atomically like Resources and Errors — confirm.
Panics uint64
// Duration is read through Load(); a nil pointer is handled by durationPointerEqual.
Duration atomic.Pointer[time.Duration]

// otelMeters mirrors the counters above to OpenTelemetry. It may be nil, in
// which case the Otel* methods do nothing.
otelMeters *OtelMeters
}

func durationPointerEqual(a, b *time.Duration) bool {
Expand All @@ -26,8 +41,8 @@ func durationPointerEqual(a, b *time.Duration) bool {
return b != nil && *a == *b
}

func (s *TableClientMetrics) Equal(other *TableClientMetrics) bool {
return s.Resources == other.Resources && s.Errors == other.Errors && s.Panics == other.Panics && durationPointerEqual(s.Duration.Load(), other.Duration.Load())
func (m *TableClientMetrics) Equal(other *TableClientMetrics) bool {
return m.Resources == other.Resources && m.Errors == other.Errors && m.Panics == other.Panics && durationPointerEqual(m.Duration.Load(), other.Duration.Load())
}

// Equal compares two stats. Mostly useful in testing
Expand Down Expand Up @@ -61,10 +76,69 @@ func (s *Metrics) Equal(other *Metrics) bool {
return true
}

// getOtelMeters builds the OpenTelemetry instruments used to report sync
// progress for a single table/client pair.
//
// Every instrument is created on the otelName meter. The returned value also
// carries the "sync.client.id" and "sync.table.name" attributes that are
// attached to each recording. If any instrument cannot be created, nil is
// returned; the TableClientMetrics.Otel* methods treat a nil *OtelMeters as a
// no-op, so a failure here only disables OpenTelemetry reporting.
func getOtelMeters(tableName string, clientID string) *OtelMeters {
	// Resolve the meter once rather than once per instrument.
	meter := otel.Meter(otelName)

	resourceCounter, err := meter.Int64Counter("sync.table.resources",
		metric.WithDescription("Number of resources synced for a table"),
		metric.WithUnit("/{tot}"),
	)
	if err != nil {
		return nil
	}

	// Named errorCounter (not errors) so the standard errors package is never shadowed.
	errorCounter, err := meter.Int64Counter("sync.table.errors",
		metric.WithDescription("Number of errors encountered while syncing a table"),
		metric.WithUnit("/{tot}"),
	)
	if err != nil {
		return nil
	}

	panicCounter, err := meter.Int64Counter("sync.table.panics",
		metric.WithDescription("Number of panics encountered while syncing a table"),
		metric.WithUnit("/{tot}"),
	)
	if err != nil {
		return nil
	}

	// NOTE(review): start/end timestamps are recorded through counters, which
	// accumulate; this presumably relies on each being recorded at most once
	// per table/client pair — confirm against the callers.
	startCounter, err := meter.Int64Counter("sync.table.start_time",
		metric.WithDescription("Start time of syncing a table"),
		metric.WithUnit("ns"),
	)
	if err != nil {
		return nil
	}

	endCounter, err := meter.Int64Counter("sync.table.end_time",
		metric.WithDescription("End time of syncing a table"),
		metric.WithUnit("ns"),
	)
	if err != nil {
		return nil
	}

	return &OtelMeters{
		resources: resourceCounter,
		errors:    errorCounter,
		panics:    panicCounter,
		startTime: startCounter,
		endTime:   endCounter,
		attributes: []attribute.KeyValue{
			attribute.Key("sync.client.id").String(clientID),
			attribute.Key("sync.table.name").String(tableName),
		},
	}
}

func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) {
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
for _, client := range clients {
s.TableClient[table.Name][client.ID()] = &TableClientMetrics{}
tableName := table.Name
clientID := client.ID()
s.TableClient[tableName][clientID] = &TableClientMetrics{
otelMeters: getOtelMeters(tableName, clientID),
}
}
for _, relation := range table.Relations {
s.initWithClients(relation, clients)
Expand Down Expand Up @@ -130,3 +204,43 @@ func (s *Metrics) TotalResourcesAtomic() uint64 {
}
return total
}

// OtelResourcesAdd adds count to the "sync.table.resources" counter, tagged
// with this table/client pair's attributes. It does nothing when no
// OpenTelemetry instruments are attached.
func (m *TableClientMetrics) OtelResourcesAdd(ctx context.Context, count int64) {
	if meters := m.otelMeters; meters != nil {
		opts := metric.WithAttributes(meters.attributes...)
		meters.resources.Add(ctx, count, opts)
	}
}

// OtelErrorsAdd adds count to the "sync.table.errors" counter, tagged with
// this table/client pair's attributes. It does nothing when no OpenTelemetry
// instruments are attached.
func (m *TableClientMetrics) OtelErrorsAdd(ctx context.Context, count int64) {
	if meters := m.otelMeters; meters != nil {
		opts := metric.WithAttributes(meters.attributes...)
		meters.errors.Add(ctx, count, opts)
	}
}

// OtelPanicsAdd adds count to the "sync.table.panics" counter, tagged with
// this table/client pair's attributes. It does nothing when no OpenTelemetry
// instruments are attached.
func (m *TableClientMetrics) OtelPanicsAdd(ctx context.Context, count int64) {
	if meters := m.otelMeters; meters != nil {
		opts := metric.WithAttributes(meters.attributes...)
		meters.panics.Add(ctx, count, opts)
	}
}

// OtelStartTime records start, expressed as Unix nanoseconds, on the
// "sync.table.start_time" instrument with this table/client pair's
// attributes. It does nothing when no OpenTelemetry instruments are attached.
func (m *TableClientMetrics) OtelStartTime(ctx context.Context, start time.Time) {
	if meters := m.otelMeters; meters != nil {
		nanos := start.UnixNano()
		meters.startTime.Add(ctx, nanos, metric.WithAttributes(meters.attributes...))
	}
}

// OtelEndTime records end, expressed as Unix nanoseconds, on the
// "sync.table.end_time" instrument with this table/client pair's attributes.
// It does nothing when no OpenTelemetry instruments are attached.
func (m *TableClientMetrics) OtelEndTime(ctx context.Context, end time.Time) {
	if meters := m.otelMeters; meters != nil {
		nanos := end.UnixNano()
		meters.endTime.Add(ctx, nanos, metric.WithAttributes(meters.attributes...))
	}
}
35 changes: 26 additions & 9 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/rs/zerolog"
"github.com/thoas/go-funk"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
)

Expand All @@ -25,7 +27,7 @@ const (
DefaultMaxDepth = 4
minTableConcurrency = 1
minResourceConcurrency = 100
otelName = "schedule"
otelName = "io.cloudquery"
)

var ErrNoTables = errors.New("no tables specified for syncing, review `tables` and `skip_tables` in your config and specify at least one table to sync")
Expand Down Expand Up @@ -82,6 +84,12 @@ func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption {
}
}

// WithInvocationID returns an Option that stores invocationID on the
// Scheduler. Scheduler.Sync attaches the stored value to its span as the
// "sync.invocation.id" attribute.
func WithInvocationID(invocationID string) Option {
	return func(sched *Scheduler) {
		sched.invocationID = invocationID
	}
}

// Client is the minimal interface required of a client: it must be able to
// report an ID.
type Client interface {
// ID returns the client's identifier.
// NOTE(review): presumably unique per client within a sync — confirm with implementers.
ID() string
}
Expand All @@ -107,6 +115,8 @@ type Scheduler struct {

// Controls how records are constructed on the source side.
batchSettings *BatchSettings

invocationID string
}

type syncClient struct {
Expand All @@ -115,8 +125,9 @@ type syncClient struct {
scheduler *Scheduler
deterministicCQID bool
// status sync metrics
metrics *Metrics
logger zerolog.Logger
metrics *Metrics
logger zerolog.Logger
invocationID string
}

func NewScheduler(opts ...Option) *Scheduler {
Expand Down Expand Up @@ -167,18 +178,22 @@ func (s *Scheduler) SyncAll(ctx context.Context, client schema.ClientMeta, table
}

func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables schema.Tables, res chan<- message.SyncMessage, opts ...SyncOption) error {
ctx, span := otel.Tracer(otelName).Start(ctx, "Sync")
ctx, span := otel.Tracer(otelName).Start(ctx,
"sync",
trace.WithAttributes(attribute.Key("sync.invocation.id").String(s.invocationID)),
)
defer span.End()
if len(tables) == 0 {
return ErrNoTables
}

syncClient := &syncClient{
metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)},
tables: tables,
client: client,
scheduler: s,
logger: s.logger,
metrics: &Metrics{TableClient: make(map[string]map[string]*TableClientMetrics)},
tables: tables,
client: client,
scheduler: s,
logger: s.logger,
invocationID: s.invocationID,
}
for _, opt := range opts {
opt(syncClient)
Expand Down Expand Up @@ -276,6 +291,8 @@ func (s *syncClient) resolveResource(ctx context.Context, table *schema.Table, c
atomic.AddUint64(&tableMetrics.Errors, 1)
}
}

tableMetrics.OtelResourcesAdd(ctx, 1)
atomic.AddUint64(&tableMetrics.Resources, 1)
return resource
}
Expand Down
Loading
Loading