diff --git a/CHANGELOG.md b/CHANGELOG.md
index c2b3111066..f7e957d2ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,10 @@ Main (unreleased)
 - Update Public preview `remotecfg` to use `alloy-remote-config` instead of `agent-remote-config`. The API has been updated to use the term `collector` over `agent`. (@erikbaranowski)
 
+### Features
+
+- Add an `otelcol.exporter.kafka` component for sending OTLP metrics, logs, and traces to Kafka.
+
 ### Enhancements
 
 - (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum)
diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md
index f391fe7f21..5747b2720e 100644
--- a/docs/sources/reference/compatibility/_index.md
+++ b/docs/sources/reference/compatibility/_index.md
@@ -285,6 +285,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol.
 - [otelcol.connector.servicegraph](../components/otelcol.connector.servicegraph)
 - [otelcol.connector.spanlogs](../components/otelcol.connector.spanlogs)
 - [otelcol.connector.spanmetrics](../components/otelcol.connector.spanmetrics)
+- [otelcol.exporter.kafka](../components/otelcol.exporter.kafka)
 - [otelcol.exporter.loadbalancing](../components/otelcol.exporter.loadbalancing)
 - [otelcol.exporter.logging](../components/otelcol.exporter.logging)
 - [otelcol.exporter.loki](../components/otelcol.exporter.loki)
diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md
new file mode 100644
index 0000000000..a13b20c1e2
--- /dev/null
+++ b/docs/sources/reference/components/otelcol.exporter.kafka.md
@@ -0,0 +1,323 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol.exporter.kafka/
description: Learn about otelcol.exporter.kafka
title: otelcol.exporter.kafka
---

# otelcol.exporter.kafka

`otelcol.exporter.kafka` accepts logs, metrics, and traces telemetry data from
other `otelcol` components and sends it to Kafka.

> **NOTE**: `otelcol.exporter.kafka` is a wrapper over the upstream
> OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib`
> distribution. Bug reports or feature requests will be redirected to the
> upstream repository, if necessary.

Multiple `otelcol.exporter.kafka` components can be specified by giving them
different labels.

## Usage

```alloy
otelcol.exporter.kafka "LABEL" {
  protocol_version = "PROTOCOL_VERSION"
}
```

## Arguments

The following arguments are supported:

Name | Type | Description | Default | Required
------------------------------------------ | --------------- | ----------------------------------------------------------------------------------- | -------------------- | --------
`protocol_version` | `string` | Kafka protocol version to use. | | yes
`brokers` | `list(string)` | Kafka brokers to connect to. | `["localhost:9092"]` | no
`topic` | `string` | Kafka topic to send messages to. | _See below_ | no
`topic_from_attribute` | `string` | A resource attribute whose value should be used as the message's topic. | `""` | no
`encoding` | `string` | Encoding of the payload sent to Kafka. | `"otlp_proto"` | no
`client_id` | `string` | Client ID to use. The ID is used for all produce requests. | `"sarama"` | no
`timeout` | `duration` | The timeout for every attempt to send data to the backend. | `"5s"` | no
`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `false` | no
`partition_traces_by_id` | `bool` | Whether to include the trace ID as the message key in trace messages sent to Kafka. | `false` | no
`partition_metrics_by_resource_attributes` | `bool` | Whether to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka. | `false` | no

If `topic` is not set, a different topic is used for each telemetry signal:

* Metrics are sent to an `otlp_metrics` topic.
* Traces are sent to an `otlp_spans` topic.
* Logs are sent to an `otlp_logs` topic.

If `topic` is set to a specific value, all signals are written to that topic, so only the corresponding signal type should be sent to the exporter.
For example, if `topic` is set to `"my_telemetry"`, the `"my_telemetry"` topic can only contain either metrics, logs, or traces.
If it contains only metrics, `otelcol.exporter.kafka` should be configured to process only metrics.

When `topic_from_attribute` is set, it takes precedence over `topic`.

The `encoding` argument determines how to encode messages sent to Kafka.
`encoding` must be one of the following strings:
* Encodings which work for traces, logs, and metrics:
  * `"otlp_proto"`: Encode messages as OTLP protobuf.
  * `"otlp_json"`: Encode messages as OTLP JSON.
* Encodings which work only for traces:
  * `"jaeger_proto"`: The payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
  * `"jaeger_json"`: The payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
  * `"zipkin_proto"`: The payload is serialized to Zipkin v2 proto Span.
  * `"zipkin_json"`: The payload is serialized to Zipkin v2 JSON Span.
* Encodings which work only for logs:
  * `"raw"`: If the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.

`partition_traces_by_id` does not have any effect on Jaeger encoding exporters because Jaeger exporters include the trace ID as the message key by default.
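As a sketch, the following exporter uses `topic_from_attribute` together with a fixed `topic`. The label, topic name, and attribute name are placeholders.

```alloy
otelcol.exporter.kafka "traces" {
  protocol_version     = "2.0.0"
  topic                = "otlp_spans"
  topic_from_attribute = "tenant.topic"
  encoding             = "otlp_proto"
}
```

Messages whose resource carries the `tenant.topic` attribute are routed to the topic named by that attribute, and the `topic` argument applies otherwise.
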
## Blocks

The following blocks are supported inside the definition of `otelcol.exporter.kafka`:

Hierarchy | Block | Description | Required
-------------------------------- | ------------------- | --------------------------------------------------------------------------- | --------
authentication | [authentication][] | Configures authentication for connecting to Kafka brokers. | no
authentication > plaintext | [plaintext][] | Authenticates against Kafka brokers with plaintext. | no
authentication > sasl | [sasl][] | Authenticates against Kafka brokers with SASL. | no
authentication > sasl > aws_msk | [aws_msk][] | Additional SASL parameters when using AWS_MSK_IAM. | no
authentication > tls | [tls][] | Configures TLS for connecting to the Kafka brokers. | no
authentication > kerberos | [kerberos][] | Authenticates against Kafka brokers with Kerberos. | no
metadata | [metadata][] | Configures how to retrieve metadata from Kafka brokers. | no
metadata > retry | [retry][] | Configures how to retry metadata retrieval. | no
retry_on_failure | [retry_on_failure][] | Configures the retry mechanism for failed requests. | no
sending_queue | [sending_queue][] | Configures batching of data before sending. | no
producer | [producer][] | Kafka producer configuration. | no
debug_metrics | [debug_metrics][] | Configures the metrics which this component generates to monitor its state. | no

The `>` symbol indicates deeper levels of nesting.
For example, `authentication > tls` refers to a `tls` block defined inside an `authentication` block.

[authentication]: #authentication-block
[plaintext]: #plaintext-block
[sasl]: #sasl-block
[aws_msk]: #aws_msk-block
[tls]: #tls-block
[kerberos]: #kerberos-block
[metadata]: #metadata-block
[retry]: #retry-block
[retry_on_failure]: #retry_on_failure-block
[sending_queue]: #sending_queue-block
[producer]: #producer-block
[debug_metrics]: #debug_metrics-block

### authentication block

The `authentication` block holds the definition of different authentication
mechanisms to use when connecting to Kafka brokers. It doesn't support any
arguments and is configured fully through inner blocks.

### plaintext block

The `plaintext` block configures `PLAIN` authentication against Kafka brokers.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for `PLAIN` authentication. | | yes
`password` | `secret` | Password to use for `PLAIN` authentication. | | yes

### sasl block

The `sasl` block configures SASL authentication against Kafka brokers.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for SASL authentication. | | yes
`password` | `secret` | Password to use for SASL authentication. | | yes
`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes
`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no

The `mechanism` argument can be set to one of the following strings:

* `"PLAIN"`
* `"AWS_MSK_IAM"`
* `"SCRAM-SHA-256"`
* `"SCRAM-SHA-512"`

When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided.

The `version` argument can be set to either `0` or `1`.

### aws_msk block

The `aws_msk` block configures extra parameters for SASL authentication when
using the `AWS_MSK_IAM` mechanism.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`region` | `string` | AWS region the MSK cluster is based in. | | yes
`broker_addr` | `string` | MSK address to connect to for authentication. | | yes
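The following sketch shows SASL authentication with the `SCRAM-SHA-512` mechanism. The label, broker address, and credentials are placeholders.

```alloy
otelcol.exporter.kafka "secured" {
  protocol_version = "2.0.0"
  brokers          = ["kafka:9092"]

  authentication {
    sasl {
      username  = "alloy"
      password  = "changeme"
      mechanism = "SCRAM-SHA-512"
    }
  }
}
```
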
### tls block

The `tls` block configures TLS settings used for connecting to the Kafka
brokers. If the `tls` block isn't provided, TLS won't be used for
communication.

{{< docs/shared lookup="reference/components/otelcol-tls-client-block.md" source="alloy" version="" >}}

### kerberos block

The `kerberos` block configures Kerberos authentication against the Kafka
broker.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`service_name` | `string` | Kerberos service name. | | no
`realm` | `string` | Kerberos realm. | | no
`use_keytab` | `string` | Enables using keytab instead of password. | | no
`username` | `string` | Kerberos username to authenticate as. | | yes
`password` | `secret` | Kerberos password to authenticate with. | | no
`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no
`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no

When `use_keytab` is `false`, the `password` argument is required. When
`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
used for authentication instead. At most one of `password` or `keytab_file`
must be provided.

### metadata block

The `metadata` block configures how to retrieve and store metadata from the
Kafka broker.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no

If the `include_all_topics` argument is `true`, `otelcol.exporter.kafka`
maintains a full set of metadata for all topics rather than the minimal set
that has been necessary so far. Including the full set of metadata is more
convenient for users but can consume a substantial amount of memory if you have
many topics and partitions.

Retrieving metadata may fail if the Kafka broker is starting up at the same
time as the `otelcol.exporter.kafka` component. The [`retry` child
block][retry] can be provided to customize retry behavior.

### retry block

The `retry` block configures how to retry retrieving metadata when retrieval
fails.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no
`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no

### retry_on_failure block

The `retry_on_failure` block configures how failed requests to Kafka are retried.

{{< docs/shared lookup="reference/components/otelcol-retry-block.md" source="alloy" version="" >}}

### sending_queue block

The `sending_queue` block configures an in-memory buffer of batches before data is sent to the Kafka brokers.

{{< docs/shared lookup="reference/components/otelcol-queue-block.md" source="alloy" version="" >}}

### producer block

The `producer` block configures how messages are produced and sent to Kafka.

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`max_message_bytes` | `number` | The maximum permitted size of a message in bytes. | `1000000` | no
`required_acks` | `number` | Controls when a message is regarded as transmitted. | `1` | no
`compression` | `string` | The compression codec to use when sending messages. | `"none"` | no
`flush_max_messages` | `number` | The maximum number of messages the producer sends in a single broker request. `0` means no limit. | `0` | no

Refer to the [sarama documentation][RequiredAcks] for more information on `required_acks`.

`compression` can be set to either `none`, `gzip`, `snappy`, `lz4`, or `zstd`.
Refer to the [sarama documentation][CompressionCodec] for more information.
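As a sketch, the following exporter compresses batches with `gzip` and waits for all in-sync replicas to acknowledge each message (`required_acks = -1`). The label, broker address, and topic are placeholders.

```alloy
otelcol.exporter.kafka "compressed" {
  protocol_version = "2.0.0"
  brokers          = ["kafka:9092"]
  topic            = "otlp_metrics"

  producer {
    compression   = "gzip"
    required_acks = -1
  }
}
```
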
+ +[RequiredAcks]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#RequiredAcks +[CompressionCodec]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#CompressionCodec + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +--------|--------------------|----------------------------------------------------------------- +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, logs, or traces). + +## Component health + +`otelcol.exporter.kafka` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.exporter.kafka` does not expose any component-specific debug +information. + +## Example + +This example forwards telemetry data through a batch processor before finally sending it to Kafka: + +```alloy +otelcol.receiver.otlp "default" { + http {} + grpc {} + + output { + metrics = [otelcol.processor.batch.default.input] + logs = [otelcol.processor.batch.default.input] + traces = [otelcol.processor.batch.default.input] + } +} + +otelcol.processor.batch "default" { + output { + metrics = [otelcol.exporter.kafka.default.input] + logs = [otelcol.exporter.kafka.default.input] + traces = [otelcol.exporter.kafka.default.input] + } +} + +otelcol.exporter.kafka "default" { + brokers = ["localhost:9092"] + protocol_version = "2.0.0" +} +``` + + + +## Compatible components + +`otelcol.exporter.kafka` has exports that can be consumed by the following components: + +- Components that consume [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + \ No newline at end of file diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md index e0664d1ced..acd01b63ac 100644 --- a/docs/sources/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/reference/components/otelcol.receiver.kafka.md @@ -40,7 +40,7 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `brokers` | `array(string)` | Kafka brokers to connect to. | | yes `protocol_version` | `string` | Kafka protocol version to use. | | yes -`topic` | `string` | Kafka topic to read from. | | no +`topic` | `string` | Kafka topic to read from. | _See below_ | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. 
| `"otel-collector"` | no diff --git a/internal/component/all/all.go b/internal/component/all/all.go index f852e62341..760153c3fd 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -67,6 +67,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/connector/servicegraph" // Import otelcol.connector.servicegraph _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanlogs" // Import otelcol.connector.spanlogs _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics + _ "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" // Import otelcol.exporter.kafka _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" // Import otelcol.exporter.loadbalancing _ "github.com/grafana/alloy/internal/component/otelcol/exporter/logging" // Import otelcol.exporter.logging _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loki" // Import otelcol.exporter.loki diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go new file mode 100644 index 0000000000..2b0a5f0d8c --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -0,0 +1,179 @@ +// Package kafka provides an otelcol.exporter.kafka component. +package kafka + +import ( + "time" + + "github.com/IBM/sarama" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter" + alloy_kafka_receiver "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.exporter.kafka", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := kafkaexporter.NewFactory() + return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + }, + }) +} + +// Arguments configures the otelcol.exporter.kafka component. 
+type Arguments struct { + ProtocolVersion string `alloy:"protocol_version,attr"` + Brokers []string `alloy:"brokers,attr,optional"` + ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` + ClientID string `alloy:"client_id,attr,optional"` + Topic string `alloy:"topic,attr,optional"` + TopicFromAttribute string `alloy:"topic_from_attribute,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + PartitionTracesByID bool `alloy:"partition_traces_by_id,attr,optional"` + PartitionMetricsByResourceAttributes bool `alloy:"partition_metrics_by_resource_attributes,attr,optional"` + Timeout time.Duration `alloy:"timeout,attr,optional"` + + Authentication alloy_kafka_receiver.AuthenticationArguments `alloy:"authentication,block,optional"` + Metadata alloy_kafka_receiver.MetadataArguments `alloy:"metadata,block,optional"` + Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` + Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` + Producer Producer `alloy:"producer,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` +} + +// Producer defines configuration for producer +type Producer struct { + // Maximum message bytes the producer will accept to produce. + MaxMessageBytes int `alloy:"max_message_bytes,attr,optional"` + + // RequiredAcks Number of acknowledgements required to assume that a message has been sent. + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#RequiredAcks + // The options are: + // 0 -> NoResponse. doesn't send any response + // 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default ) + // -1 -> WaitForAll. waits for all in-sync replicas to commit before responding. + RequiredAcks int `alloy:"required_acks,attr,optional"` + + // Compression Codec used to produce messages + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec + // The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd' + Compression string `alloy:"compression,attr,optional"` + + // The maximum number of messages the producer will send in a single + // broker request. Defaults to 0 for unlimited. Similar to + // `queue.buffering.max.messages` in the JVM producer. + FlushMaxMessages int `alloy:"flush_max_messages,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args Producer) Convert() kafkaexporter.Producer { + return kafkaexporter.Producer{ + MaxMessageBytes: args.MaxMessageBytes, + RequiredAcks: sarama.RequiredAcks(args.RequiredAcks), + Compression: args.Compression, + FlushMaxMessages: args.FlushMaxMessages, + } +} + +var ( + _ syntax.Validator = (*Arguments)(nil) + _ syntax.Defaulter = (*Arguments)(nil) + _ exporter.Arguments = (*Arguments)(nil) +) + +// SetToDefault implements syntax.Defaulter. 
+func (args *Arguments) SetToDefault() { + *args = Arguments{ + Encoding: "otlp_proto", + Brokers: []string{"localhost:9092"}, + ClientID: "sarama", + Timeout: 5 * time.Second, + Metadata: alloy_kafka_receiver.MetadataArguments{ + IncludeAllTopics: true, + Retry: alloy_kafka_receiver.MetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + }, + Producer: Producer{ + MaxMessageBytes: 1000000, + RequiredAcks: 1, + Compression: "none", + FlushMaxMessages: 0, + }, + } + args.Retry.SetToDefault() + args.Queue.SetToDefault() + args.DebugMetrics.SetToDefault() +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + otelCfg, err := args.Convert() + if err != nil { + return err + } + kafkaCfg := otelCfg.(*kafkaexporter.Config) + return kafkaCfg.Validate() +} + +// Convert implements exporter.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + input := make(map[string]interface{}) + input["auth"] = args.Authentication.Convert() + + var result kafkaexporter.Config + err := mapstructure.Decode(input, &result) + if err != nil { + return nil, err + } + + result.Brokers = args.Brokers + result.ResolveCanonicalBootstrapServersOnly = args.ResolveCanonicalBootstrapServersOnly + result.ProtocolVersion = args.ProtocolVersion + result.ClientID = args.ClientID + result.Topic = args.Topic + result.TopicFromAttribute = args.TopicFromAttribute + result.Encoding = args.Encoding + result.PartitionTracesByID = args.PartitionTracesByID + result.PartitionMetricsByResourceAttributes = args.PartitionMetricsByResourceAttributes + result.TimeoutSettings = exporterhelper.TimeoutSettings{ + Timeout: args.Timeout, + } + result.Metadata = args.Metadata.Convert() + result.BackOffConfig = *args.Retry.Convert() + result.QueueSettings = *args.Queue.Convert() + result.Producer = args.Producer.Convert() + + return &result, nil +} + +// Extensions implements exporter.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements exporter.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// DebugMetricsConfig implements exporter.Arguments. 
+func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/exporter/kafka/kafka_test.go b/internal/component/otelcol/exporter/kafka/kafka_test.go new file mode 100644 index 0000000000..85d2154a46 --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka_test.go @@ -0,0 +1,240 @@ +package kafka_test + +import ( + "testing" + "time" + + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/stretchr/testify/require" +) + +func TestArguments_UnmarshalAlloy(t *testing.T) { + tests := []struct { + testName string + cfg string + expected map[string]interface{} + }{ + { + testName: "Defaults", + cfg: ` + protocol_version = "2.0.0" + `, + expected: map[string]interface{}{ + "brokers": []string{"localhost:9092"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": false, + "client_id": "sarama", + "topic": "", + "topic_from_attribute": "", + "encoding": "otlp_proto", + "partition_traces_by_id": false, + "partition_metrics_by_resource_attributes": false, + "timeout": 5 * time.Second, + "authentication": map[string]interface{}{}, + + "metadata": map[string]interface{}{ + "full": true, + "retry": map[string]interface{}{ + "max": 3, + "backoff": 250 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 5 * time.Second, + "randomization_factor": 0.5, + "multiplier": 1.5, + "max_interval": 30 * time.Second, + "max_elapsed_time": 5 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 10, + "queue_size": 1000, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 1000000, + "required_acks": 1, + "compression": "none", + "flush_max_messages": 0, + }, + }, + }, + { + testName: "Explicit", + cfg: ` + protocol_version = "2.0.0" + brokers = ["redpanda:123"] + resolve_canonical_bootstrap_servers_only = true + client_id = "my-client" + topic = "my-topic" + topic_from_attribute = "my-attr" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "12s" + + authentication { + plaintext { + username = "user" + password = "pass" + } + } + + metadata { + include_all_topics = false + retry { + max_retries = 5 + backoff = "511ms" + } + } + + retry_on_failure { + enabled = true + initial_interval = "10s" + randomization_factor = 0.1 + multiplier = 2.0 + max_interval = "61s" + max_elapsed_time = "11m" + } + + sending_queue { + enabled = true + num_consumers = 11 + queue_size = 1001 + } + + producer { + max_message_bytes = 2000001 + required_acks = 0 + compression = "gzip" + flush_max_messages = 101 + } + `, + expected: map[string]interface{}{ + "brokers": []string{"redpanda:123"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": true, + "client_id": "my-client", + "topic": "my-topic", + "topic_from_attribute": "my-attr", + "encoding": "otlp_json", + "partition_traces_by_id": true, + "partition_metrics_by_resource_attributes": true, + "timeout": 12 * time.Second, + "auth": map[string]interface{}{ + "plain_text": map[string]interface{}{ + "username": "user", + "password": "pass", + }, + }, + + "metadata": map[string]interface{}{ + 
"full": false, + "retry": map[string]interface{}{ + "max": 5, + "backoff": 511 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 10 * time.Second, + "randomization_factor": 0.1, + "multiplier": 2.0, + "max_interval": 61 * time.Second, + "max_elapsed_time": 11 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 11, + "queue_size": 1001, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 2000001, + "required_acks": 0, + "compression": "gzip", + "flush_max_messages": 101, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var expected kafkaexporter.Config + err := mapstructure.Decode(tc.expected, &expected) + require.NoError(t, err) + + var args kafka.Arguments + err = syntax.Unmarshal([]byte(tc.cfg), &args) + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*kafkaexporter.Config) + + require.Equal(t, expected, *actual) + }) + } +} + +func TestDebugMetricsConfig(t *testing.T) { + tests := []struct { + testName string + agentCfg string + expected otelcolCfg.DebugMetricsArguments + }{ + { + testName: "default", + agentCfg: ` + protocol_version = "2.0.0" + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_false", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = false + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: false, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_true", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = true + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args kafka.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.agentCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected, args.DebugMetricsConfig()) + }) + } +} diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go new file mode 100644 index 0000000000..4ade6e5396 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -0,0 +1,72 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, kafkaExporterConverter{}) +} + +type kafkaExporterConverter struct{} + +func (kafkaExporterConverter) Factory() component.Factory { return kafkaexporter.NewFactory() } + +func (kafkaExporterConverter) InputComponentName() string { + return "otelcol.exporter.kafka" +} + +func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toKafkaExporter(state, id, cfg.(*kafkaexporter.Config)) + 
block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "kafka"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toKafkaExporter(state *State, id component.InstanceID, cfg *kafkaexporter.Config) *kafka.Arguments { + return &kafka.Arguments{ + Brokers: cfg.Brokers, + ProtocolVersion: cfg.ProtocolVersion, + ResolveCanonicalBootstrapServersOnly: cfg.ResolveCanonicalBootstrapServersOnly, + ClientID: cfg.ClientID, + Topic: cfg.Topic, + TopicFromAttribute: cfg.TopicFromAttribute, + Encoding: cfg.Encoding, + PartitionTracesByID: cfg.PartitionTracesByID, + PartitionMetricsByResourceAttributes: cfg.PartitionMetricsByResourceAttributes, + Timeout: cfg.Timeout, + + Authentication: toKafkaAuthentication(encodeMapstruct(cfg.Authentication)), + Metadata: toKafkaMetadata(cfg.Metadata), + Retry: toRetryArguments(cfg.BackOffConfig), + Queue: toQueueArguments(cfg.QueueSettings), + Producer: toKafkaProducer(cfg.Producer), + + DebugMetrics: common.DefaultValue[kafka.Arguments]().DebugMetrics, + } +} + +func toKafkaProducer(cfg kafkaexporter.Producer) kafka.Producer { + return kafka.Producer{ + MaxMessageBytes: cfg.MaxMessageBytes, + Compression: cfg.Compression, + RequiredAcks: int(cfg.RequiredAcks), + FlushMaxMessages: cfg.FlushMaxMessages, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy index b98eabaf2f..bdb45cfc7b 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy @@ -11,8 +11,8 @@ otelcol.receiver.kafka "default" { sasl { username = "fakeusername" password = "fakepassword" - mechanism = "somemechanism" - version = 5 + mechanism = "AWS_MSK_IAM" + version = 1 aws_msk { region = "us-east-1" @@ -33,14 +33,56 @@ otelcol.receiver.kafka "default" { } output { - metrics = [otelcol.exporter.otlp.default.input] - logs = [otelcol.exporter.otlp.default.input] - traces = [otelcol.exporter.otlp.default.input] + metrics = [otelcol.exporter.kafka.default.input] + logs = [otelcol.exporter.kafka.default.input] + traces = [otelcol.exporter.kafka.default.input] } } -otelcol.exporter.otlp "default" { - client { - endpoint = "database:4317" +otelcol.exporter.kafka "default" { + protocol_version = "2.0.0" + brokers = ["redpanda:9092"] + resolve_canonical_bootstrap_servers_only = true + client_id = "otelcol" + topic_from_attribute = "my_topic" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "11s" + + authentication { + plaintext { + username = "fakeusername" + password = "fakepassword" + } + + sasl { + username = "fakeusername" + password = "fakepassword" + mechanism = "SCRAM-SHA-256" + version = 1 + + aws_msk { + region = "us-east-1" + broker_addr = "broker:9092" + } + } + + tls { + insecure = true + } + + kerberos { + service_name = "someservice" + realm = "myrealm" + username = "fakeusername" + password = "fakepassword" + } + } + + producer { + max_message_bytes = 1000001 + compression = "snappy" + flush_max_messages = 11 } } diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml index fb8455bbf5..d5f563c3de 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml +++ 
b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml @@ -9,8 +9,8 @@ receivers: sasl: username: fakeusername password: fakepassword - mechanism: somemechanism - version: 5 + mechanism: AWS_MSK_IAM + version: 1 aws_msk: region: us-east-1 broker_addr: broker:9092 @@ -24,20 +24,52 @@ receivers: exporters: - otlp: - endpoint: database:4317 + kafka: + brokers: redpanda:9092 + protocol_version: 2.0.0 + resolve_canonical_bootstrap_servers_only: true + client_id: otelcol + topic_from_attribute: my_topic + partition_traces_by_id: true + partition_metrics_by_resource_attributes: true + timeout: 11s + encoding: otlp_json + auth: + plain_text: + username: fakeusername + password: fakepassword + sasl: + username: fakeusername + password: fakepassword + mechanism: SCRAM-SHA-256 + version: 1 + aws_msk: + region: us-east-1 + broker_addr: broker:9092 + tls: + insecure: true + kerberos: + username: fakeusername + password: fakepassword + service_name: someservice + realm: myrealm + producer: + max_message_bytes: 1000001 + compression: snappy + required_acks: 0 + flush_max_messages: 11 service: pipelines: metrics: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] logs: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] traces: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] diff --git a/internal/tools/docs_generator/docs_updated_test.go b/internal/tools/docs_generator/docs_updated_test.go index 65358f908a..2a67fd3dda 100644 --- a/internal/tools/docs_generator/docs_updated_test.go +++ b/internal/tools/docs_generator/docs_updated_test.go @@ -64,12 +64,12 @@ func runForGenerator(t *testing.T, g generator.DocsGenerator) { } actual, err := g.Read() - require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./docs'", g.Name()) + require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./internal/tools/docs_generator/", g.Name()) require.Contains( t, actual, strings.TrimSpace(generated), - "outdated docs detected when running %q, try updating with 'go generate ./docs'", + "outdated docs detected when running %q, try updating with 'go generate ./internal/tools/docs_generator/", g.Name(), ) }