Skip to content

Commit

Permalink
[CI Visibility] CI Test Visibility integration (#2736) (#2774)
Browse files Browse the repository at this point in the history
Co-authored-by: Tony Redondo <tonyredondo@gmail.com>
Co-authored-by: Manuel Palenzuela Merino <manuel.palenzuela@datadoghq.com>
Co-authored-by: Dario Castañé <dario.castane@datadoghq.com>
  • Loading branch information
4 people committed Jul 8, 2024
1 parent fe89caa commit dc5c6a9
Show file tree
Hide file tree
Showing 62 changed files with 16,370 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ go.sum
/datastreams @Datadog/data-streams-monitoring
/internal/datastreams @Datadog/data-streams-monitoring

# civisibility
/internal/civisibility @DataDog/ci-app-libraries

# Gitlab configuration
.gitlab-ci.yml @DataDog/dd-trace-go-guild @DataDog/apm-core-reliability-and-performance
/.gitlab-ci @DataDog/dd-trace-go-guild @DataDog/apm-core-reliability-and-performance
117 changes: 117 additions & 0 deletions ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracer

import (
"bytes"
"sync/atomic"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

// ciVisibilityPayload represents a payload specifically designed for CI Visibility events.
// It embeds the generic payload structure and adds methods to handle CI Visibility specific data.
type ciVisibilityPayload struct {
*payload
}

// push adds a new CI Visibility event to the payload buffer.
// It grows the buffer to accommodate the new event, encodes the event in MessagePack format, and updates the event count.
//
// Parameters:
//
// event - The CI Visibility event to be added to the payload.
//
// Returns:
//
// An error if encoding the event fails.
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
p.buf.Grow(event.Msgsize())
if err := msgp.Encode(&p.buf, event); err != nil {
return err
}
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}

// newCiVisibilityPayload creates a new instance of civisibilitypayload.
//
// Returns:
//
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
return &ciVisibilityPayload{newPayload()}
}

// getBuffer retrieves the complete body of the CI Visibility payload, including metadata.
// It reads the current payload buffer, adds metadata, and encodes the entire payload in MessagePack format.
//
// Parameters:
//
// config - A pointer to the config structure containing environment settings.
//
// Returns:
//
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {

/*
The Payload format in the CI Visibility protocol is like this:
{
"version": 1,
"metadata": {
"*": {
"runtime-id": "...",
"language": "...",
"library_version": "...",
"env": "..."
}
},
"events": [
// ...
]
}
The event format can be found in the `civisibility_tslv.go` file in the ciVisibilityEvent documentation
*/

// Create a buffer to read the current payload
payloadBuf := new(bytes.Buffer)
if _, err := payloadBuf.ReadFrom(p.payload); err != nil {
return nil, err
}

// Create the metadata map
allMetadata := map[string]string{
"language": "go",
"runtime-id": globalconfig.RuntimeID(),
"library_version": version.Tag,
}
if config.env != "" {
allMetadata["env"] = config.env
}

// Create the visibility payload
visibilityPayload := ciTestCyclePayload{
Version: 1,
Metadata: map[string]map[string]string{
"*": allMetadata,
},
Events: payloadBuf.Bytes(),
}

// Create a new buffer to encode the visibility payload in MessagePack format
encodedBuf := new(bytes.Buffer)
if err := msgp.Encode(encodedBuf, &visibilityPayload); err != nil {
return nil, err
}

return encodedBuf, nil
}
120 changes: 120 additions & 0 deletions ddtrace/tracer/civisibility_payload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracer

import (
"bytes"
"io"
"strconv"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/tinylib/msgp/msgp"
)

func newCiVisibilityEventsList(n int) []*ciVisibilityEvent {
list := make([]*ciVisibilityEvent, n)
for i := 0; i < n; i++ {
s := newBasicSpan("span.list." + strconv.Itoa(i%5+1))
s.Start = fixedTime
list[i] = getCiVisibilityEvent(s)
}

return list
}

// TestCiVisibilityPayloadIntegrity tests that whatever we push into the payload
// allows us to read the same content as would have been encoded by
// the codec.
func TestCiVisibilityPayloadIntegrity(t *testing.T) {
want := new(bytes.Buffer)
for _, n := range []int{10, 1 << 10, 1 << 17} {
t.Run(strconv.Itoa(n), func(t *testing.T) {
assert := assert.New(t)
p := newCiVisibilityPayload()
var allEvents ciVisibilityEvents

for i := 0; i < n; i++ {
list := newCiVisibilityEventsList(i%5 + 1)
allEvents = append(allEvents, list...)
for _, event := range list {
p.push(event)
}
}

want.Reset()
err := msgp.Encode(want, allEvents)
assert.NoError(err)
assert.Equal(want.Len(), p.size())
assert.Equal(p.itemCount(), len(allEvents))

got, err := io.ReadAll(p)
assert.NoError(err)
assert.Equal(want.Bytes(), got)
})
}
}

// TestCiVisibilityPayloadDecode ensures that whatever we push into the payload can
// be decoded by the codec.
func TestCiVisibilityPayloadDecode(t *testing.T) {
assert := assert.New(t)
for _, n := range []int{10, 1 << 10} {
t.Run(strconv.Itoa(n), func(t *testing.T) {
p := newCiVisibilityPayload()
for i := 0; i < n; i++ {
list := newCiVisibilityEventsList(i%5 + 1)
for _, event := range list {
p.push(event)
}
}
var got ciVisibilityEvents
err := msgp.Decode(p, &got)
assert.NoError(err)
})
}
}

func BenchmarkCiVisibilityPayloadThroughput(b *testing.B) {
b.Run("10K", benchmarkCiVisibilityPayloadThroughput(1))
b.Run("100K", benchmarkCiVisibilityPayloadThroughput(10))
b.Run("1MB", benchmarkCiVisibilityPayloadThroughput(100))
}

// benchmarkCiVisibilityPayloadThroughput benchmarks the throughput of the payload by subsequently
// pushing a list of civisibility events containing count spans of approximately 10KB in size each, until the
// payload is filled.
func benchmarkCiVisibilityPayloadThroughput(count int) func(*testing.B) {
return func(b *testing.B) {
p := newCiVisibilityPayload()
s := newBasicSpan("X")
s.Meta["key"] = strings.Repeat("X", 10*1024)
e := getCiVisibilityEvent(s)
events := make(ciVisibilityEvents, count)
for i := 0; i < count; i++ {
events[i] = e
}

b.ReportAllocs()
b.ResetTimer()
reset := func() {
p.header = make([]byte, 8)
p.off = 8
atomic.StoreUint32(&p.count, 0)
p.buf.Reset()
}
for i := 0; i < b.N; i++ {
reset()
for _, event := range events {
for p.size() < payloadMaxLimit {
p.push(event)
}
}
}
}
}
Loading

0 comments on commit dc5c6a9

Please sign in to comment.