Skip to content

Commit

Permalink
Add consumer profiles (open-telemetry#10464)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
This adds profiles support for consumers.


<!-- Issue number if applicable -->
#### Link to tracking issue
Based on the discussion in open-telemetry#10375.

---------

Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>
  • Loading branch information
2 people authored and codeboten committed Jul 9, 2024
1 parent 47bbc59 commit 9bfbc47
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 45 deletions.
25 changes: 25 additions & 0 deletions .chloggen/profile-consumer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: consumer/consumerprofiles

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow handling profiles in consumer.

# One or more tracking issues or pull requests related to the change
issues: [10464]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
42 changes: 6 additions & 36 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,22 @@ package consumer // import "go.opentelemetry.io/collector/consumer"

import (
"errors"

"go.opentelemetry.io/collector/consumer/internal"
)

// Capabilities describes the capabilities of a Processor.
type Capabilities struct {
// MutatesData is set to true if Consume* function of the
// processor modifies the input Traces, Logs or Metrics argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesData bool
}

type baseConsumer interface {
Capabilities() Capabilities
}
type Capabilities = internal.Capabilities

var errNilFunc = errors.New("nil consumer func")

type baseImpl struct {
capabilities Capabilities
}

// Option to construct new consumers.
type Option func(*baseImpl)
type Option = internal.Option

// WithCapabilities overrides the default GetCapabilities function for a processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities Capabilities) Option {
return func(o *baseImpl) {
o.capabilities = capabilities
}
}

// Capabilities returns the capabilities of the component
func (bs baseImpl) Capabilities() Capabilities {
return bs.capabilities
}

func newBaseImpl(options ...Option) *baseImpl {
bs := &baseImpl{
capabilities: Capabilities{MutatesData: false},
return func(o *internal.BaseImpl) {
o.Cap = capabilities
}

for _, op := range options {
op(bs)
}

return bs
}
1 change: 1 addition & 0 deletions consumer/consumerprofiles/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
35 changes: 35 additions & 0 deletions consumer/consumerprofiles/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module go.opentelemetry.io/collector/consumer/consumerprofiles

go 1.21.0

replace go.opentelemetry.io/collector/pdata => ../../pdata

replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile

replace go.opentelemetry.io/collector/consumer => ../

require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/consumer v0.104.0
go.opentelemetry.io/collector/pdata/pprofile v0.104.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/pdata v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata
77 changes: 77 additions & 0 deletions consumer/consumerprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions consumer/consumerprofiles/profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumerprofiles // import "go.opentelemetry.io/collector/consumer/consumerprofiles"

import (
"context"
"errors"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/pprofile"
)

var errNilFunc = errors.New("nil consumer func")

// Profiles is an interface that receives pprofile.Profiles, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Profiles interface {
internal.BaseConsumer
// ConsumeProfiles receives pprofile.Profiles for consumption.
ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error
}

// ConsumeProfilesFunc is a helper function that is similar to ConsumeProfiles.
type ConsumeProfilesFunc func(ctx context.Context, td pprofile.Profiles) error

// ConsumeProfiles calls f(ctx, td).
func (f ConsumeProfilesFunc) ConsumeProfiles(ctx context.Context, td pprofile.Profiles) error {
return f(ctx, td)
}

type baseProfiles struct {
*internal.BaseImpl
ConsumeProfilesFunc
}

// NewProfiles returns a Profiles configured with the provided options.
func NewProfiles(consume ConsumeProfilesFunc, options ...consumer.Option) (Profiles, error) {
if consume == nil {
return nil, errNilFunc
}
return &baseProfiles{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeProfilesFunc: consume,
}, nil
}
51 changes: 51 additions & 0 deletions consumer/consumerprofiles/profiles_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumerprofiles

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pprofile"
)

func TestDefaultProfiles(t *testing.T) {
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return nil })
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities())
}

func TestNilFuncProfiles(t *testing.T) {
_, err := NewProfiles(nil)
assert.Equal(t, errNilFunc, err)
}

func TestWithCapabilitiesProfiles(t *testing.T) {
cp, err := NewProfiles(
func(context.Context, pprofile.Profiles) error { return nil },
consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities())
}

func TestConsumeProfiles(t *testing.T) {
consumeCalled := false
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { consumeCalled = true; return nil })
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
assert.True(t, consumeCalled)
}

func TestConsumeProfiles_ReturnError(t *testing.T) {
want := errors.New("my_error")
cp, err := NewProfiles(func(context.Context, pprofile.Profiles) error { return want })
assert.NoError(t, err)
assert.Equal(t, want, cp.ConsumeProfiles(context.Background(), pprofile.NewProfiles()))
}
42 changes: 42 additions & 0 deletions consumer/internal/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/consumer/internal"

// Capabilities describes the capabilities of a Processor.
type Capabilities struct {
// MutatesData is set to true if Consume* function of the
// processor modifies the input Traces, Logs or Metrics argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesData bool
}

type BaseConsumer interface {
Capabilities() Capabilities
}

type BaseImpl struct {
Cap Capabilities
}

// Option to construct new consumers.
type Option func(*BaseImpl)

// Capabilities returns the capabilities of the component
func (bs BaseImpl) Capabilities() Capabilities {
return bs.Cap
}

func NewBaseImpl(options ...Option) *BaseImpl {
bs := &BaseImpl{
Cap: Capabilities{MutatesData: false},
}

for _, op := range options {
op(bs)
}

return bs
}
7 changes: 4 additions & 3 deletions consumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package consumer // import "go.opentelemetry.io/collector/consumer"
import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/plog"
)

// Logs is an interface that receives plog.Logs, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Logs interface {
baseConsumer
internal.BaseConsumer
// ConsumeLogs receives plog.Logs for consumption.
ConsumeLogs(ctx context.Context, ld plog.Logs) error
}
Expand All @@ -26,7 +27,7 @@ func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
}

type baseLogs struct {
*baseImpl
*internal.BaseImpl
ConsumeLogsFunc
}

Expand All @@ -36,7 +37,7 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) {
return nil, errNilFunc
}
return &baseLogs{
baseImpl: newBaseImpl(options...),
BaseImpl: internal.NewBaseImpl(options...),
ConsumeLogsFunc: consume,
}, nil
}
Loading

0 comments on commit 9bfbc47

Please sign in to comment.