From 8fae9bc6bf89795e071c139335ea271215e19e71 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 26 Jun 2020 15:59:38 -0700 Subject: [PATCH] Rely on gRPC to batch and loadbalance between connections instead of custom logic (#1212) Signed-off-by: Bogdan Drutu --- exporter/otlpexporter/otlp.go | 106 ++++++---------------------------- 1 file changed, 17 insertions(+), 89 deletions(-) diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 6d134b39a6f87..7bd206080189b 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -20,7 +20,6 @@ import ( "sync" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" @@ -32,7 +31,8 @@ import ( ) type otlpExporter struct { - exporters chan *exporterImp + exporter *exporterImp + stopOnce sync.Once } type exporterErrorCode int @@ -48,8 +48,6 @@ func (e *exporterError) Error() string { } const ( - defaultNumWorkers int = 8 - _ exporterErrorCode = iota // skip 0 // errEndpointRequired indicates that this exporter was not provided with an endpoint in its config. errEndpointRequired @@ -133,77 +131,32 @@ func createOTLPExporter(config configmodels.Exporter) (*otlpExporter, error) { } } - numWorkers := defaultNumWorkers - if oCfg.NumWorkers > 0 { - numWorkers = oCfg.NumWorkers + exporter, serr := newExporter(oCfg) + if serr != nil { + return nil, fmt.Errorf("cannot configure OTLP exporter: %v", serr) } - exportersChan := make(chan *exporterImp, numWorkers) - for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ { - // TODO: newExporter blocks for connection. Now that we have ability - // to report errors asynchronously using Host.ReportFatalError we can move this - // code to Start() and do it in background to avoid blocking Collector startup - // as we do now. - exporter, serr := newExporter(oCfg) - if serr != nil { - return nil, fmt.Errorf("cannot configure OTLP exporter: %v", serr) - } - exportersChan <- exporter - } - oce := &otlpExporter{exporters: exportersChan} + oce := &otlpExporter{exporter: exporter} return oce, nil } func (oce *otlpExporter) Shutdown(context.Context) error { - // Stop all exporters. Will wait until all are stopped. - wg := &sync.WaitGroup{} - var errors []error - var errorsMu sync.Mutex - visitedCnt := 0 - for currExporter := range oce.exporters { - wg.Add(1) - go func(exporter *exporterImp) { - defer wg.Done() - err := exporter.stop() - if err != nil { - errorsMu.Lock() - errors = append(errors, err) - errorsMu.Unlock() - } - }(currExporter) - visitedCnt++ - if visitedCnt == cap(oce.exporters) { - // Visited and concurrently executed stop() on all exporters. - break - } - } - - // Wait for all stop() calls to finish. - wg.Wait() - close(oce.exporters) - - return componenterror.CombineErrors(errors) + err := error(&exporterError{ + code: errAlreadyStopped, + msg: "OpenTelemetry exporter was already stopped.", + }) + oce.stopOnce.Do(func() { + err = oce.exporter.stop() + }) + return err } func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { - // Get first available exporter. - exporter, ok := <-oce.exporters - if !ok { - err := &exporterError{ - code: errAlreadyStopped, - msg: "OpenTelemetry exporter was already stopped.", - } - return td.SpanCount(), err - } - - // Perform the request. request := &otlptrace.ExportTraceServiceRequest{ ResourceSpans: pdata.TracesToOtlp(td), } - err := exporter.exportTrace(ctx, request) + err := oce.exporter.exportTrace(ctx, request) - // Return the exporter to the pool. - oce.exporters <- exporter if err != nil { return td.SpanCount(), err } @@ -212,24 +165,11 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) { imd := pdatautil.MetricsToInternalMetrics(md) - // Get first available exporter. - exporter, ok := <-oce.exporters - if !ok { - err := &exporterError{ - code: errAlreadyStopped, - msg: "OpenTelemetry exporter was already stopped.", - } - return imd.MetricCount(), err - } - - // Perform the request. request := &otlpmetrics.ExportMetricsServiceRequest{ ResourceMetrics: data.MetricDataToOtlp(imd), } - err := exporter.exportMetrics(ctx, request) + err := oce.exporter.exportMetrics(ctx, request) - // Return the exporter to the pool. - oce.exporters <- exporter if err != nil { return imd.MetricCount(), err } @@ -237,23 +177,11 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics) } func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int, error) { - // Get first available exporter. - exporter, ok := <-oce.exporters - if !ok { - err := &exporterError{ - code: errAlreadyStopped, - msg: "OpenTelemetry exporter was already stopped.", - } - return logs.LogRecordCount(), err - } - request := &otlplogs.ExportLogServiceRequest{ ResourceLogs: data.LogsToProto(logs), } - err := exporter.exportLogs(ctx, request) + err := oce.exporter.exportLogs(ctx, request) - // Return the exporter to the pool. - oce.exporters <- exporter if err != nil { return logs.LogRecordCount(), err }