diff --git a/writer/multi_writer.go b/writer/multi_writer.go index 1721ebeab..05442c106 100644 --- a/writer/multi_writer.go +++ b/writer/multi_writer.go @@ -17,13 +17,6 @@ type multiSender struct { mch chan interface{} // monitor funneling channel } -// newMultiSenderFactory returns a new factory to generate multiSender. -func newMultiSenderFactory(cfg config.QueuablePayloadSenderConf) func([]Endpoint) PayloadSender { - return func(endpoints []Endpoint) PayloadSender { - return newMultiSender(endpoints, cfg) - } -} - // newMultiSender returns a new PayloadSender which forwards all sent payloads to all // the given endpoints, as well as funnels all monitoring channels. func newMultiSender(endpoints []Endpoint, cfg config.QueuablePayloadSenderConf) PayloadSender { diff --git a/writer/multi_writer_test.go b/writer/multi_writer_test.go index 434aaf868..8e3a09281 100644 --- a/writer/multi_writer_test.go +++ b/writer/multi_writer_test.go @@ -12,11 +12,10 @@ import ( func TestNewMultiSenderFactory(t *testing.T) { cfg := config.DefaultQueuablePayloadSenderConf() - fn := newMultiSenderFactory(cfg) t.Run("one", func(t *testing.T) { endpoint := &DatadogEndpoint{Host: "host1", APIKey: "key1"} - sender, ok := fn([]Endpoint{endpoint}).(*QueuablePayloadSender) + sender, ok := newMultiSender([]Endpoint{endpoint}, cfg).(*QueuablePayloadSender) assert := assert.New(t) assert.True(ok) assert.EqualValues(endpoint, sender.BasePayloadSender.endpoint) @@ -29,7 +28,7 @@ func TestNewMultiSenderFactory(t *testing.T) { &DatadogEndpoint{Host: "host2", APIKey: "key2"}, &DatadogEndpoint{Host: "host3", APIKey: "key3"}, } - sender, ok := fn(endpoints).(*multiSender) + sender, ok := newMultiSender(endpoints, cfg).(*multiSender) assert := assert.New(t) assert.True(ok) assert.Len(sender.senders, 3) diff --git a/writer/service_writer.go b/writer/service_writer.go index 6c3bfee3c..b8df953dc 100644 --- a/writer/service_writer.go +++ b/writer/service_writer.go @@ -15,6 +15,8 @@ import ( writerconfig "github.com/DataDog/datadog-trace-agent/writer/config" ) +const pathServices = "/api/v0.2/services" + // ServiceWriter ingests service metadata and flush them to the API. type ServiceWriter struct { stats info.ServiceWriterInfo @@ -23,25 +25,29 @@ type ServiceWriter struct { serviceBuffer model.ServicesMetadata - BaseWriter + sender PayloadSender + exit chan struct{} } // NewServiceWriter returns a new writer for services. func NewServiceWriter(conf *config.AgentConfig, InServices <-chan model.ServicesMetadata) *ServiceWriter { - writerConf := conf.ServiceWriterConfig - log.Infof("Service writer initializing with config: %+v", writerConf) + cfg := conf.ServiceWriterConfig + endpoints := NewEndpoints(conf, pathServices) + sender := newMultiSender(endpoints, cfg.SenderConfig) + log.Infof("Service writer initializing with config: %+v", cfg) return &ServiceWriter{ - conf: writerConf, + conf: cfg, InServices: InServices, serviceBuffer: model.ServicesMetadata{}, - BaseWriter: *NewBaseWriter(conf, "/api/v0.2/services", newMultiSenderFactory(writerConf.SenderConfig)), + sender: sender, + exit: make(chan struct{}), } } // Start starts the writer. func (w *ServiceWriter) Start() { - w.BaseWriter.Start() + w.sender.Start() go func() { defer watchdog.LogOnPanic() w.Run() @@ -64,7 +70,7 @@ func (w *ServiceWriter) Run() { // Monitor sender for events go func() { - for event := range w.payloadSender.Monitor() { + for event := range w.sender.Monitor() { if event == nil { continue } @@ -114,7 +120,7 @@ func (w *ServiceWriter) Run() { func (w *ServiceWriter) Stop() { w.exit <- struct{}{} <-w.exit - w.BaseWriter.Stop() + w.sender.Stop() } func (w *ServiceWriter) handleServiceMetadata(metadata model.ServicesMetadata) { @@ -146,7 +152,7 @@ func (w *ServiceWriter) flush() { atomic.AddInt64(&w.stats.Bytes, int64(len(data))) payload := NewPayload(data, headers) - w.payloadSender.Send(payload) + w.sender.Send(payload) w.serviceBuffer = make(model.ServicesMetadata) } diff --git a/writer/service_writer_test.go b/writer/service_writer_test.go index 54bd39bee..378206871 100644 --- a/writer/service_writer_test.go +++ b/writer/service_writer_test.go @@ -24,7 +24,7 @@ func TestServiceWriter_SenderMaxPayloads(t *testing.T) { defer teardown() // When checking its default sender configuration - queuableSender := serviceWriter.BaseWriter.payloadSender.(*QueuablePayloadSender) + queuableSender := serviceWriter.sender.(*QueuablePayloadSender) // Then the MaxQueuedPayloads setting should be -1 (unlimited) assert.Equal(-1, queuableSender.conf.MaxQueuedPayloads) @@ -204,7 +204,7 @@ func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndp } serviceWriter := NewServiceWriter(conf, serviceChannel) testEndpoint := &testEndpoint{} - serviceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) + serviceWriter.sender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} originalClient := statsd.Client statsd.Client = testStatsClient diff --git a/writer/stats_writer.go b/writer/stats_writer.go index c6bef46d7..fa162e504 100644 --- a/writer/stats_writer.go +++ b/writer/stats_writer.go @@ -15,9 +15,12 @@ import ( writerconfig "github.com/DataDog/datadog-trace-agent/writer/config" ) +const pathStats = "/api/v0.2/stats" + // StatsWriter ingests stats buckets and flushes them to the API. type StatsWriter struct { - BaseWriter + sender PayloadSender + exit chan struct{} // InStats is the stream of stat buckets to send out. InStats <-chan []model.StatsBucket @@ -39,22 +42,24 @@ type StatsWriter struct { // NewStatsWriter returns a new writer for stats. func NewStatsWriter(conf *config.AgentConfig, InStats <-chan []model.StatsBucket) *StatsWriter { - writerConf := conf.StatsWriterConfig - log.Infof("Stats writer initializing with config: %+v", writerConf) + cfg := conf.StatsWriterConfig + endpoints := NewEndpoints(conf, pathStats) + sender := newMultiSender(endpoints, cfg.SenderConfig) + log.Infof("Stats writer initializing with config: %+v", cfg) - bw := *NewBaseWriter(conf, "/api/v0.2/stats", newMultiSenderFactory(writerConf.SenderConfig)) return &StatsWriter{ - BaseWriter: bw, - InStats: InStats, - hostName: conf.Hostname, - env: conf.DefaultEnv, - conf: writerConf, + sender: sender, + exit: make(chan struct{}), + InStats: InStats, + hostName: conf.Hostname, + env: conf.DefaultEnv, + conf: cfg, } } // Start starts the writer, awaiting stat buckets and flushing them. func (w *StatsWriter) Start() { - w.BaseWriter.Start() + w.sender.Start() go func() { defer watchdog.LogOnPanic() @@ -89,11 +94,7 @@ func (w *StatsWriter) Run() { func (w *StatsWriter) Stop() { w.exit <- struct{}{} <-w.exit - - // Closing the base writer, among other things, will close the - // w.payloadSender.Monitor() channel, stoping the monitoring - // goroutine. - w.BaseWriter.Stop() + w.sender.Stop() } func (w *StatsWriter) handleStats(stats []model.StatsBucket) { @@ -126,7 +127,7 @@ func (w *StatsWriter) handleStats(stats []model.StatsBucket) { } payload := NewPayload(data, headers) - w.payloadSender.Send(payload) + w.sender.Send(payload) atomic.AddInt64(&w.info.Bytes, int64(len(data))) } @@ -247,7 +248,7 @@ func (w *StatsWriter) buildPayloads(stats []model.StatsBucket, maxEntriesPerPayl // them, send out statsd metrics, and updates the writer info // - periodically dumps the writer info func (w *StatsWriter) monitor() { - monC := w.payloadSender.Monitor() + monC := w.sender.Monitor() infoTicker := time.NewTicker(w.conf.UpdateInfoPeriod) defer infoTicker.Stop() diff --git a/writer/stats_writer_test.go b/writer/stats_writer_test.go index 8bae38fdf..33ad6e719 100644 --- a/writer/stats_writer_test.go +++ b/writer/stats_writer_test.go @@ -399,7 +399,7 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, * } statsWriter := NewStatsWriter(conf, statsChannel) testEndpoint := &testEndpoint{} - statsWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) + statsWriter.sender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} originalClient := statsd.Client statsd.Client = testStatsClient diff --git a/writer/trace_writer.go b/writer/trace_writer.go index 0a3fa2a7a..0da3a108e 100644 --- a/writer/trace_writer.go +++ b/writer/trace_writer.go @@ -18,6 +18,8 @@ import ( "github.com/golang/protobuf/proto" ) +const pathTraces = "/api/v0.2/traces" + // SampledTrace represents the result of a trace sample operation. type SampledTrace struct { Trace *model.Trace @@ -41,16 +43,19 @@ type TraceWriter struct { transactions []*model.Span spansInBuffer int - BaseWriter + sender PayloadSender + exit chan struct{} } // NewTraceWriter returns a new writer for traces. func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWriter { - writerConf := conf.TraceWriterConfig - log.Infof("Trace writer initializing with config: %+v", writerConf) + cfg := conf.TraceWriterConfig + endpoints := NewEndpoints(conf, pathTraces) + sender := newMultiSender(endpoints, cfg.SenderConfig) + log.Infof("Trace writer initializing with config: %+v", cfg) return &TraceWriter{ - conf: writerConf, + conf: cfg, hostName: conf.Hostname, env: conf.DefaultEnv, @@ -59,13 +64,14 @@ func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWri in: in, - BaseWriter: *NewBaseWriter(conf, "/api/v0.2/traces", newMultiSenderFactory(writerConf.SenderConfig)), + sender: sender, + exit: make(chan struct{}), } } // Start starts the writer. func (w *TraceWriter) Start() { - w.BaseWriter.Start() + w.sender.Start() go func() { defer watchdog.LogOnPanic() w.Run() @@ -86,7 +92,7 @@ func (w *TraceWriter) Run() { // Monitor sender for events go func() { - for event := range w.payloadSender.Monitor() { + for event := range w.sender.Monitor() { if event == nil { continue } @@ -137,7 +143,7 @@ func (w *TraceWriter) Run() { func (w *TraceWriter) Stop() { w.exit <- struct{}{} <-w.exit - w.BaseWriter.Stop() + w.sender.Stop() } func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) { @@ -256,7 +262,7 @@ func (w *TraceWriter) flush() { payload := NewPayload(serialized, headers) log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions)) - w.payloadSender.Send(payload) + w.sender.Send(payload) w.resetBuffer() } diff --git a/writer/trace_writer_test.go b/writer/trace_writer_test.go index d04b5b58c..2b16d874c 100644 --- a/writer/trace_writer_test.go +++ b/writer/trace_writer_test.go @@ -348,7 +348,7 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut } traceWriter := NewTraceWriter(conf, payloadChannel) testEndpoint := &testEndpoint{} - traceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint) + traceWriter.sender.setEndpoint(testEndpoint) testStatsClient := &testutil.TestStatsClient{} originalClient := statsd.Client statsd.Client = testStatsClient diff --git a/writer/writer.go b/writer/writer.go deleted file mode 100644 index 2ae2d6939..000000000 --- a/writer/writer.go +++ /dev/null @@ -1,28 +0,0 @@ -package writer - -import "github.com/DataDog/datadog-trace-agent/config" - -// BaseWriter encodes the base components and behaviour of a typical Writer. -type BaseWriter struct { - payloadSender PayloadSender - exit chan struct{} -} - -// NewBaseWriter creates a new instance of a BaseWriter. -func NewBaseWriter(conf *config.AgentConfig, path string, senderFactory func([]Endpoint) PayloadSender) *BaseWriter { - endpoints := NewEndpoints(conf, path) - return &BaseWriter{ - payloadSender: senderFactory(endpoints), - exit: make(chan struct{}), - } -} - -// Start starts the necessary components of a BaseWriter. -func (w *BaseWriter) Start() { - w.payloadSender.Start() -} - -// Stop stops any the stoppable components of a BaseWriter. -func (w *BaseWriter) Stop() { - w.payloadSender.Stop() -}