Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
writer: remove BaseWriter and sender factory. (#499)
Browse files Browse the repository at this point in the history
* writer: remove BaseWriter
* writer: remove multiSenderFactory
  • Loading branch information
gbbr committed Oct 22, 2018
1 parent 8720fdf commit 8f79a84
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 77 deletions.
7 changes: 0 additions & 7 deletions writer/multi_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ type multiSender struct {
mch chan interface{} // monitor funneling channel
}

// newMultiSenderFactory returns a new factory to generate multiSender.
func newMultiSenderFactory(cfg config.QueuablePayloadSenderConf) func([]Endpoint) PayloadSender {
return func(endpoints []Endpoint) PayloadSender {
return newMultiSender(endpoints, cfg)
}
}

// newMultiSender returns a new PayloadSender which forwards all sent payloads to all
// the given endpoints, as well as funnels all monitoring channels.
func newMultiSender(endpoints []Endpoint, cfg config.QueuablePayloadSenderConf) PayloadSender {
Expand Down
5 changes: 2 additions & 3 deletions writer/multi_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (

func TestNewMultiSenderFactory(t *testing.T) {
cfg := config.DefaultQueuablePayloadSenderConf()
fn := newMultiSenderFactory(cfg)

t.Run("one", func(t *testing.T) {
endpoint := &DatadogEndpoint{Host: "host1", APIKey: "key1"}
sender, ok := fn([]Endpoint{endpoint}).(*QueuablePayloadSender)
sender, ok := newMultiSender([]Endpoint{endpoint}, cfg).(*QueuablePayloadSender)
assert := assert.New(t)
assert.True(ok)
assert.EqualValues(endpoint, sender.BasePayloadSender.endpoint)
Expand All @@ -29,7 +28,7 @@ func TestNewMultiSenderFactory(t *testing.T) {
&DatadogEndpoint{Host: "host2", APIKey: "key2"},
&DatadogEndpoint{Host: "host3", APIKey: "key3"},
}
sender, ok := fn(endpoints).(*multiSender)
sender, ok := newMultiSender(endpoints, cfg).(*multiSender)
assert := assert.New(t)
assert.True(ok)
assert.Len(sender.senders, 3)
Expand Down
24 changes: 15 additions & 9 deletions writer/service_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
)

const pathServices = "/api/v0.2/services"

// ServiceWriter ingests service metadata and flush them to the API.
type ServiceWriter struct {
stats info.ServiceWriterInfo
Expand All @@ -23,25 +25,29 @@ type ServiceWriter struct {

serviceBuffer model.ServicesMetadata

BaseWriter
sender PayloadSender
exit chan struct{}
}

// NewServiceWriter returns a new writer for services.
func NewServiceWriter(conf *config.AgentConfig, InServices <-chan model.ServicesMetadata) *ServiceWriter {
writerConf := conf.ServiceWriterConfig
log.Infof("Service writer initializing with config: %+v", writerConf)
cfg := conf.ServiceWriterConfig
endpoints := NewEndpoints(conf, pathServices)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Service writer initializing with config: %+v", cfg)

return &ServiceWriter{
conf: writerConf,
conf: cfg,
InServices: InServices,
serviceBuffer: model.ServicesMetadata{},
BaseWriter: *NewBaseWriter(conf, "/api/v0.2/services", newMultiSenderFactory(writerConf.SenderConfig)),
sender: sender,
exit: make(chan struct{}),
}
}

// Start starts the writer.
func (w *ServiceWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()
go func() {
defer watchdog.LogOnPanic()
w.Run()
Expand All @@ -64,7 +70,7 @@ func (w *ServiceWriter) Run() {

// Monitor sender for events
go func() {
for event := range w.payloadSender.Monitor() {
for event := range w.sender.Monitor() {
if event == nil {
continue
}
Expand Down Expand Up @@ -114,7 +120,7 @@ func (w *ServiceWriter) Run() {
func (w *ServiceWriter) Stop() {
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *ServiceWriter) handleServiceMetadata(metadata model.ServicesMetadata) {
Expand Down Expand Up @@ -146,7 +152,7 @@ func (w *ServiceWriter) flush() {
atomic.AddInt64(&w.stats.Bytes, int64(len(data)))

payload := NewPayload(data, headers)
w.payloadSender.Send(payload)
w.sender.Send(payload)

w.serviceBuffer = make(model.ServicesMetadata)
}
Expand Down
4 changes: 2 additions & 2 deletions writer/service_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestServiceWriter_SenderMaxPayloads(t *testing.T) {
defer teardown()

// When checking its default sender configuration
queuableSender := serviceWriter.BaseWriter.payloadSender.(*QueuablePayloadSender)
queuableSender := serviceWriter.sender.(*QueuablePayloadSender)

// Then the MaxQueuedPayloads setting should be -1 (unlimited)
assert.Equal(-1, queuableSender.conf.MaxQueuedPayloads)
Expand Down Expand Up @@ -204,7 +204,7 @@ func testServiceWriter() (*ServiceWriter, chan model.ServicesMetadata, *testEndp
}
serviceWriter := NewServiceWriter(conf, serviceChannel)
testEndpoint := &testEndpoint{}
serviceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
serviceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
35 changes: 18 additions & 17 deletions writer/stats_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import (
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
)

const pathStats = "/api/v0.2/stats"

// StatsWriter ingests stats buckets and flushes them to the API.
type StatsWriter struct {
BaseWriter
sender PayloadSender
exit chan struct{}

// InStats is the stream of stat buckets to send out.
InStats <-chan []model.StatsBucket
Expand All @@ -39,22 +42,24 @@ type StatsWriter struct {

// NewStatsWriter returns a new writer for stats.
func NewStatsWriter(conf *config.AgentConfig, InStats <-chan []model.StatsBucket) *StatsWriter {
writerConf := conf.StatsWriterConfig
log.Infof("Stats writer initializing with config: %+v", writerConf)
cfg := conf.StatsWriterConfig
endpoints := NewEndpoints(conf, pathStats)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Stats writer initializing with config: %+v", cfg)

bw := *NewBaseWriter(conf, "/api/v0.2/stats", newMultiSenderFactory(writerConf.SenderConfig))
return &StatsWriter{
BaseWriter: bw,
InStats: InStats,
hostName: conf.Hostname,
env: conf.DefaultEnv,
conf: writerConf,
sender: sender,
exit: make(chan struct{}),
InStats: InStats,
hostName: conf.Hostname,
env: conf.DefaultEnv,
conf: cfg,
}
}

// Start starts the writer, awaiting stat buckets and flushing them.
func (w *StatsWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()

go func() {
defer watchdog.LogOnPanic()
Expand Down Expand Up @@ -89,11 +94,7 @@ func (w *StatsWriter) Run() {
func (w *StatsWriter) Stop() {
w.exit <- struct{}{}
<-w.exit

// Closing the base writer, among other things, will close the
// w.payloadSender.Monitor() channel, stoping the monitoring
// goroutine.
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *StatsWriter) handleStats(stats []model.StatsBucket) {
Expand Down Expand Up @@ -126,7 +127,7 @@ func (w *StatsWriter) handleStats(stats []model.StatsBucket) {
}

payload := NewPayload(data, headers)
w.payloadSender.Send(payload)
w.sender.Send(payload)

atomic.AddInt64(&w.info.Bytes, int64(len(data)))
}
Expand Down Expand Up @@ -247,7 +248,7 @@ func (w *StatsWriter) buildPayloads(stats []model.StatsBucket, maxEntriesPerPayl
// them, send out statsd metrics, and updates the writer info
// - periodically dumps the writer info
func (w *StatsWriter) monitor() {
monC := w.payloadSender.Monitor()
monC := w.sender.Monitor()

infoTicker := time.NewTicker(w.conf.UpdateInfoPeriod)
defer infoTicker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion writer/stats_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func testStatsWriter() (*StatsWriter, chan []model.StatsBucket, *testEndpoint, *
}
statsWriter := NewStatsWriter(conf, statsChannel)
testEndpoint := &testEndpoint{}
statsWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
statsWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
24 changes: 15 additions & 9 deletions writer/trace_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/golang/protobuf/proto"
)

const pathTraces = "/api/v0.2/traces"

// SampledTrace represents the result of a trace sample operation.
type SampledTrace struct {
Trace *model.Trace
Expand All @@ -41,16 +43,19 @@ type TraceWriter struct {
transactions []*model.Span
spansInBuffer int

BaseWriter
sender PayloadSender
exit chan struct{}
}

// NewTraceWriter returns a new writer for traces.
func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWriter {
writerConf := conf.TraceWriterConfig
log.Infof("Trace writer initializing with config: %+v", writerConf)
cfg := conf.TraceWriterConfig
endpoints := NewEndpoints(conf, pathTraces)
sender := newMultiSender(endpoints, cfg.SenderConfig)
log.Infof("Trace writer initializing with config: %+v", cfg)

return &TraceWriter{
conf: writerConf,
conf: cfg,
hostName: conf.Hostname,
env: conf.DefaultEnv,

Expand All @@ -59,13 +64,14 @@ func NewTraceWriter(conf *config.AgentConfig, in <-chan *SampledTrace) *TraceWri

in: in,

BaseWriter: *NewBaseWriter(conf, "/api/v0.2/traces", newMultiSenderFactory(writerConf.SenderConfig)),
sender: sender,
exit: make(chan struct{}),
}
}

// Start starts the writer.
func (w *TraceWriter) Start() {
w.BaseWriter.Start()
w.sender.Start()
go func() {
defer watchdog.LogOnPanic()
w.Run()
Expand All @@ -86,7 +92,7 @@ func (w *TraceWriter) Run() {

// Monitor sender for events
go func() {
for event := range w.payloadSender.Monitor() {
for event := range w.sender.Monitor() {
if event == nil {
continue
}
Expand Down Expand Up @@ -137,7 +143,7 @@ func (w *TraceWriter) Run() {
func (w *TraceWriter) Stop() {
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
w.sender.Stop()
}

func (w *TraceWriter) handleSampledTrace(sampledTrace *SampledTrace) {
Expand Down Expand Up @@ -256,7 +262,7 @@ func (w *TraceWriter) flush() {
payload := NewPayload(serialized, headers)

log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions))
w.payloadSender.Send(payload)
w.sender.Send(payload)
w.resetBuffer()
}

Expand Down
2 changes: 1 addition & 1 deletion writer/trace_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testTraceWriter() (*TraceWriter, chan *SampledTrace, *testEndpoint, *testut
}
traceWriter := NewTraceWriter(conf, payloadChannel)
testEndpoint := &testEndpoint{}
traceWriter.BaseWriter.payloadSender.setEndpoint(testEndpoint)
traceWriter.sender.setEndpoint(testEndpoint)
testStatsClient := &testutil.TestStatsClient{}
originalClient := statsd.Client
statsd.Client = testStatsClient
Expand Down
28 changes: 0 additions & 28 deletions writer/writer.go

This file was deleted.

0 comments on commit 8f79a84

Please sign in to comment.