Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

writer: remove BaseWriter and sender factory. #499

Merged
merged 2 commits into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 0 additions & 7 deletions writer/multi_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ type multiSender struct {
mch chan interface{} // monitor funneling channel
}

// newMultiSenderFactory returns a new factory to generate multiSender.
func newMultiSenderFactory(cfg config.QueuablePayloadSenderConf) func([]Endpoint) PayloadSender {
return func(endpoints []Endpoint) PayloadSender {
return newMultiSender(endpoints, cfg)
}
}

// newMultiSender returns a new PayloadSender which forwards all sent payloads to all
// the given endpoints, as well as funnels all monitoring channels.
func newMultiSender(endpoints []Endpoint, cfg config.QueuablePayloadSenderConf) PayloadSender {
Expand Down
5 changes: 2 additions & 3 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (

func TestNewMultiSenderFactory(t *testing.T) {
cfg := config.DefaultQueuablePayloadSenderConf()
fn := newMultiSenderFactory(cfg)

t.Run("one", func(t *testing.T) {
endpoint := &DatadogEndpoint{Host: "host1", APIKey: "key1"}
sender, ok := fn([]Endpoint{endpoint}).(*QueuablePayloadSender)
sender, ok := newMultiSender([]Endpoint{endpoint}, cfg).(*QueuablePayloadSender)
assert := assert.New(t)
assert.True(ok)
assert.EqualValues(endpoint, sender.BasePayloadSender.endpoint)
Expand All @@ -29,7 +28,7 @@ func TestNewMultiSenderFactory(t *testing.T) {
&DatadogEndpoint{Host: "host2", APIKey: "key2"},
&DatadogEndpoint{Host: "host3", APIKey: "key3"},
}
sender, ok := fn(endpoints).(*multiSender)
sender, ok := newMultiSender(endpoints, cfg).(*multiSender)
assert := assert.New(t)
assert.True(ok)
assert.Len(sender.senders, 3)
Expand Down
24 changes: 15 additions & 9 deletions writer/service_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
)

const pathServices = "/api/v0.2/services"

// ServiceWriter ingests service metadata and flush them to the API.
type ServiceWriter struct {
stats info.ServiceWriterInfo
Expand All @@ -23,25 +25,29 @@ type ServiceWriter struct {

serviceBuffer model.ServicesMetadata

BaseWriter
sender PayloadSender
exit chan struct{}
}

// NewServiceWriter returns a new writer for services.
func NewServiceWriter(conf *config.AgentConfig, InServices <-chan model.ServicesMetadata) *ServiceWriter {
writerConf := conf.ServiceWriterConfig
log.Infof("Service writer initializing with config: %+v", writerConf)
cfg := conf.ServiceWriterConfig
endpoints := NewEndpoints(conf, pathServices)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Service writer initializing with config: %+v", cfg)

return &ServiceWriter{
conf: writerConf,
conf: cfg,
InServices: InServices,
serviceBuffer: model.ServicesMetadata{},
BaseWriter: *NewBaseWriter(conf, "/api/v0.2/services", newMultiSenderFactory(writerConf.SenderConfig)),
sender: sender,
exit: make(chan struct{}),
}
}

// Start starts the writer.
func (w *ServiceWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()
go func() {
defer watchdog.LogOnPanic()
w.Run()
Expand All @@ -64,7 +70,7 @@ func (w *ServiceWriter) Run() {

// Monitor sender for events
go func() {
for event := range w.payloadSender.Monitor() {
for event := range w.sender.Monitor() {
if event == nil {
continue
}
Expand Down Expand Up @@ -114,7 +120,7 @@ func (w *ServiceWriter) Run() {
func (w *ServiceWriter) Stop() {
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *ServiceWriter) handleServiceMetadata(metadata model.ServicesMetadata) {
Expand Down Expand Up @@ -146,7 +152,7 @@ func (w *ServiceWriter) flush() {
atomic.AddInt64(&w.stats.Bytes, int64(len(data)))

payload := NewPayload(data, headers)
w.payloadSender.Send(payload)
w.sender.Send(payload)

w.serviceBuffer = make(model.ServicesMetadata)
}
Expand Down
4 changes: 2 additions & 2 deletions writer/service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestServiceWriter_SenderMaxPayloads(t *testing.T) {
defer teardown()

// When checking its default sender configuration
queuableSender := serviceWriter.BaseWriter.payloadSender.(*QueuablePayloadSender)
queuableSender := serviceWriter.sender.(*QueuablePayloadSender)

// Then the MaxQueuedPayloads setting should be -1 (unlimited)
assert.Equal(-1, queuableSender.conf.MaxQueuedPayloads)
Expand Down Expand Up @@ -204,7 +204,7 @@ func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndp
}
serviceWriter := NewServiceWriter(conf, serviceChannel)
testEndpoint := &testEndpoint{}
serviceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
serviceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
35 changes: 18 additions & 17 deletions writer/stats_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import (
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
)

const pathStats = "/api/v0.2/stats"

// StatsWriter ingests stats buckets and flushes them to the API.
type StatsWriter struct {
BaseWriter
sender PayloadSender
exit chan struct{}

// InStats is the stream of stat buckets to send out.
InStats <-chan []model.StatsBucket
Expand All @@ -39,22 +42,24 @@ type StatsWriter struct {

// NewStatsWriter returns a new writer for stats.
func NewStatsWriter(conf *config.AgentConfig, InStats <-chan []model.StatsBucket) *StatsWriter {
writerConf := conf.StatsWriterConfig
log.Infof("Stats writer initializing with config: %+v", writerConf)
cfg := conf.StatsWriterConfig
endpoints := NewEndpoints(conf, pathStats)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Stats writer initializing with config: %+v", cfg)

bw := *NewBaseWriter(conf, "/api/v0.2/stats", newMultiSenderFactory(writerConf.SenderConfig))
return &StatsWriter{
BaseWriter: bw,
InStats: InStats,
hostName: conf.Hostname,
env: conf.DefaultEnv,
conf: writerConf,
sender: sender,
exit: make(chan struct{}),
InStats: InStats,
hostName: conf.Hostname,
env: conf.DefaultEnv,
conf: cfg,
}
}

// Start starts the writer, awaiting stat buckets and flushing them.
func (w *StatsWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()

go func() {
defer watchdog.LogOnPanic()
Expand Down Expand Up @@ -89,11 +94,7 @@ func (w *StatsWriter) Run() {
func (w *StatsWriter) Stop() {
w.exit <- struct{}{}
<-w.exit

// Closing the base writer, among other things, will close the
// w.payloadSender.Monitor() channel, stoping the monitoring
// goroutine.
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *StatsWriter) handleStats(stats []model.StatsBucket) {
Expand Down Expand Up @@ -126,7 +127,7 @@ func (w *StatsWriter) handleStats(stats []model.StatsBucket) {
}

payload := NewPayload(data, headers)
w.payloadSender.Send(payload)
w.sender.Send(payload)

atomic.AddInt64(&w.info.Bytes, int64(len(data)))
}
Expand Down Expand Up @@ -247,7 +248,7 @@ func (w *StatsWriter) buildPayloads(stats []model.StatsBucket, maxEntriesPerPayl
// them, send out statsd metrics, and updates the writer info
// - periodically dumps the writer info
func (w *StatsWriter) monitor() {
monC := w.payloadSender.Monitor()
monC := w.sender.Monitor()

infoTicker := time.NewTicker(w.conf.UpdateInfoPeriod)
defer infoTicker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion writer/stats_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *
}
statsWriter := NewStatsWriter(conf, statsChannel)
testEndpoint := &testEndpoint{}
statsWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
statsWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
24 changes: 15 additions & 9 deletions writer/trace_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/golang/protobuf/proto"
)

const pathTraces = "/api/v0.2/traces"

// SampledTrace represents the result of a trace sample operation.
type SampledTrace struct {
Trace *model.Trace
Expand All @@ -41,16 +43,19 @@ type TraceWriter struct {
transactions []*model.Span
spansInBuffer int

BaseWriter
sender PayloadSender
exit chan struct{}
}

// NewTraceWriter returns a new writer for traces.
func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWriter {
writerConf := conf.TraceWriterConfig
log.Infof("Trace writer initializing with config: %+v", writerConf)
cfg := conf.TraceWriterConfig
endpoints := NewEndpoints(conf, pathTraces)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Trace writer initializing with config: %+v", cfg)

return &TraceWriter{
conf: writerConf,
conf: cfg,
hostName: conf.Hostname,
env: conf.DefaultEnv,

Expand All @@ -59,13 +64,14 @@ func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWri

in: in,

BaseWriter: *NewBaseWriter(conf, "/api/v0.2/traces", newMultiSenderFactory(writerConf.SenderConfig)),
sender: sender,
exit: make(chan struct{}),
}
}

// Start starts the writer.
func (w *TraceWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()
go func() {
defer watchdog.LogOnPanic()
w.Run()
Expand All @@ -86,7 +92,7 @@ func (w *TraceWriter) Run() {

// Monitor sender for events
go func() {
for event := range w.payloadSender.Monitor() {
for event := range w.sender.Monitor() {
if event == nil {
continue
}
Expand Down Expand Up @@ -137,7 +143,7 @@ func (w *TraceWriter) Run() {
func (w *TraceWriter) Stop() {
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) {
Expand Down Expand Up @@ -256,7 +262,7 @@ func (w *TraceWriter) flush() {
payload := NewPayload(serialized, headers)

log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions))
w.payloadSender.Send(payload)
w.sender.Send(payload)
w.resetBuffer()
}

Expand Down
2 changes: 1 addition & 1 deletion writer/trace_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut
}
traceWriter := NewTraceWriter(conf, payloadChannel)
testEndpoint := &testEndpoint{}
traceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
traceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
28 changes: 0 additions & 28 deletions writer/writer.go

This file was deleted.