Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
writer: remove statsd.Client race in writer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Oct 23, 2018
1 parent 8f79a84 commit 863755d
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ci:
GOOS=windows go build ./cmd/trace-agent # ensure windows builds
go get -u golang.org/x/lint/golint
golint -set_exit_status=1 ./cmd/trace-agent ./filters ./api ./testutil ./info ./quantile ./obfuscate ./sampler ./statsd ./watchdog ./writer ./flags ./osutil
go test -v ./...
go test -v -race ./...

windows:
# pre-packages resources needed for the windows release
Expand Down
2 changes: 1 addition & 1 deletion config/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestFullYamlConfig(t *testing.T) {
assert.Equal(0.5, c.ExtraSampleRate)
assert.Equal(5.0, c.MaxTPS)
assert.Equal("0.0.0.0", c.ReceiverHost)
assert.EqualValues([]*Endpoint{
assert.ElementsMatch([]*Endpoint{
{Host: "https://datadog.unittests", APIKey: "api_key_test"},
{Host: "https://my1.endpoint.com", APIKey: "apikey1"},
{Host: "https://my1.endpoint.com", APIKey: "apikey2"},
Expand Down
43 changes: 27 additions & 16 deletions testutil/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,48 @@ type GaugeSummary struct {

// TestStatsClient is a mocked StatsClient that records all calls and replies with configurable error return values.
type TestStatsClient struct {
gaugeLock sync.Mutex
GaugeErr error
GaugeCalls []StatsClientGaugeArgs

countLock sync.RWMutex
CountErr error
CountCalls []StatsClientCountArgs
mu sync.RWMutex

GaugeErr error
GaugeCalls []StatsClientGaugeArgs
CountErr error
CountCalls []StatsClientCountArgs
HistogramErr error
HistogramCalls []StatsClientHistogramArgs
histogramLock sync.Mutex
}

// Reset resets client's internal records.
func (c *TestStatsClient) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.GaugeErr = nil
c.GaugeCalls = c.GaugeCalls[:0]
c.CountErr = nil
c.CountCalls = c.CountCalls[:0]
c.HistogramErr = nil
c.HistogramCalls = c.HistogramCalls[:0]
}

// Gauge records a call to a Gauge operation and replies with GaugeErr
func (c *TestStatsClient) Gauge(name string, value float64, tags []string, rate float64) error {
c.gaugeLock.Lock()
defer c.gaugeLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.GaugeCalls = append(c.GaugeCalls, StatsClientGaugeArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.GaugeErr
}

// Count records a call to a Count operation and replies with CountErr
func (c *TestStatsClient) Count(name string, value int64, tags []string, rate float64) error {
c.countLock.Lock()
defer c.countLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.CountCalls = append(c.CountCalls, StatsClientCountArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.CountErr
}

// Histogram records a call to a Histogram operation and replies with HistogramErr
func (c *TestStatsClient) Histogram(name string, value float64, tags []string, rate float64) error {
c.histogramLock.Lock()
defer c.histogramLock.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
c.HistogramCalls = append(c.HistogramCalls, StatsClientHistogramArgs{Name: name, Value: value, Tags: tags, Rate: rate})
return c.HistogramErr
}
Expand All @@ -85,8 +94,8 @@ func (c *TestStatsClient) Histogram(name string, value float64, tags []string, r
func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
result := map[string]*CountSummary{}

c.countLock.RLock()
defer c.countLock.RUnlock()
c.mu.RLock()
defer c.mu.RUnlock()
for _, countCall := range c.CountCalls {
name := countCall.Name
summary, ok := result[name]
Expand All @@ -107,6 +116,8 @@ func (c *TestStatsClient) GetCountSummaries() map[string]*CountSummary {
func (c *TestStatsClient) GetGaugeSummaries() map[string]*GaugeSummary {
result := map[string]*GaugeSummary{}

c.mu.RLock()
defer c.mu.RUnlock()
for _, gaugeCall := range c.GaugeCalls {
name := gaugeCall.Name
summary, ok := result[name]
Expand Down
13 changes: 13 additions & 0 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
package writer

import (
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/DataDog/datadog-trace-agent/statsd"
"github.com/DataDog/datadog-trace-agent/testutil"
"github.com/DataDog/datadog-trace-agent/writer/config"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
defer func() {
statsd.Client = originalClient
}()
os.Exit(m.Run())
}

func TestNewMultiSenderFactory(t *testing.T) {
cfg := config.DefaultQueuablePayloadSenderConf()

Expand Down
6 changes: 3 additions & 3 deletions writer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,13 +521,13 @@ func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
// Wait for a retry
time.Sleep(200 * time.Millisecond)

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// When we stop the sender
queuableSender.Stop()
monitor.Stop()

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// Then endpoint should have received only payload3. Because payload1 and payload2 were too old after the failed
// retry (first TriggerTick).
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")
Expand Down
20 changes: 7 additions & 13 deletions writer/service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func TestServiceWriter_SenderMaxPayloads(t *testing.T) {
assert := assert.New(t)

// Given a service writer
serviceWriter, _, _, _, teardown := testServiceWriter()
defer teardown()
serviceWriter, _, _, _ := testServiceWriter()

// When checking its default sender configuration
queuableSender := serviceWriter.sender.(*QueuablePayloadSender)
Expand All @@ -34,8 +33,7 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
assert := assert.New(t)

// Given a service writer, its incoming channel and the endpoint that receives the payloads
serviceWriter, serviceChannel, testEndpoint, _, teardown := testServiceWriter()
defer teardown()
serviceWriter, serviceChannel, testEndpoint, _ := testServiceWriter()
serviceWriter.conf.FlushPeriod = 100 * time.Millisecond

serviceWriter.Start()
Expand Down Expand Up @@ -79,8 +77,7 @@ func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
assert := assert.New(t)

// Given a service writer, its incoming channel and the endpoint that receives the payloads
serviceWriter, serviceChannel, testEndpoint, statsClient, teardown := testServiceWriter()
defer teardown()
serviceWriter, serviceChannel, testEndpoint, statsClient := testServiceWriter()
serviceWriter.conf.FlushPeriod = 100 * time.Millisecond
serviceWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond

Expand Down Expand Up @@ -197,19 +194,16 @@ func assertMetadata(assert *assert.Assertions, expectedHeaders map[string]string
assert.Equal(expectedMetadata, servicesMetadata, "Service metadata should match expectation")
}

func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndpoint, *testutil.TestStatsClient, func()) {
func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndpoint, *testutil.TestStatsClient) {
serviceChannel := make(chan model.ServicesMetadata)
conf := &config.AgentConfig{
ServiceWriterConfig: writerconfig.DefaultServiceWriterConfig(),
}
serviceWriter := NewServiceWriter(conf, serviceChannel)
testEndpoint := &testEndpoint{}
serviceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return serviceWriter, serviceChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return serviceWriter, serviceChannel, testEndpoint, testStatsClient
}
26 changes: 9 additions & 17 deletions writer/stats_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func TestStatsWriter_StatHandling(t *testing.T) {
assert := assert.New(t)

// Given a stats writer, its incoming channel and the endpoint that receives the payloads
statsWriter, statsChannel, testEndpoint, _, teardown := testStatsWriter()
defer teardown()
statsWriter, statsChannel, testEndpoint, _ := testStatsWriter()

statsWriter.Start()

Expand Down Expand Up @@ -70,8 +69,7 @@ func TestStatsWriter_UpdateInfoHandling(t *testing.T) {
assert := assert.New(t)

// Given a stats writer, its incoming channel and the endpoint that receives the payloads
statsWriter, statsChannel, testEndpoint, statsClient, teardown := testStatsWriter()
defer teardown()
statsWriter, statsChannel, testEndpoint, statsClient := testStatsWriter()
statsWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond

statsWriter.Start()
Expand Down Expand Up @@ -177,8 +175,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("common case, no duplicate entries", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()

// This gives us a total of 45 entries. 3 per span, 5
// spans per stat bucket. Each buckets have the same
Expand Down Expand Up @@ -220,8 +217,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("common case, with duplicate entries", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()

// This gives us a total of 45 entries. 3 per span, 5
// spans per stat bucket. Each buckets have the same
Expand Down Expand Up @@ -284,8 +280,7 @@ func TestStatsWriter_BuildPayloads(t *testing.T) {
t.Run("no need for split", func(t *testing.T) {
assert := assert.New(t)

sw, _, _, _, teardown := testStatsWriter()
defer teardown()
sw, _, _, _ := testStatsWriter()
sw.Start()

// This gives us a tota of 45 entries. 3 per span, 5 spans per
Expand Down Expand Up @@ -390,7 +385,7 @@ func assertStatsPayload(assert *assert.Assertions, headers map[string]string, bu
assert.Equal(buckets, statsPayload.Stats, "Stat buckets should match expectation")
}

func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *testutil.TestStatsClient, func()) {
func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *testutil.TestStatsClient) {
statsChannel := make(chan []model.StatsBucket)
conf := &config.AgentConfig{
Hostname: testHostName,
Expand All @@ -400,11 +395,8 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *
statsWriter := NewStatsWriter(conf, statsChannel)
testEndpoint := &testEndpoint{}
statsWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return statsWriter, statsChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return statsWriter, statsChannel, testEndpoint, testStatsClient
}
20 changes: 7 additions & 13 deletions writer/trace_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func TestTraceWriter(t *testing.T) {
assert := assert.New(t)

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, _, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, _ := testTraceWriter()
// Set a maximum of 4 spans per payload
traceWriter.conf.MaxSpansPerPayload = 4
traceWriter.Start()
Expand Down Expand Up @@ -71,8 +70,7 @@ func TestTraceWriter(t *testing.T) {
testFlushPeriod := 100 * time.Millisecond

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, _, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, _ := testTraceWriter()
// Periodically flushing every 100ms
traceWriter.conf.FlushPeriod = testFlushPeriod
traceWriter.Start()
Expand Down Expand Up @@ -107,8 +105,7 @@ func TestTraceWriter(t *testing.T) {
testFlushPeriod := 100 * time.Millisecond

// Create a trace writer, its incoming channel and the endpoint that receives the payloads
traceWriter, traceChannel, testEndpoint, statsClient, teardown := testTraceWriter()
defer teardown()
traceWriter, traceChannel, testEndpoint, statsClient := testTraceWriter()
traceWriter.conf.FlushPeriod = 100 * time.Millisecond
traceWriter.conf.UpdateInfoPeriod = 100 * time.Millisecond
traceWriter.conf.MaxSpansPerPayload = 10
Expand Down Expand Up @@ -339,7 +336,7 @@ func assertPayloads(assert *assert.Assertions, traceWriter *TraceWriter, expecte
}
}

func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testutil.TestStatsClient, func()) {
func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testutil.TestStatsClient) {
payloadChannel := make(chan *SampledTrace)
conf := &config.AgentConfig{
Hostname: testHostName,
Expand All @@ -349,13 +346,10 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut
traceWriter := NewTraceWriter(conf, payloadChannel)
testEndpoint := &testEndpoint{}
traceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
testStatsClient := statsd.Client.(*testutil.TestStatsClient)
testStatsClient.Reset()

return traceWriter, payloadChannel, testEndpoint, testStatsClient, func() {
statsd.Client = originalClient
}
return traceWriter, payloadChannel, testEndpoint, testStatsClient
}

func randomSampledTrace(numSpans, numTransactions int) *SampledTrace {
Expand Down

0 comments on commit 863755d

Please sign in to comment.