Skip to content

Commit

Permalink
internal/datastreams: Improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Dec 20, 2023
1 parent 4ae528b commit 4221d83
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 45 deletions.
63 changes: 63 additions & 0 deletions internal/datastreams/fast_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package datastreams

import (
"sync/atomic"
"time"
)

const (
// queueSize is the number of slots in fastQueue's backing array; positions
// are mapped to slots with position % queueSize.
queueSize = 10000
)

// fastQueue is a fixed-size ring buffer of *processorInput values.
// There are many writers, but only one reader.
// Each value is meant to be read at most once.
// The reader stops when it catches up with the writers.
// If the reader is too slow, writers overwrite slots that have not been read
// yet, and there is no guarantee about the order in which values are dropped.
type fastQueue struct {
// elements holds the slots; the value pushed at position p is stored at
// index p % queueSize.
elements [queueSize]atomic.Pointer[processorInput]
// writePos is the number of positions handed out to writers so far. It is
// only accessed through sync/atomic.
writePos int64
// readPos is the next position the reader will consume. pop reads and
// writes it without atomics, so only the single reader may touch it.
readPos int64
}

// newFastQueue allocates an empty queue: every slot is nil and both the read
// and write positions start at zero.
func newFastQueue() *fastQueue {
	return new(fastQueue)
}

// push appends p to the queue. It is safe to call from many goroutines:
// each call atomically reserves its own position, records that position on p
// and then publishes p into the matching slot.
func (q *fastQueue) push(p *processorInput) {
	// AddInt64 returns the new write position; the slot reserved for this
	// call is the one just before it.
	pos := atomic.AddInt64(&q.writePos, 1) - 1
	p.queuePos = pos
	q.elements[pos%queueSize].Store(p)
}

// pop returns the next unread element, or nil when nothing can be read right
// now (the queue is empty, or the writer that reserved the next position has
// not published its element yet). It must only be called by the single
// reader, because readPos is accessed without synchronization.
//
// When writers have lapped the reader, the positions that were overwritten
// are skipped instead of being served from the newer cycle. Returning the
// newer element early (as a plain "queuePos >= readPos" check would) makes the
// reader return that same element a second time once readPos reaches its real
// position, which breaks the "read at most once" guarantee.
func (q *fastQueue) pop() *processorInput {
	for {
		writePos := atomic.LoadInt64(&q.writePos)
		if writePos <= q.readPos {
			// the reader caught up with the writers.
			return nil
		}
		// positions older than writePos-queueSize have been (or are being)
		// overwritten; those values are dropped.
		if oldest := writePos - queueSize; q.readPos < oldest {
			q.readPos = oldest
		}
		loaded := q.elements[q.readPos%queueSize].Load()
		if loaded == nil || loaded.queuePos < q.readPos {
			// the write started, but hasn't finished yet, the element we read
			// is the one from the previous cycle.
			return nil
		}
		if loaded.queuePos > q.readPos {
			// the slot was overwritten after writePos was loaded. The writer
			// bumped writePos before storing, so reloading it moves readPos
			// forward by at least one position: the loop always progresses.
			continue
		}
		q.readPos++
		return loaded
	}
}

// poll repeatedly calls pop until it yields an element or the timeout has
// elapsed, sleeping 10ms between attempts. At least one pop is always
// attempted, and nil is returned when the deadline passes without a value.
func (q *fastQueue) poll(timeout time.Duration) *processorInput {
	deadline := time.Now().Add(timeout)
	p := q.pop()
	for p == nil && !time.Now().After(deadline) {
		time.Sleep(10 * time.Millisecond)
		p = q.pop()
	}
	return p
}
27 changes: 27 additions & 0 deletions internal/datastreams/fast_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package datastreams

import (
"github.com/stretchr/testify/assert"
"testing"
)

