diff --git a/internal/datastreams/fast_queue.go b/internal/datastreams/fast_queue.go
new file mode 100644
index 0000000000..7218859461
--- /dev/null
+++ b/internal/datastreams/fast_queue.go
@@ -0,0 +1,63 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"sync/atomic"
+	"time"
+)
+
+const (
+	queueSize = 10000
+)
+
+// fastQueue is a fixed-size ring buffer with many writers and a single reader.
+// Each value is read at most once, and the reader stops once it catches up
+// with the writers. If the reader is too slow, older values are overwritten,
+// with no guarantee about the order in which values are dropped.
+type fastQueue struct {
+	elements [queueSize]atomic.Pointer[processorInput]
+	writePos int64
+	readPos  int64
+}
+
+func newFastQueue() *fastQueue {
+	return &fastQueue{}
+}
+
+func (q *fastQueue) push(p *processorInput) {
+	ind := atomic.AddInt64(&q.writePos, 1)
+	p.queuePos = ind - 1
+	q.elements[(ind-1)%queueSize].Store(p)
+}
+
+func (q *fastQueue) pop() *processorInput {
+	writePos := atomic.LoadInt64(&q.writePos)
+	if writePos <= q.readPos {
+		return nil
+	}
+	loaded := q.elements[q.readPos%queueSize].Load()
+	if loaded == nil || loaded.queuePos < q.readPos {
+		// The write started but hasn't finished yet; the element we read
+		// is the one from the previous cycle.
+		return nil
+	}
+	q.readPos++
+	return loaded
+}
+
+func (q *fastQueue) poll(timeout time.Duration) *processorInput {
+	deadline := time.Now().Add(timeout)
+	for {
+		if p := q.pop(); p != nil {
+			return p
+		}
+		if time.Now().After(deadline) {
+			return nil
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
diff --git a/internal/datastreams/fast_queue_test.go b/internal/datastreams/fast_queue_test.go
new file mode 100644
index 0000000000..27efb50c5a
--- /dev/null
+++ b/internal/datastreams/fast_queue_test.go
@@ -0,0 +1,28 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestFastQueue(t *testing.T) {
+	q := newFastQueue()
+	q.push(&processorInput{point: statsPoint{hash: 1}})
+	q.push(&processorInput{point: statsPoint{hash: 2}})
+	q.push(&processorInput{point: statsPoint{hash: 3}})
+	assert.Equal(t, uint64(1), q.pop().point.hash)
+	assert.Equal(t, uint64(2), q.pop().point.hash)
+	q.push(&processorInput{point: statsPoint{hash: 4}})
+	assert.Equal(t, uint64(3), q.pop().point.hash)
+	assert.Equal(t, uint64(4), q.pop().point.hash)
+	for i := 0; i < queueSize; i++ {
+		q.push(&processorInput{point: statsPoint{hash: uint64(i)}})
+		assert.Equal(t, uint64(i), q.pop().point.hash)
+	}
+}
diff --git a/internal/datastreams/hash_cache.go b/internal/datastreams/hash_cache.go
new file mode 100644
index 0000000000..2c5f047305
--- /dev/null
+++ b/internal/datastreams/hash_cache.go
@@ -0,0 +1,70 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"strings"
+	"sync"
+)
+
+const (
+	maxHashCacheSize = 1000
+)
+
+type hashCache struct {
+	mu sync.RWMutex
+	m  map[string]uint64
+}
+
+func getHashKey(edgeTags []string, parentHash uint64) string {
+	var s strings.Builder
+	l := 0
+	for _, t := range edgeTags {
+		l += len(t)
+	}
+	l += 8
+	s.Grow(l)
+	for _, t := range edgeTags {
+		s.WriteString(t)
+	}
+	s.WriteByte(byte(parentHash))
+	s.WriteByte(byte(parentHash >> 8))
+	s.WriteByte(byte(parentHash >> 16))
+	s.WriteByte(byte(parentHash >> 24))
+	s.WriteByte(byte(parentHash >> 32))
+	s.WriteByte(byte(parentHash >> 40))
+	s.WriteByte(byte(parentHash >> 48))
+	s.WriteByte(byte(parentHash >> 56))
+	return s.String()
+}
+
+func (c *hashCache) computeAndGet(key string, parentHash uint64, service, env string, edgeTags []string) uint64 {
+	hash := pathwayHash(nodeHash(service, env, edgeTags), parentHash)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.m) >= maxHashCacheSize {
+		// A high cardinality of hashes shouldn't happen in practice, since each
+		// service consumes a limited number of topics. If it does, reset the cache.
+		c.m = make(map[string]uint64)
+	}
+	c.m[key] = hash
+	return hash
+}
+
+func (c *hashCache) get(service, env string, edgeTags []string, parentHash uint64) uint64 {
+	key := getHashKey(edgeTags, parentHash)
+	c.mu.RLock()
+	if hash, ok := c.m[key]; ok {
+		c.mu.RUnlock()
+		return hash
+	}
+	c.mu.RUnlock()
+	return c.computeAndGet(key, parentHash, service, env, edgeTags)
+}
+
+func newHashCache() *hashCache {
+	return &hashCache{m: make(map[string]uint64)}
+}
diff --git a/internal/datastreams/hash_cache_test.go b/internal/datastreams/hash_cache_test.go
new file mode 100644
index 0000000000..797decd77e
--- /dev/null
+++ b/internal/datastreams/hash_cache_test.go
@@ -0,0 +1,31 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"encoding/binary"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHashCache(t *testing.T) {
+	cache := newHashCache()
+	assert.Equal(t, pathwayHash(nodeHash("service", "env", []string{"type:kafka"}), 1234), cache.get("service", "env", []string{"type:kafka"}, 1234))
+	assert.Len(t, cache.m, 1)
+	assert.Equal(t, pathwayHash(nodeHash("service", "env", []string{"type:kafka"}), 1234), cache.get("service", "env", []string{"type:kafka"}, 1234))
+	assert.Len(t, cache.m, 1)
+	assert.Equal(t, pathwayHash(nodeHash("service", "env", []string{"type:kafka2"}), 1234), cache.get("service", "env", []string{"type:kafka2"}, 1234))
+	assert.Len(t, cache.m, 2)
+}
+
+func TestGetHashKey(t *testing.T) {
+	parentHash := uint64(87234)
+	key := getHashKey([]string{"type:kafka", "topic:topic1", "group:group1"}, parentHash)
+	hash := make([]byte, 8)
+	binary.LittleEndian.PutUint64(hash, parentHash)
+	assert.Equal(t, "type:kafkatopic:topic1group:group1"+string(hash), key)
+}
diff --git a/internal/datastreams/pathway_test.go b/internal/datastreams/pathway_test.go
index eefaf6f58d..eee60be131 100644
--- a/internal/datastreams/pathway_test.go
+++ b/internal/datastreams/pathway_test.go
@@ -18,8 +18,9 @@ func TestPathway(t *testing.T) {
 	t.Run("test SetCheckpoint", func(t *testing.T) {
 		start := time.Now()
 		processor := Processor{
+			hashCache:  newHashCache(),
 			stopped:    1,
-			in:         make(chan statsPoint, 10),
+			in:         newFastQueue(),
 			service:    "service-1",
 			env:        "env",
 			timeSource: func() time.Time { return start },
@@ -27,13 +28,13 @@
 		ctx := processor.SetCheckpoint(context.Background())
 		middle := start.Add(time.Hour)
 		processor.timeSource = func() time.Time { return middle }
-		ctx = processor.SetCheckpoint(ctx, "edge-1")
+		ctx = processor.SetCheckpoint(ctx, "topic:topic1")
 		end := middle.Add(time.Hour)
 		processor.timeSource = func() time.Time { return end }
-		ctx = processor.SetCheckpoint(ctx, "edge-2")
+		ctx = processor.SetCheckpoint(ctx, "topic:topic2")
 		hash1 := pathwayHash(nodeHash("service-1", "env", nil), 0)
-		hash2 := pathwayHash(nodeHash("service-1", "env", []string{"edge-1"}), hash1)
-		hash3 := pathwayHash(nodeHash("service-1", "env", []string{"edge-2"}), hash2)
+		hash2 := pathwayHash(nodeHash("service-1", "env", []string{"topic:topic1"}), hash1)
+		hash3 := pathwayHash(nodeHash("service-1", "env", []string{"topic:topic2"}), hash2)
 		p, _ := PathwayFromContext(ctx)
 		assert.Equal(t, hash3, p.GetHash())
 		assert.Equal(t, start, p.PathwayStart())
@@ -45,29 +46,30 @@
 			timestamp:      start.UnixNano(),
 			pathwayLatency: 0,
 			edgeLatency:    0,
-		}, <-processor.in)
+		}, processor.in.poll(time.Second).point)
 		assert.Equal(t, statsPoint{
-			edgeTags:       []string{"edge-1"},
+			edgeTags:       []string{"topic:topic1"},
 			hash:           hash2,
 			parentHash:     hash1,
 			timestamp:      middle.UnixNano(),
 			pathwayLatency: middle.Sub(start).Nanoseconds(),
 			edgeLatency:    middle.Sub(start).Nanoseconds(),
-		}, <-processor.in)
+		}, processor.in.poll(time.Second).point)
 		assert.Equal(t, statsPoint{
-			edgeTags:       []string{"edge-2"},
+			edgeTags:       []string{"topic:topic2"},
 			hash:           hash3,
 			parentHash:     hash2,
 			timestamp:      end.UnixNano(),
 			pathwayLatency: end.Sub(start).Nanoseconds(),
 			edgeLatency:    end.Sub(middle).Nanoseconds(),
-		}, <-processor.in)
+		}, processor.in.poll(time.Second).point)
 	})
 
 	t.Run("test new pathway creation", func(t *testing.T) {
 		processor := Processor{
+			hashCache:  newHashCache(),
 			stopped:    1,
-			in:         make(chan statsPoint, 10),
+			in:         newFastQueue(),
 			service:    "service-1",
 			env:        "env",
 			timeSource: time.Now,
@@ -84,9 +86,9 @@ func TestPathway(t *testing.T) {
 		assert.Equal(t, hash2, pathwayWith1EdgeTag.GetHash())
 		assert.Equal(t, hash3, pathwayWith2EdgeTags.GetHash())
 
-		var statsPointWithNoEdgeTags = <-processor.in
-		var statsPointWith1EdgeTag = <-processor.in
-		var statsPointWith2EdgeTags = <-processor.in
+		var statsPointWithNoEdgeTags = processor.in.poll(time.Second).point
+		var statsPointWith1EdgeTag = processor.in.poll(time.Second).point
+		var statsPointWith2EdgeTags = processor.in.poll(time.Second).point
 		assert.Equal(t, hash1, statsPointWithNoEdgeTags.hash)
 		assert.Equal(t, []string(nil), statsPointWithNoEdgeTags.edgeTags)
 		assert.Equal(t, hash2, statsPointWith1EdgeTag.hash)
diff --git a/internal/datastreams/processor.go b/internal/datastreams/processor.go
index 1114f10564..9cec202199 100644
--- a/internal/datastreams/processor.go
+++ b/internal/datastreams/processor.go
@@ -116,6 +116,20 @@ func (b bucket) export(timestampType TimestampType) StatsBucket {
 	return exported
 }
 
+type pointType int
+
+const (
+	pointTypeStats pointType = iota
+	pointTypeKafkaOffset
+)
+
+type processorInput struct {
+	point       statsPoint
+	kafkaOffset kafkaOffset
+	typ         pointType
+	queuePos    int64
+}
+
 type processorStats struct {
 	payloadsIn      int64
 	flushedPayloads int64
@@ -152,7 +166,8 @@ type kafkaOffset struct {
 }
 
 type Processor struct {
-	in                    chan statsPoint
+	in                    *fastQueue
+	hashCache             *hashCache
 	inKafka               chan kafkaOffset
 	tsTypeCurrentBuckets  map[int64]bucket
 	tsTypeOriginBuckets   map[int64]bucket
@@ -187,8 +202,7 @@ func NewProcessor(statsd internal.StatsdClient, env, service, version string, ag
 	p := &Processor{
 		tsTypeCurrentBuckets: make(map[int64]bucket),
 		tsTypeOriginBuckets:  make(map[int64]bucket),
-		in:                   make(chan statsPoint, 10000),
-		inKafka:              make(chan kafkaOffset, 10000),
+		in:                   newFastQueue(),
 		stopped:              1,
 		statsd:               statsd,
 		env:                  env,
@@ -267,11 +281,6 @@ func (p *Processor) addKafkaOffset(o kafkaOffset) {
 func (p *Processor) run(tick <-chan time.Time) {
 	for {
 		select {
-		case s := <-p.in:
-			atomic.AddInt64(&p.stats.payloadsIn, 1)
-			p.add(s)
-		case o := <-p.inKafka:
-			p.addKafkaOffset(o)
 		case now := <-tick:
 			p.sendToAgent(p.flush(now))
 		case done := <-p.flushRequest:
@@ -281,6 +290,18 @@
-			// drop in flight payloads on the input channel
+			// drop in flight payloads on the input queue
 			p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10)))
 			return
+		default:
+			s := p.in.pop()
+			if s == nil {
+				time.Sleep(time.Millisecond * 10)
+				continue
+			}
+			atomic.AddInt64(&p.stats.payloadsIn, 1)
+			if s.typ == pointTypeStats {
+				p.add(s.point)
+			} else if s.typ == pointTypeKafkaOffset {
+				p.addKafkaOffset(s.kafkaOffset)
+			}
 		}
 	}
 }
@@ -397,12 +418,11 @@ func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.
 		parentHash = parent.GetHash()
 	}
 	child := Pathway{
-		hash:         pathwayHash(nodeHash(p.service, p.env, edgeTags), parentHash),
+		hash:         p.hashCache.get(p.service, p.env, edgeTags, parentHash),
 		pathwayStart: pathwayStart,
 		edgeStart:    now,
 	}
-	select {
-	case p.in <- statsPoint{
+	p.in.push(&processorInput{typ: pointTypeStats, point: statsPoint{
 		edgeTags:       edgeTags,
 		parentHash:     parentHash,
 		hash:           child.hash,
 		timestamp:      now.UnixNano(),
@@ -410,40 +430,29 @@ func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.
 		pathwayLatency: now.Sub(pathwayStart).Nanoseconds(),
 		edgeLatency:    now.Sub(edgeStart).Nanoseconds(),
 		payloadSize:    params.PayloadSize,
-	}:
-	default:
-		atomic.AddInt64(&p.stats.dropped, 1)
-	}
+	}})
 	return ContextWithPathway(ctx, child)
 }
 
 func (p *Processor) TrackKafkaCommitOffset(group string, topic string, partition int32, offset int64) {
-	select {
-	case p.inKafka <- kafkaOffset{
+	p.in.push(&processorInput{typ: pointTypeKafkaOffset, kafkaOffset: kafkaOffset{
 		offset:     offset,
 		group:      group,
 		topic:      topic,
 		partition:  partition,
 		offsetType: commitOffset,
-		timestamp:  p.time().UnixNano(),
-	}:
-	default:
-		atomic.AddInt64(&p.stats.dropped, 1)
-	}
+		timestamp:  p.time().UnixNano(),
+	}})
 }
 
 func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offset int64) {
-	select {
-	case p.inKafka <- kafkaOffset{
+	p.in.push(&processorInput{typ: pointTypeKafkaOffset, kafkaOffset: kafkaOffset{
 		offset:     offset,
 		topic:      topic,
 		partition:  partition,
 		offsetType: produceOffset,
 		timestamp:  p.time().UnixNano(),
-	}:
-	default:
-		atomic.AddInt64(&p.stats.dropped, 1)
-	}
+	}})
 }
 
 func (p *Processor) runLoadAgentFeatures(tick <-chan time.Time) {
diff --git a/internal/datastreams/processor_test.go b/internal/datastreams/processor_test.go
index d7d9798ebd..b2166a201e 100644
--- a/internal/datastreams/processor_test.go
+++ b/internal/datastreams/processor_test.go
@@ -7,6 +7,7 @@ package datastreams
 
 import (
 	"context"
+	"net/http"
 	"net/url"
 	"sort"
 	"strings"
@@ -15,6 +16,8 @@
 
+	"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
 	"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
 
+	"github.com/DataDog/datadog-go/v5/statsd"
 	"github.com/DataDog/sketches-go/ddsketch"
 	"github.com/DataDog/sketches-go/ddsketch/store"
 	"github.com/golang/protobuf/proto"
@@ -186,8 +189,9 @@ func TestProcessor(t *testing.T) {
 
 func TestSetCheckpoint(t *testing.T) {
 	processor := Processor{
+		hashCache:  newHashCache(),
 		stopped:    1,
-		in:         make(chan statsPoint, 10),
+		in:         newFastQueue(),
 		service:    "service-1",
 		env:        "env",
 		timeSource: time.Now,
@@ -198,8 +202,8 @@ func TestSetCheckpoint(t *testing.T) {
 
 	ctx := processor.SetCheckpoint(context.Background(), "direction:in", "type:kafka")
 	pathway, _ := PathwayFromContext(processor.SetCheckpoint(ctx, "direction:out", "type:kafka"))
-	statsPt1 := <-processor.in
-	statsPt2 := <-processor.in
+	statsPt1 := processor.in.pop().point
+	statsPt2 := processor.in.pop().point
 
 	assert.Equal(t, []string{"direction:in", "type:kafka"}, statsPt1.edgeTags)
 	assert.Equal(t, hash1, statsPt1.hash)
@@ -239,3 +243,31 @@ func TestKafkaLag(t *testing.T) {
 	}
 	assert.Equal(t, expectedBacklogs, point.Stats[0].Backlogs)
 }
+
+type noOpTransport struct{}
+
+// RoundTrip does nothing and returns a dummy successful response, so the
+// processor can be benchmarked without a running agent.
+func (t *noOpTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	return &http.Response{
+		StatusCode:    200,
+		Proto:         "HTTP/1.1",
+		ProtoMajor:    1,
+		ProtoMinor:    1,
+		Request:       req,
+		ContentLength: -1,
+		Body:          http.NoBody,
+	}, nil
+}
+
+func BenchmarkSetCheckpoint(b *testing.B) {
+	client := &http.Client{
+		Transport: &noOpTransport{},
+	}
+	p := NewProcessor(&statsd.NoOpClient{}, "env", "service", "v1", &url.URL{Scheme: "http", Host: "agent-address"}, client, func() bool { return true })
+	p.Start()
+	for i := 0; i < b.N; i++ {
+		p.SetCheckpointWithParams(context.Background(), options.CheckpointParams{PayloadSize: 1000}, "type:edge-1", "direction:in", "type:kafka", "topic:topic1", "group:group1")
+	}
+	p.Stop()
+}
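
Note on the queue's intended usage (illustrative, not part of the patch): the sketch below shows the push/poll pattern the processor relies on, written against the fastQueue, processorInput, and statsPoint types added above. The function name exampleFastQueueUsage is hypothetical, and the snippet assumes it lives in package datastreams with "sync" and "time" imported. Many goroutines may push concurrently, while a single goroutine drains the queue.

// Illustrative sketch only (hypothetical helper): four producers, one consumer.
func exampleFastQueueUsage() {
	q := newFastQueue()
	var producers sync.WaitGroup
	for w := 0; w < 4; w++ {
		producers.Add(1)
		go func(w int) {
			defer producers.Done()
			for i := 0; i < 1000; i++ {
				// push never blocks: if the single reader lags by more than
				// queueSize elements, older entries are silently overwritten.
				q.push(&processorInput{typ: pointTypeStats, point: statsPoint{hash: uint64(w)}})
			}
		}(w)
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			// poll retries pop every 10ms until a value arrives or the
			// timeout elapses; nil means the queue stayed empty.
			p := q.poll(100 * time.Millisecond)
			if p == nil {
				return
			}
			_ = p.point // a real consumer would process the statsPoint here
		}
	}()
	producers.Wait()
	<-done
}

Compared with the buffered channels it replaces, this design makes the write side unconditional and non-blocking: SetCheckpoint never parks the caller's goroutine, and overload shows up as overwritten queue slots rather than as sends falling through to the default branch that used to increment the dropped counter.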
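A similar illustrative sketch for the hashCache added above (same assumptions; exampleHashCacheUsage is hypothetical): get serves repeated (service, env, edgeTags, parentHash) lookups under a read lock, while computeAndGet recomputes pathwayHash(nodeHash(...)) under the write lock and resets the map once it holds maxHashCacheSize entries.

// Illustrative sketch only: the second lookup is served from the cache.
func exampleHashCacheUsage() {
	c := newHashCache()
	edgeTags := []string{"direction:out", "topic:topic1", "type:kafka"}
	h1 := c.get("service-1", "env", edgeTags, 0) // cache miss: computes and stores
	h2 := c.get("service-1", "env", edgeTags, 0) // cache hit: no hashing
	if h1 != h2 {
		panic("cache must return the same hash for the same key")
	}
}

Two goroutines racing on the same miss may both compute the hash, but since the hash functions are deterministic they simply store the same value; that is what keeps the fast path a plain RLock with no further coordination.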