Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add processing timings for each step in traces pipeline #318

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/SigNoz/signoz-otel-collector/utils"
Expand Down Expand Up @@ -416,6 +417,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
case <-s.closeChan:
return errors.New("shutdown has been called")
default:
start := time.Now()
rss := td.ResourceSpans()
var batchOfSpans []*Span
for i := 0; i < rss.Len(); i++ {
Expand All @@ -436,6 +438,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
}
}
}
zap.L().Info("Time taken to convert spans to structured spans", zap.Int64("time", time.Since(start).Milliseconds()))
err := s.Writer.WriteBatchOfSpans(ctx, batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
Expand Down
19 changes: 19 additions & 0 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -151,6 +152,8 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
}
}

w.logger.Info("Time taken to prepare batch for index table", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

start := time.Now()

err = statement.Send()
Expand All @@ -167,6 +170,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
var tagKeyStatement driver.Batch
var tagStatement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if tagKeyStatement != nil {
Expand Down Expand Up @@ -264,6 +268,8 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
}
}

w.logger.Info("Time taken to prepare batch for tag/tagKey tables", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

tagStart := time.Now()
err = tagStatement.Send()
stats.RecordWithTags(ctx,
Expand Down Expand Up @@ -298,6 +304,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -333,6 +340,8 @@ func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) er
}
}

w.logger.Info("Time taken to prepare batch for error table", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

start := time.Now()

err = statement.Send()
Expand All @@ -352,6 +361,7 @@ func stringToBool(s string) bool {
func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var marshalStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -388,6 +398,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

usage.AddMetric(metrics, *span.Tenant, 1, int64(len(serializedUsage)))
}
w.logger.Info("Time taken to marshal spans for model batch", zap.Int64("time", time.Since(marshalStart).Milliseconds()))
start := time.Now()

err = statement.Send()
Expand All @@ -408,30 +419,38 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

// WriteBatchOfSpans writes the encoded batch of spans to each configured
// ClickHouse table (model, index, error, then tag/tagKey), timing each stage.
// The first write error aborts the remaining stages and is returned after
// being logged. Stages whose table name is empty are skipped entirely.
func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error {
	// Per-stage timings are emitted at Debug rather than Info: one log line
	// per table per batch would flood logs at normal ingest rates. Each
	// timing is taken inside its guard so a skipped (unconfigured) table
	// does not emit a misleading "time taken" entry.
	if w.spansTable != "" {
		start := time.Now()
		if err := w.writeModelBatch(ctx, batch); err != nil {
			w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
			return err
		}
		w.logger.Debug("Time taken to write batch of spans to model table", zap.Int64("time", time.Since(start).Milliseconds()))
	}
	if w.indexTable != "" {
		start := time.Now()
		if err := w.writeIndexBatch(ctx, batch); err != nil {
			w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
			return err
		}
		w.logger.Debug("Time taken to write batch of spans to index table", zap.Int64("time", time.Since(start).Milliseconds()))
	}
	if w.errorTable != "" {
		start := time.Now()
		if err := w.writeErrorBatch(ctx, batch); err != nil {
			w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err))
			return err
		}
		w.logger.Debug("Time taken to write batch of spans to error table", zap.Int64("time", time.Since(start).Milliseconds()))
	}
	if w.attributeTable != "" && w.attributeKeyTable != "" {
		start := time.Now()
		if err := w.writeTagBatch(ctx, batch); err != nil {
			w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
			return err
		}
		w.logger.Debug("Time taken to write batch of spans to tag/tagKey tables", zap.Int64("time", time.Since(start).Milliseconds()))
	}
	return nil
}

Expand Down
2 changes: 2 additions & 0 deletions processor/signozspanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,11 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
start := time.Now()
p.lock.Lock()
p.aggregateMetrics(traces)
p.lock.Unlock()
p.logger.Info("Time taken to aggregate metrics", zap.Int64("time", time.Since(start).Milliseconds()))

// Forward trace data unmodified and propagate trace pipeline errors, if any.
return p.tracesConsumer.ConsumeTraces(ctx, traces)
Expand Down
8 changes: 6 additions & 2 deletions receiver/signozkafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: c.settings.ID,
Transport: transport,
Expand Down Expand Up @@ -275,7 +275,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
// set sarama library's logger to get detailed logs from the library
sarama.Logger = zap.NewStdLog(set.Logger)

c := sarama.NewConfig()
c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
c.ClientID = config.ClientID
Expand Down Expand Up @@ -481,6 +481,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if !c.messageMarking.After {
session.MarkMessage(message, "")
}
c.logger.Info("Time taken to claim message", zap.Int64("time", time.Since(start).Milliseconds()))

ctx := c.obsrecv.StartTracesOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
Expand All @@ -499,7 +500,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}

spanCount := traces.SpanCount()
c.logger.Info("Time taken to unmarshal traces message", zap.Int64("time", time.Since(start).Milliseconds()), zap.Int("spanCount", spanCount))
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.logger.Info("Time taken to consume traces message", zap.Int64("time", time.Since(start).Milliseconds()))
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
c.logger.Error("kafka receiver: failed to export traces", zap.Error(err), zap.Int32("partition", claim.Partition()), zap.String("topic", claim.Topic()))
Expand All @@ -514,6 +517,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if !c.autocommitEnabled {
session.Commit()
}
c.logger.Info("Time taken to process traces message", zap.Int64("time", time.Since(start).Milliseconds()))
err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
if err != nil {
c.logger.Error("failed to record processing time", zap.Error(err))
Expand Down
Loading