// TestFastQueue checks FIFO ordering for interleaved pushes and pops, then
// cycles queueSize more elements through the queue so that slot indices wrap
// around the backing array.
func TestFastQueue(t *testing.T) {
	q := newFastQueue()
	pushHash := func(hash uint64) {
		q.push(&processorInput{point: statsPoint{hash: hash}})
	}
	expectPop := func(want uint64) {
		assert.Equal(t, want, q.pop().point.hash)
	}

	pushHash(1)
	pushHash(2)
	pushHash(3)
	expectPop(1)
	expectPop(2)
	pushHash(4)
	expectPop(3)
	expectPop(4)

	for i := 0; i < queueSize; i++ {
		pushHash(uint64(i))
		expectPop(uint64(i))
	}
}
70 changes: 70 additions & 0 deletions internal/datastreams/hash_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package datastreams

import (
"strings"
"sync"
)

const (
// maxHashCacheSize is the entry count at which hashCache discards its map
// and starts over (see computeAndGet).
maxHashCacheSize = 1000
)

// hashCache memoizes pathway hashes, keyed by the string built by getHashKey
// from the edge tags and the parent hash.
type hashCache struct {
// mu guards m: lookups take the read lock, inserts take the write lock.
mu sync.RWMutex
// m maps a getHashKey key to the computed pathway hash.
m map[string]uint64
}

// getHashKey builds the cache key for a set of edge tags and a parent hash:
// the tags concatenated in order (no separator), followed by the 8 bytes of
// parentHash in little-endian order.
func getHashKey(edgeTags []string, parentHash uint64) string {
	// 8 bytes for the parent hash, plus the length of every tag, so the
	// builder allocates exactly once.
	size := 8
	for _, tag := range edgeTags {
		size += len(tag)
	}

	var key strings.Builder
	key.Grow(size)
	for _, tag := range edgeTags {
		key.WriteString(tag)
	}
	// least significant byte first.
	for shift := 0; shift < 64; shift += 8 {
		key.WriteByte(byte(parentHash >> shift))
	}
	return key.String()
}

// computeAndGet computes the pathway hash for the given node and parent,
// stores it in the cache under key and returns it. The hash is computed
// before the write lock is taken so the lock is only held for the map update.
func (c *hashCache) computeAndGet(key string, parentHash uint64, service, env string, edgeTags []string) uint64 {
	node := nodeHash(service, env, edgeTags)
	result := pathwayHash(node, parentHash)

	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.m) >= maxHashCacheSize {
		// high cardinality of hashes shouldn't happen in practice, due to a
		// limited amount of topics consumed by each service. When it does
		// happen, drop the whole cache and start over.
		c.m = make(map[string]uint64)
	}
	c.m[key] = result
	return result
}

// get returns the pathway hash for the given service, env, edge tags and
// parent hash. A cached value is returned when the key built from the edge
// tags and parent hash is present; otherwise the hash is computed and cached.
func (c *hashCache) get(service, env string, edgeTags []string, parentHash uint64) uint64 {
	key := getHashKey(edgeTags, parentHash)

	c.mu.RLock()
	cached, found := c.m[key]
	c.mu.RUnlock()
	if found {
		return cached
	}
	return c.computeAndGet(key, parentHash, service, env, edgeTags)
}

// newHashCache returns a hashCache with an initialized, empty map.
func newHashCache() *hashCache {
	c := new(hashCache)
	c.m = make(map[string]uint64)
	return c
}
31 changes: 31 additions & 0 deletions internal/datastreams/hash_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package datastreams

import (
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
)

// TestHashCache checks that get returns the same value as a direct
// pathwayHash/nodeHash computation, and that the cache only grows when a new
// set of edge tags is seen.
func TestHashCache(t *testing.T) {
	cache := newHashCache()
	check := func(tag string, wantEntries int) {
		want := pathwayHash(nodeHash("service", "env", []string{tag}), 1234)
		got := cache.get("service", "env", []string{tag}, 1234)
		assert.Equal(t, want, got)
		assert.Len(t, cache.m, wantEntries)
	}

	check("type:kafka", 1)
	// second lookup with the same tags must not add an entry.
	check("type:kafka", 1)
	check("type:kafka2", 2)
}

