Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

write: fix test races and enable CI -race flag #500

Merged
merged 1 commit into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ci:
GOOS=windows go build ./cmd/trace-agent # ensure windows builds
go get -u golang.org/x/lint/golint
golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil
go test -v ./...
go test -v -race ./...

windows:
# pre-packages resources needed for the windows release
Expand Down
2 changes: 1 addition & 1 deletion config/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestFullYamlConfig(t *testing.T) {
assert.Equal(0.5, c.ExtraSampleRate)
assert.Equal(5.0, c.MaxTPS)
assert.Equal("0.0.0.0", c.ReceiverHost)
assert.EqualValues([]*Endpoint{
assert.ElementsMatch([]*Endpoint{
{Host: "https://datadog.unittests", APIKey: "api_key_test"},
{Host: "https://my1.endpoint.com", APIKey: "apikey1"},
{Host: "https://my1.endpoint.com", APIKey: "apikey2"},
Expand Down
43 changes: 27 additions & 16 deletions testutil/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,48 @@ type GaugeSummary struct {

// TestStatsClient is a mocked StatsClient that records all calls and replies with configurable error return values.
type TestStatsClient struct {
gaugeLock sync.Mutex
GaugeErr error
GaugeCalls []StatsClientGaugeArgs

countLock sync.RWMutex
CountErr error
CountCalls []StatsClientCountArgs
mu sync.RWMutex

GaugeErr error
GaugeCalls []StatsClientGaugeArgs
CountErr error
CountCalls []StatsClientCountArgs
HistogramErr error
HistogramCalls []StatsClientHistogramArgs
histogramLock sync.Mutex
}

// Reset resets client's internal records.
func (c *TestStatsClient) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.GaugeErr = nil
c.GaugeCalls = c.GaugeCalls[:0]
c.CountErr = nil
c.CountCalls = c.CountCalls[:0]
c.HistogramErr = nil
c.HistogramCalls = c.HistogramCalls[:0]
}

// Gauge records a call to a Gauge operation and replies with GaugeErr
func (c *TestStatsClient) Gauge(name string, value float64, tags []string, rate float64) error {
c.gaugeLock.Lock()
defer c.gaugeLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.GaugeCalls = append(c.GaugeCalls, StatsClientGaugeArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.GaugeErr
}

// Count records a call to a Count operation and replies with CountErr
func (c *TestStatsClient) Count(name string, value int64, tags []string, rate float64) error {
c.countLock.Lock()
defer c.countLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.CountCalls = append(c.CountCalls, StatsClientCountArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.CountErr
}

// Histogram records a call to a Histogram operation and replies with HistogramErr
func (c *TestStatsClient) Histogram(name string, value float64, tags []string, rate float64) error {
c.histogramLock.Lock()
defer c.histogramLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.HistogramCalls = append(c.HistogramCalls, StatsClientHistogramArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.HistogramErr
}
Expand All @@ -85,8 +94,8 @@ func (c *TestStatsClient) Histogram(name string, value float64, tags []string, r
func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
result := map[string]*CountSummary{}

c.countLock.RLock()
defer c.countLock.RUnlock()
c.mu.RLock()
defer c.mu.RUnlock()
for _, countCall := range c.CountCalls {
name := countCall.Name
summary, ok := result[name]
Expand All @@ -107,6 +116,8 @@ func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
func (c *TestStatsClient) GetGaugeSummaries() map[string]*GaugeSummary {
result := map[string]*GaugeSummary{}

c.mu.RLock()
defer c.mu.RUnlock()
for _, gaugeCall := range c.GaugeCalls {
name := gaugeCall.Name
summary, ok := result[name]
Expand Down
13 changes: 13 additions & 0 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
package writer

import (
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/DataDog/datadog-trace-agent/statsd"
"github.com/DataDog/datadog-trace-agent/testutil"
"github.com/DataDog/datadog-trace-agent/writer/config"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
defer func() {
statsd.Client = originalClient
}()
os.Exit(m.Run())
}

func TestNewMultiSenderFactory(t *testing.T) {
cfg := config.DefaultQueuablePayloadSenderConf()

Expand Down
6 changes: 3 additions & 3 deletions writer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,13 +521,13 @@ func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
// Wait for a retry
time.Sleep(200 * time.Millisecond)

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// When we stop the sender
queuableSender.Stop()
monitor.Stop()

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// Then endpoint should have received only payload3. Because payload1 and payload2 were too old after the failed
// retry (first TriggerTick).
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")
Expand Down
20 changes: 7 additions & 13 deletions writer/service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func TestServiceWriter_SenderMaxPayloads(t *testing.T) {
assert := assert.New(t)

// Given a service writer
serviceWriter, _, _, _, teardown := testServiceWriter()
defer teardown()
serviceWriter, _, _, _ := testServiceWriter()

// When checking its default sender configuration
queuableSender := serviceWriter.sender.(*QueuablePayloadSender)
Expand All @@ -34,8 +33,7 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
assert := assert.New(t)

// Given a service writer, its incoming channel and the endpoint that receives the payloads
serviceWriter, serviceChannel, testEndpoint, _, teardown := testServiceWriter()
defer teardown()
serviceWriter, serviceChannel, testEndpoint, _ := testServiceWriter()
serviceWriter.conf.FlushPeriod = 100 * time.Millisecond

serviceWriter.Start()
Expand Down Expand Up @@ -79,8 +77,7 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
assert := assert.New(t)

// Given a service writer, its incoming channel and the endpoint that receives the payloads
serviceWriter, serviceChannel, testEndpoint, statsClient, teardown := testServiceWriter()
defer teardown()
serviceWriter, serviceChannel, testEndpoint, statsClient := testServiceWriter()
serviceWriter.conf.FlushPeriod = 100 * time.Millisecond
serviceWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond

Expand Down Expand Up @@ -197,19 +194,16 @@ func assertMetadata(assert *assert.Assertions, expectedHeaders map[string]string
assert.Equal(expectedMetadata, servicesMetadata, "Service metadata should match expectation")
}

func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndpoint, *testutil.TestStatsClient, func()) {
func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndpoint, *testutil.TestStatsClient) {
serviceChannel := make(chan model.ServicesMetadata)
conf := &config.AgentConfig{
ServiceWriterConfig: writerconfig.DefaultServiceWriterConfig(),
}
serviceWriter := NewServiceWriter(conf, serviceChannel)
testEndpoint := &testEndpoint{}
serviceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return serviceWriter, serviceChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return serviceWriter, serviceChannel, testEndpoint, testStatsClient
}
26 changes: 9 additions & 17 deletions writer/stats_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func TestStatsWriter_StatHandling(t *testing.T) {
assert := assert.New(t)

// Given a stats writer, its incoming channel and the endpoint that receives the payloads
statsWriter, statsChannel, testEndpoint, _, teardown := testStatsWriter()
defer teardown()
statsWriter, statsChannel, testEndpoint, _ := testStatsWriter()

statsWriter.Start()

Expand Down Expand Up @@ -70,8 +69,7 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
assert := assert.New(t)

// Given a stats writer, its incoming channel and the endpoint that receives the payloads
statsWriter, statsChannel, testEndpoint, statsClient, teardown := testStatsWriter()
defer teardown()
statsWriter, statsChannel, testEndpoint, statsClient := testStatsWriter()
statsWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond

statsWriter.Start()
Expand Down Expand Up @@ -177,8 +175,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("common case, no duplicate entries", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()

// This gives us a total of 45 entries. 3 per span, 5
// spans per stat bucket. Each buckets have the same
Expand Down Expand Up @@ -220,8 +217,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("common case, with duplicate entries", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()

// This gives us a total of 45 entries. 3 per span, 5
// spans per stat bucket. Each buckets have the same
Expand Down Expand Up @@ -284,8 +280,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("no need for split", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()
sw.Start()

// This gives us a tota of 45 entries. 3 per span, 5 spans per
Expand Down Expand Up @@ -390,7 +385,7 @@ func assertStatsPayload(assert *assert.Assertions, headers map[string]string, bu
assert.Equal(buckets, statsPayload.Stats, "Stat buckets should match expectation")
}

func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *testutil.TestStatsClient, func()) {
func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *testutil.TestStatsClient) {
statsChannel := make(chan []model.StatsBucket)
conf := &config.AgentConfig{
Hostname: testHostName,
Expand All @@ -400,11 +395,8 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *
statsWriter := NewStatsWriter(conf, statsChannel)
testEndpoint := &testEndpoint{}
statsWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return statsWriter, statsChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return statsWriter, statsChannel, testEndpoint, testStatsClient
}
20 changes: 7 additions & 13 deletions writer/trace_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func TestTraceWriter(t *testing.T) {
assert := assert.New(t)

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, _, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, _ := testTraceWriter()
// Set a maximum of 4 spans per payload
traceWriter.conf.MaxSpansPerPayload = 4
traceWriter.Start()
Expand Down Expand Up @@ -71,8 +70,7 @@ func TestTraceWriter(t *testing.T) {
testFlushPeriod := 100 * time.Millisecond

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, _, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, _ := testTraceWriter()
// Periodically flushing every 100ms
traceWriter.conf.FlushPeriod = testFlushPeriod
traceWriter.Start()
Expand Down Expand Up @@ -107,8 +105,7 @@ func TestTraceWriter(t *testing.T) {
testFlushPeriod := 100 * time.Millisecond

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, statsClient, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, statsClient := testTraceWriter()
traceWriter.conf.FlushPeriod = 100 * time.Millisecond
traceWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond
traceWriter.conf.MaxSpansPerPayload = 10
Expand Down Expand Up @@ -339,7 +336,7 @@ func assertPayloads(assert *assert.Assertions, traceWriter *TraceWriter, expecte
}
}

func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testutil.TestStatsClient, func()) {
func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testutil.TestStatsClient) {
payloadChannel := make(chan *SampledTrace)
conf := &config.AgentConfig{
Hostname: testHostName,
Expand All @@ -349,13 +346,10 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut
traceWriter := NewTraceWriter(conf, payloadChannel)
testEndpoint := &testEndpoint{}
traceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return traceWriter, payloadChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return traceWriter, payloadChannel, testEndpoint, testStatsClient
}

func randomSampledTrace(numSpans, numTransactions int) *SampledTrace {
Expand Down