Skip to content

Commit

Permalink
Add a otelcol.exporter.kafka component.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Apr 30, 2024
1 parent 00b32ef commit b40405e
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 0 deletions.
Empty file.
164 changes: 164 additions & 0 deletions internal/component/otelcol/exporter/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Package kafka provides an otelcol.exporter.kafka component.
package kafka

import (
"time"

"github.com/IBM/sarama"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/exporter"
alloy_kafka_receiver "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/syntax"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.exporter.kafka",
Stability: featuregate.StabilityPublicPreview,
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := kafkaexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll)
},
})
}

// Arguments configures the otelcol.exporter.kafka component.
type Arguments struct {
Brokers []string `alloy:"brokers,attr,optional"`
ProtocolVersion string `alloy:"protocol_version,attr"`
Topic string `alloy:"topic,attr,optional"`
Encoding string `alloy:"encoding,attr,optional"`
Timeout time.Duration `alloy:"timeout,attr,optional"`

Authentication alloy_kafka_receiver.AuthenticationArguments `alloy:"authentication,block,optional"`
Metadata alloy_kafka_receiver.MetadataArguments `alloy:"metadata,block,optional"`
Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"`
Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"`
Producer Producer `alloy:"producer,block,optional"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcol.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
}

// Producer defines configuration for producer
type Producer struct {
// Maximum message bytes the producer will accept to produce.
MaxMessageBytes int `alloy:"max_message_bytes,attr,optional"`

// RequiredAcks Number of acknowledgements required to assume that a message has been sent.
// https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#RequiredAcks
// The options are:
// 0 -> NoResponse. doesn't send any response
// 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default )
// -1 -> WaitForAll. waits for all in-sync replicas to commit before responding.
RequiredAcks int `alloy:"required_acks,attr,optional"`

// Compression Codec used to produce messages
// https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec
// The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'
Compression string `alloy:"compression,attr,optional"`

// The maximum number of messages the producer will send in a single
// broker request. Defaults to 0 for unlimited. Similar to
// `queue.buffering.max.messages` in the JVM producer.
FlushMaxMessages int `alloy:"flush_max_messages,attr,optional"`
}

// Convert converts args into the upstream type.
func (args Producer) Convert() kafkaexporter.Producer {
return kafkaexporter.Producer{
MaxMessageBytes: args.MaxMessageBytes,
RequiredAcks: sarama.RequiredAcks(args.RequiredAcks),
Compression: args.Compression,
FlushMaxMessages: args.FlushMaxMessages,
}
}

var (
_ syntax.Validator = (*Arguments)(nil)
_ syntax.Defaulter = (*Arguments)(nil)
_ exporter.Arguments = (*Arguments)(nil)
)

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
*args = Arguments{
Topic: "otlp_spans",
Encoding: "otlp_proto",
Brokers: []string{"localhost:9092"},
Timeout: 5 * time.Second,
Metadata: alloy_kafka_receiver.MetadataArguments{
IncludeAllTopics: true,
Retry: alloy_kafka_receiver.MetadataRetryArguments{
MaxRetries: 3,
Backoff: 250 * time.Millisecond,
},
},
Producer: Producer{
MaxMessageBytes: 1000000,
RequiredAcks: 1,
Compression: "none",
FlushMaxMessages: 0,
},
}
args.Retry.SetToDefault()
args.Queue.SetToDefault()
args.DebugMetrics.SetToDefault()
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
//TODO: Implement this later
return nil
}

// Convert implements exporter.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
input := make(map[string]interface{})
input["auth"] = args.Authentication.Convert()

var result kafkaexporter.Config
err := mapstructure.Decode(input, &result)
if err != nil {
return nil, err
}

result.Brokers = args.Brokers
result.ProtocolVersion = args.ProtocolVersion
result.Topic = args.Topic
result.Encoding = args.Encoding
result.TimeoutSettings = exporterhelper.TimeoutSettings{
Timeout: args.Timeout,
}
result.Metadata = args.Metadata.Convert()
result.BackOffConfig = *args.Retry.Convert()
result.QueueSettings = *args.Queue.Convert()
result.Producer = args.Producer.Convert()

return &result, nil
}

// Extensions implements exporter.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements exporter.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// DebugMetricsConfig implements receiver.Arguments.
func (args Arguments) DebugMetricsConfig() otelcol.DebugMetricsArguments {
return args.DebugMetrics
}
135 changes: 135 additions & 0 deletions internal/component/otelcol/exporter/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package kafka_test

import (
"testing"
"time"

"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/exporter/kafka"
"github.com/grafana/alloy/syntax"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/stretchr/testify/require"
)

func TestArguments_UnmarshalAlloy(t *testing.T) {
tests := []struct {
testName string
cfg string
expected map[string]interface{}
}{
{
testName: "Defaults",
cfg: `
protocol_version = "2.0.0"
`,
expected: map[string]interface{}{
"brokers": []string{"localhost:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"timeout": 5 * time.Second,
"authentication": map[string]interface{}{},
"metadata": map[string]interface{}{
"full": true,
"retry": map[string]interface{}{
"max": 3,
"backoff": 250 * time.Millisecond,
},
},
"retry_on_failure": map[string]interface{}{
//TODO: Add more parameters, which are not in the otel docs?
"enabled": true,
"initial_interval": 5 * time.Second,
"randomization_factor": 0.5,
"multiplier": 1.5,
"max_interval": 30 * time.Second,
"max_elapsed_time": 5 * time.Minute,
},
"sending_queue": map[string]interface{}{
"enabled": true,
"num_consumers": 10,
"queue_size": 1000,
},
"producer": map[string]interface{}{
"max_message_bytes": 1000000,
"required_acks": 1,
"compression": "none",
"flush_max_messages": 0,
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var expected kafkaexporter.Config
err := mapstructure.Decode(tc.expected, &expected)
require.NoError(t, err)

var args kafka.Arguments
err = syntax.Unmarshal([]byte(tc.cfg), &args)
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*kafkaexporter.Config)

require.Equal(t, expected, *actual)
})
}
}

func TestDebugMetricsConfig(t *testing.T) {
tests := []struct {
testName string
agentCfg string
expected otelcol.DebugMetricsArguments
}{
{
testName: "default",
agentCfg: `
protocol_version = "2.0.0"
`,
expected: otelcol.DebugMetricsArguments{
DisableHighCardinalityMetrics: true,
},
},
{
testName: "explicit_false",
agentCfg: `
protocol_version = "2.0.0"
debug_metrics {
disable_high_cardinality_metrics = false
}
`,
expected: otelcol.DebugMetricsArguments{
DisableHighCardinalityMetrics: false,
},
},
{
testName: "explicit_true",
agentCfg: `
protocol_version = "2.0.0"
debug_metrics {
disable_high_cardinality_metrics = true
}
`,
expected: otelcol.DebugMetricsArguments{
DisableHighCardinalityMetrics: true,
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args kafka.Arguments
require.NoError(t, syntax.Unmarshal([]byte(tc.agentCfg), &args))
_, err := args.Convert()
require.NoError(t, err)

require.Equal(t, tc.expected, args.DebugMetricsConfig())
})
}
}

0 comments on commit b40405e

Please sign in to comment.