// TestGetHashKey checks that the key is the concatenated edge tags followed
// by the little-endian encoding of the parent hash.
func TestGetHashKey(t *testing.T) {
	const parentHash uint64 = 87234
	suffix := make([]byte, 8)
	binary.LittleEndian.PutUint64(suffix, parentHash)
	want := "type:kafkatopic:topic1group:group1" + string(suffix)

	got := getHashKey([]string{"type:kafka", "topic:topic1", "group:group1"}, parentHash)
	assert.Equal(t, want, got)
}
30 changes: 16 additions & 14 deletions internal/datastreams/pathway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@ func TestPathway(t *testing.T) {
t.Run("test SetCheckpoint", func(t *testing.T) {
start := time.Now()
processor := Processor{
hashCache: newHashCache(),
stopped: 1,
in: make(chan statsPoint, 10),
in: newFastQueue(),
service: "service-1",
env: "env",
timeSource: func() time.Time { return start },
}
ctx := processor.SetCheckpoint(context.Background())
middle := start.Add(time.Hour)
processor.timeSource = func() time.Time { return middle }
ctx = processor.SetCheckpoint(ctx, "edge-1")
ctx = processor.SetCheckpoint(ctx, "topic:topic1")
end := middle.Add(time.Hour)
processor.timeSource = func() time.Time { return end }
ctx = processor.SetCheckpoint(ctx, "edge-2")
ctx = processor.SetCheckpoint(ctx, "topic:topic2")
hash1 := pathwayHash(nodeHash("service-1", "env", nil), 0)
hash2 := pathwayHash(nodeHash("service-1", "env", []string{"edge-1"}), hash1)
hash3 := pathwayHash(nodeHash("service-1", "env", []string{"edge-2"}), hash2)
hash2 := pathwayHash(nodeHash("service-1", "env", []string{"topic:topic1"}), hash1)
hash3 := pathwayHash(nodeHash("service-1", "env", []string{"topic:topic2"}), hash2)
p, _ := PathwayFromContext(ctx)
assert.Equal(t, hash3, p.GetHash())
assert.Equal(t, start, p.PathwayStart())
Expand All @@ -45,29 +46,30 @@ func TestPathway(t *testing.T) {
timestamp: start.UnixNano(),
pathwayLatency: 0,
edgeLatency: 0,
}, <-processor.in)
}, processor.in.poll(time.Second).point)
assert.Equal(t, statsPoint{
edgeTags: []string{"edge-1"},
edgeTags: []string{"topic:topic1"},
hash: hash2,
parentHash: hash1,
timestamp: middle.UnixNano(),
pathwayLatency: middle.Sub(start).Nanoseconds(),
edgeLatency: middle.Sub(start).Nanoseconds(),
}, <-processor.in)
}, processor.in.poll(time.Second).point)
assert.Equal(t, statsPoint{
edgeTags: []string{"edge-2"},
edgeTags: []string{"topic:topic2"},
hash: hash3,
parentHash: hash2,
timestamp: end.UnixNano(),
pathwayLatency: end.Sub(start).Nanoseconds(),
edgeLatency: end.Sub(middle).Nanoseconds(),
}, <-processor.in)
}, processor.in.poll(time.Second).point)
})

t.Run("test new pathway creation", func(t *testing.T) {
processor := Processor{
hashCache: newHashCache(),
stopped: 1,
in: make(chan statsPoint, 10),
in: newFastQueue(),
service: "service-1",
env: "env",
timeSource: time.Now,
Expand All @@ -84,9 +86,9 @@ func TestPathway(t *testing.T) {
assert.Equal(t, hash2, pathwayWith1EdgeTag.GetHash())
assert.Equal(t, hash3, pathwayWith2EdgeTags.GetHash())

var statsPointWithNoEdgeTags = <-processor.in
var statsPointWith1EdgeTag = <-processor.in
var statsPointWith2EdgeTags = <-processor.in
var statsPointWithNoEdgeTags = processor.in.poll(time.Second).point
var statsPointWith1EdgeTag = processor.in.poll(time.Second).point
var statsPointWith2EdgeTags = processor.in.poll(time.Second).point
assert.Equal(t, hash1, statsPointWithNoEdgeTags.hash)
assert.Equal(t, []string(nil), statsPointWithNoEdgeTags.edgeTags)
assert.Equal(t, hash2, statsPointWith1EdgeTag.hash)
Expand Down

0 comments on commit 4221d83

Please sign in to comment.