
Build pipeline processors based on new configuration (#576)
- Build pipeline processors and plug them into exporters (a usage sketch
  follows the file summary below).
- Added tests to verify that single-exporter and multi-exporter (fan-out)
  pipelines work correctly.
- Minor cleanup in exporters_builder.go
- Fixed the opencensus receiver TestCreateReceiver to find an available port
  for testing, instead of a fixed port that would fail if something was already
  listening on it (see the sketch that follows).
tigrannajaryan committed Jun 14, 2019
1 parent bf83eb9 commit 9317ffe
Showing 17 changed files with 426 additions and 51 deletions.
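The new pipeline wiring is exercised by pipelines_builder_test.go further down: exporters are built first, and each pipeline is then built on top of them so that its last stage fans data out into those exporters. The sketch below condenses that flow from the test code; buildPipelinesForTest is a made-up name and error handling is trimmed to the essentials.

package builder

import (
	"testing"

	"go.uber.org/zap"

	"github.com/census-instrumentation/opencensus-service/internal/configv2"
)

// buildPipelinesForTest mirrors the flow used by the new tests: load the v2
// config, build the exporters, then build the pipeline processors that fan
// out into those exporters.
func buildPipelinesForTest(t *testing.T) (PipelineProcessors, error) {
	config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
	if err != nil {
		return nil, err
	}

	// Exporters must exist before pipelines can be wired to them.
	exporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
	if err != nil {
		return nil, err
	}

	// Each entry in the returned map is the first processor of one pipeline.
	return NewPipelinesBuilder(zap.NewNop(), config, exporters).Build()
}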
40 changes: 6 additions & 34 deletions cmd/occollector/app/builder/exporters_builder.go
@@ -15,59 +15,31 @@
package builder

import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
"github.com/census-instrumentation/opencensus-service/internal/factories"
)

- // exporterImpl is a running exporter that is built based on a config. It can have
+ // builtExporter is an exporter that is built based on a config. It can have
// a trace and/or a metrics consumer and have a stop function.
- type exporterImpl struct {
+ type builtExporter struct {
tc consumer.TraceConsumer
mc consumer.MetricsConsumer
stop func() error
}

- // Check that exporterImpl implements Exporter interface.
- var _ exporter.Exporter = (*exporterImpl)(nil)
-
- // ConsumeTraceData receives data.TraceData for processing by the TraceConsumer.
- func (exp *exporterImpl) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
- return exp.tc.ConsumeTraceData(ctx, td)
- }
-
- // ConsumeMetricsData receives data.MetricsData for processing by the MetricsConsumer.
- func (exp *exporterImpl) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
- return exp.mc.ConsumeMetricsData(ctx, md)
- }
-
- // TraceExportFormat is unneeded, we need to get rid of it after we cleanup
- // exporter.TraceExporter interface.
- func (exp *exporterImpl) TraceExportFormat() string {
- return ""
- }
-
- // MetricsExportFormat is unneeded, we need to get rid of it after we cleanup
- // exporter.MetricsExporter interface.
- func (exp *exporterImpl) MetricsExportFormat() string {
- return ""
- }

// Stop the exporter.
- func (exp *exporterImpl) Stop() error {
+ func (exp *builtExporter) Stop() error {
return exp.stop()
}

// Exporters is a map of exporters created from exporter configs.
- type Exporters map[configmodels.Exporter]*exporterImpl
+ type Exporters map[configmodels.Exporter]*builtExporter

// StopAll stops all exporters.
func (exps Exporters) StopAll() {
@@ -179,11 +151,11 @@ func combineStopFunc(f1, f2 factories.StopFunc) factories.StopFunc {
func (eb *ExportersBuilder) buildExporter(
config configmodels.Exporter,
exportersInputDataTypes exportersRequiredDataTypes,
- ) (*exporterImpl, error) {
+ ) (*builtExporter, error) {

factory := factories.GetExporterFactory(config.Type())

- exporter := &exporterImpl{}
+ exporter := &builtExporter{}

inputDataTypes := exportersInputDataTypes[config]
if inputDataTypes == nil {
2 changes: 1 addition & 1 deletion cmd/occollector/app/builder/exporters_builder_test.go
@@ -94,7 +94,7 @@ func TestExportersBuilder_StopAll(t *testing.T) {
exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{}
stopCalled := false
- exporters[expCfg] = &exporterImpl{
+ exporters[expCfg] = &builtExporter{
stop: func() error {
stopCalled = true
return nil
164 changes: 164 additions & 0 deletions cmd/occollector/app/builder/pipelines_builder.go
@@ -0,0 +1,164 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"fmt"

"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
"github.com/census-instrumentation/opencensus-service/internal/factories"
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
)

// builtProcessor is a processor that is built based on a config.
// It can have a trace and/or a metrics consumer.
type builtProcessor struct {
tc consumer.TraceConsumer
mc consumer.MetricsConsumer
}

// PipelineProcessors is a map of entry-point processors created from pipeline configs.
// Each element of the map points to the first processor of the pipeline.
type PipelineProcessors map[*configmodels.Pipeline]*builtProcessor

// PipelinesBuilder builds pipelines from config.
type PipelinesBuilder struct {
logger *zap.Logger
config *configmodels.ConfigV2
exporters Exporters
}

// NewPipelinesBuilder creates a new PipelinesBuilder. Requires exporters to be already
// built via ExportersBuilder. Call Build() on the returned value.
func NewPipelinesBuilder(
logger *zap.Logger,
config *configmodels.ConfigV2,
exporters Exporters,
) *PipelinesBuilder {
return &PipelinesBuilder{logger, config, exporters}
}

// Build pipeline processors from config.
func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) {
pipelineProcessors := make(PipelineProcessors)

for _, pipeline := range eb.config.Pipelines {
firstProcessor, err := eb.buildPipeline(pipeline)
if err != nil {
return nil, err
}
pipelineProcessors[pipeline] = firstProcessor
}

return pipelineProcessors, nil
}

// Builds a pipeline of processors. Returns the first processor in the pipeline.
// The last processor in the pipeline will be plugged to fan out the data into exporters
// that are configured for this pipeline.
func (eb *PipelinesBuilder) buildPipeline(
pipelineCfg *configmodels.Pipeline,
) (*builtProcessor, error) {

// Build the pipeline backwards.

// First create a consumer junction point that fans out the data to all exporters.
var tc consumer.TraceConsumer
var mc consumer.MetricsConsumer

switch pipelineCfg.InputType {
case configmodels.TracesDataType:
tc = eb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
case configmodels.MetricsDataType:
mc = eb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
}

// Now build the processors backwards, starting from the last one.
// The last processor points to consumer which fans out to exporters, then
// the processor itself becomes a consumer for the one that precedes it in
// the pipeline and so on.
for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- {
procName := pipelineCfg.Processors[i]
procCfg := eb.config.Processors[procName]

factory := factories.GetProcessorFactory(procCfg.Type())

// This processor must point to the next consumer and then
// it becomes the next for the previous one (previous in the pipeline,
// which we will build in the next loop iteration).
var err error
switch pipelineCfg.InputType {
case configmodels.TracesDataType:
tc, err = factory.CreateTraceProcessor(tc, procCfg)
case configmodels.MetricsDataType:
mc, err = factory.CreateMetricsProcessor(mc, procCfg)
}

if err != nil {
return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v",
procName, pipelineCfg.Name, err)
}
}

return &builtProcessor{tc, mc}, nil
}

// Converts the list of exporter names to a list of corresponding builtExporters.
func (eb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
var result []*builtExporter
for _, name := range exporterNames {
exporter := eb.exporters[eb.config.Exporters[name]]
result = append(result, exporter)
}

return result
}

func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
builtExporters := eb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
return builtExporters[0].tc
}

var exporters []consumer.TraceConsumer
for _, builtExp := range builtExporters {
exporters = append(exporters, builtExp.tc)
}

// Create a junction point that fans out to all exporters.
return multiconsumer.NewTraceProcessor(exporters)
}

func (eb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := eb.getBuiltExportersByNames(exporterNames)

// Optimize for the case when there is only one exporter, no need to create junction point.
if len(builtExporters) == 1 {
return builtExporters[0].mc
}

var exporters []consumer.MetricsConsumer
for _, builtExp := range builtExporters {
exporters = append(exporters, builtExp.mc)
}

// Create a junction point that fans out to all exporters.
return multiconsumer.NewMetricsProcessor(exporters)
}
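To make the backwards construction in buildPipeline concrete: for a traces pipeline with processors [p1, p2], the loop wires p1 -> p2 -> fan-out -> exporters and returns p1 as the pipeline's entry point; with a single exporter the fan-out junction is skipped and p2 feeds that exporter's trace consumer directly. The multiconsumer package that provides the junction point is not part of this diff, so the sketch below only illustrates what such a fan-out consumer presumably looks like, using simplified stand-ins for consumer.TraceConsumer and data.TraceData.

package fanoutsketch

import "context"

// TraceData is a simplified stand-in for data.TraceData.
type TraceData struct {
	SourceFormat string
}

// TraceConsumer is a simplified stand-in for consumer.TraceConsumer.
type TraceConsumer interface {
	ConsumeTraceData(ctx context.Context, td TraceData) error
}

// fanOutTraceConsumer forwards every batch of trace data to all consumers.
type fanOutTraceConsumer struct {
	consumers []TraceConsumer
}

func (f fanOutTraceConsumer) ConsumeTraceData(ctx context.Context, td TraceData) error {
	var firstErr error
	for _, c := range f.consumers {
		// Keep forwarding to the remaining consumers even if one fails,
		// reporting only the first error back to the caller.
		if err := c.ConsumeTraceData(ctx, td); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}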
143 changes: 143 additions & 0 deletions cmd/occollector/app/builder/pipelines_builder_test.go
@@ -0,0 +1,143 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"context"
"testing"

"go.uber.org/zap"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
"github.com/census-instrumentation/opencensus-service/internal/configv2"
"github.com/census-instrumentation/opencensus-service/processor/addattributesprocessor"
)

// Ensure attributes processor is registered.
var _ = addattributesprocessor.ConfigV2{}

// Register test factories used in the pipelines_builder.yaml test config.
var _ = configv2.RegisterTestFactories()

func TestPipelinesBuilder_Build(t *testing.T) {
tests := []struct {
name string
pipelineName string
exporterNames []string
}{
{
name: "one-exporter",
pipelineName: "traces",
exporterNames: []string{"exampleexporter"},
},
{
name: "multi-exporter",
pipelineName: "traces/2",
exporterNames: []string{"exampleexporter", "exampleexporter/2"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testPipeline(t, test.pipelineName, test.exporterNames)
})
}
}

func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
// Load the config
config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
require.Nil(t, err)

// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), config, allExporters).Build()

assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)

processor := pipelineProcessors[config.Pipelines[pipelineName]]

// Ensure pipeline has its fields correctly populated.
require.NotNil(t, processor)
assert.NotNil(t, processor.tc)
assert.Nil(t, processor.mc)

// Compose the list of created exporters.
var exporters []*builtExporter
for _, name := range exporterNames {
// Ensure exporter is created.
exp := allExporters[config.Exporters[name]]
require.NotNil(t, exp)
exporters = append(exporters, exp)
}

// Send TraceData via processor and verify that all exporters of the pipeline receive it.

// First check that there are no traces in the exporters yet.
var exporterConsumers []*configv2.ExampleExporterConsumer
for _, exporter := range exporters {
consumer := exporter.tc.(*configv2.ExampleExporterConsumer)
exporterConsumers = append(exporterConsumers, consumer)
require.Equal(t, len(consumer.Traces), 0)
}

// Send one trace.
name := tracepb.TruncatableString{Value: "testspanname"}
traceData := data.TraceData{
SourceFormat: "test-source-format",
Spans: []*tracepb.Span{
{Name: &name},
},
}
processor.tc.ConsumeTraceData(context.Background(), traceData)

// Now verify received data.
for _, consumer := range exporterConsumers {
// Check that the trace is received by exporter.
require.Equal(t, 1, len(consumer.Traces))
assert.Equal(t, traceData, consumer.Traces[0])

// Check that the span was processed by "attributes" processor and an
// attribute was added.
assert.Equal(t, int64(12345),
consumer.Traces[0].Spans[0].Attributes.AttributeMap["attr1"].GetIntValue())
}
}

func TestPipelinesBuilder_Error(t *testing.T) {
config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
require.Nil(t, err)

// Corrupt the pipeline, change data type to metrics. We have to forcibly do it here
// since there is no way to have such config loaded by LoadConfigFile, it would not
// pass validation. We are doing this to test failure mode of PipelinesBuilder.
pipeline := config.Pipelines["traces"]
pipeline.InputType = configmodels.MetricsDataType

exporters, err := NewExportersBuilder(zap.NewNop(), config).Build()

// This should fail because "attributes" processor defined in the config does
// not support metrics data type.
_, err = NewPipelinesBuilder(zap.NewNop(), config, exporters).Build()

assert.NotNil(t, err)
}
