diff --git a/CHANGELOG.md b/CHANGELOG.md index 458cd752ce..142646a0b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ Main (unreleased) - Update Public preview `remotecfg` to use `alloy-remote-config` instead of `agent-remote-config`. The API has been updated to use the term `collector` over `agent`. (@erikbaranowski) +### Features + +- Add an `otelcol.exporter.kafka` component to send OTLP metrics, logs, and traces to Kafka. + ### Enhancements - (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index f391fe7f21..5747b2720e 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -285,6 +285,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol. - [otelcol.connector.servicegraph](../components/otelcol.connector.servicegraph) - [otelcol.connector.spanlogs](../components/otelcol.connector.spanlogs) - [otelcol.connector.spanmetrics](../components/otelcol.connector.spanmetrics) +- [otelcol.exporter.kafka](../components/otelcol.exporter.kafka) - [otelcol.exporter.loadbalancing](../components/otelcol.exporter.loadbalancing) - [otelcol.exporter.logging](../components/otelcol.exporter.logging) - [otelcol.exporter.loki](../components/otelcol.exporter.loki) diff --git a/docs/sources/reference/components/otelcol.exporter.kafka.md b/docs/sources/reference/components/otelcol.exporter.kafka.md new file mode 100644 index 0000000000..c1c2b81fdd --- /dev/null +++ b/docs/sources/reference/components/otelcol.exporter.kafka.md @@ -0,0 +1,244 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol.exporter.kafka/ +description: Learn about otelcol.exporter.kafka +title: otelcol.exporter.kafka +--- + +# otelcol.exporter.kafka + +`otelcol.exporter.kafka` accepts logs, metrics, and traces telemetry data from +other `otelcol` components and sends it to Kafka. + +It is important to use `otelcol.exporter.kafka` together with `otelcol.processor.batch` +to make sure `otelcol.exporter.kafka` doesn't slow down due to sending Kafka a huge number of small payloads. + +{{< admonition type="note" >}} +`otelcol.exporter.kafka` is a wrapper over the upstream OpenTelemetry Collector `kafka` exporter from the `otelcol-contrib` distribution. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +Multiple `otelcol.exporter.kafka` components can be specified by giving them +different labels. + +## Usage + +```alloy +otelcol.exporter.kafka "LABEL" { + protocol_version = "PROTOCOL_VERSION" +} +``` + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +------------------------------------------ | --------------- | ----------------------------------------------------------------------------------- | -------------------- | -------- +`protocol_version` | `string` | Kafka protocol version to use. | | yes +`brokers` | `list(string)` | Kafka brokers to connect to. | `["localhost:9092"]` | no +`topic` | `string` | Kafka topic to send to. | _See below_ | no +`topic_from_attribute` | `string` | A resource attribute whose value should be used as the message's topic. | `""` | no +`encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no +`client_id` | `string` | Consumer client ID to use. The ID will be used for all produce requests. | `"sarama"` | no +`timeout` | `duration` | The timeout for every attempt to send data to the backend. | `"5s"` | no +`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"` | no +`partition_traces_by_id` | `bool` | Whether to include the trace ID as the message key in trace messages sent to Kafka. | `"false"` | no +`partition_metrics_by_resource_attributes` | `bool` | Whether to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka. | `"false"` | no + +If `topic` is not set, different topics will be used for different telemetry signals: + +* Metrics will be sent to an `otlp_metrics` topic. +* Traces will be sent to an `otlp_spans` topic. +* Logs will be sent to an `otlp_logs` topic. + +If topic is set, the same topic will be used for all telemetry signals - metrics, logs, and traces. + +When `topic_from_attribute` is set, it will take precedence over `topic`. + +The `encoding` argument determines how to encode messages sent to Kafka. +`encoding` must be one of the following strings: +* Encodings which work for traces, logs, and metrics: + * `"otlp_proto"`: Encode messages as OTLP protobuf. + * `"otlp_json"`: Encode messages as OTLP JSON. +* Encodings which work only for traces: + * `"jaeger_proto"`: The payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID. + * `"jaeger_json"`: The payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID. + * `"zipkin_proto"`: The payload is serialized to Zipkin v2 proto Span. + * `"zipkin_json"`: The payload is serialized to Zipkin v2 JSON Span. +* Encodings which work only for logs: + * `"raw"`: If the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. + +`partition_traces_by_id` does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. + +## Blocks + +The following blocks are supported inside the definition of `otelcol.exporter.kafka`: + +Hierarchy | Block | Description | Required +-------------------------------- | ------------------- | --------------------------------------------------------------------------- | -------- +authentication | [authentication][] | Configures authentication for connecting to Kafka brokers. | no +authentication > plaintext | [plaintext][] | Authenticates against Kafka brokers with plaintext. | no +authentication > sasl | [sasl][] | Authenticates against Kafka brokers with SASL. | no +authentication > sasl > aws_msk | [aws_msk][] | Additional SASL parameters when using AWS_MSK_IAM. | no +authentication > tls | [tls][] | Configures TLS for connecting to the Kafka brokers. | no +authentication > kerberos | [kerberos][] | Authenticates against Kafka brokers with Kerberos. | no +metadata | [metadata][] | Configures how to retrieve metadata from Kafka brokers. | no +metadata > retry | [retry][] | Configures how to retry metadata retrieval. | no +retry_on_failure | [retry_on_failure][] | Configures retry mechanism for failed requests. | no +queue | [queue][] | Configures batching of data before sending. | no +producer | [producer][] | Kafka producer configuration, | no +debug_metrics | [debug_metrics][] | Configures the metrics which this component generates to monitor its state. | no + +The `>` symbol indicates deeper levels of nesting. +For example, `authentication > tls` refers to a `tls` block defined inside an `authentication` block. + +[authentication]: #authentication-block +[plaintext]: #plaintext-block +[sasl]: #sasl-block +[aws_msk]: #aws_msk-block +[tls]: #tls-block +[kerberos]: #kerberos-block +[metadata]: #metadata-block +[retry]: #retry-block +[retry_on_failure]: #retry_on_failure-block +[queue]: #queue-block +[producer]: #producer-block +[debug_metrics]: #debug_metrics-block + +### authentication block + +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}} + +### plaintext block + +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}} + +### sasl block + +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} + +### aws_msk block + +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} + +### tls block + +The `tls` block configures TLS settings used for connecting to the Kafka +brokers. If the `tls` block isn't provided, TLS won't be used for +communication. + +{{< docs/shared lookup="reference/components/otelcol-tls-client-block.md" source="alloy" version="" >}} + +### kerberos block + +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}} + +### metadata block + +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}} + +### retry block + +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}} + +### retry_on_failure block + +The `retry_on_failure` block configures how failed requests to Kafka are retried. + +{{< docs/shared lookup="reference/components/otelcol-retry-block.md" source="alloy" version="" >}} + +### queue block + +The `queue` block configures an in-memory buffer of batches before data is sent to the gRPC server. + +{{< docs/shared lookup="reference/components/otelcol-queue-block.md" source="alloy" version="" >}} + +### producer block + +The `producer` block configures how to retry retrieving metadata when retrieval fails. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`max_message_bytes` | `number` | The maximum permitted size of a message in bytes. | `1000000` | no +`required_acks` | `number` | Controls when a message is regarded as transmitted. | `1` | no +`compression` | `string` | Time to wait between retries. | `"none"` | no +`flush_max_messages` | `number` | Time to wait between retries. | `0` | no + +Refer to the [sarama documentation][RequiredAcks] for more information on `required_acks`. + +`compression` could be set to either `none`, `gzip`, `snappy`, `lz4`, or `zstd`. +Refer to the [Sarama documentation][CompressionCodec] for more information. + +[RequiredAcks]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#RequiredAcks +[CompressionCodec]: https://pkg.go.dev/github.com/IBM/sarama@v1.43.2#CompressionCodec + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +--------|--------------------|----------------------------------------------------------------- +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics, logs, or traces). + +## Component health + +`otelcol.exporter.kafka` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.exporter.kafka` does not expose any component-specific debug +information. + +## Example + +This example forwards telemetry data through a batch processor before finally sending it to Kafka: + +```alloy +otelcol.receiver.otlp "default" { + http {} + grpc {} + + output { + metrics = [otelcol.processor.batch.default.input] + logs = [otelcol.processor.batch.default.input] + traces = [otelcol.processor.batch.default.input] + } +} + +otelcol.processor.batch "default" { + output { + metrics = [otelcol.exporter.kafka.default.input] + logs = [otelcol.exporter.kafka.default.input] + traces = [otelcol.exporter.kafka.default.input] + } +} + +otelcol.exporter.kafka "default" { + brokers = ["localhost:9092"] + protocol_version = "2.0.0" +} +``` + + + +## Compatible components + +`otelcol.exporter.kafka` has exports that can be consumed by the following components: + +- Components that consume [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + \ No newline at end of file diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md index e0664d1ced..ba71f33030 100644 --- a/docs/sources/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/reference/components/otelcol.receiver.kafka.md @@ -40,7 +40,7 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `brokers` | `array(string)` | Kafka brokers to connect to. | | yes `protocol_version` | `string` | Kafka protocol version to use. | | yes -`topic` | `string` | Kafka topic to read from. | | no +`topic` | `string` | Kafka topic to read from. | _See below_ | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no @@ -118,56 +118,19 @@ The `>` symbol indicates deeper levels of nesting. For example, ### authentication block -The `authentication` block holds the definition of different authentication -mechanisms to use when connecting to Kafka brokers. It doesn't support any -arguments and is configured fully through inner blocks. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication.md" source="alloy" version="" >}} ### plaintext block -The `plaintext` block configures `PLAIN` authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for `PLAIN` authentication. | | yes -`password` | `secret` | Password to use for `PLAIN` authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-plaintext.md" source="alloy" version="" >}} ### sasl block -The `sasl` block configures SASL authentication against Kafka brokers. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`username` | `string` | Username to use for SASL authentication. | | yes -`password` | `secret` | Password to use for SASL authentication. | | yes -`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes -`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no - -The `mechanism` argument can be set to one of the following strings: - -* `"PLAIN"` -* `"AWS_MSK_IAM"` -* `"SCRAM-SHA-256"` -* `"SCRAM-SHA-512"` - -When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided. - -The `version` argument can be set to either `0` or `1`. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl.md" source="alloy" version="" >}} ### aws_msk block -The `aws_msk` block configures extra parameters for SASL authentication when -using the `AWS_MSK_IAM` mechanism. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`region` | `string` | AWS region the MSK cluster is based in. | | yes -`broker_addr` | `string` | MSK address to connect to for authentication. | | yes +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-sasl-aws_msk.md" source="alloy" version="" >}} ### tls block @@ -179,58 +142,15 @@ communication. ### kerberos block -The `kerberos` block configures Kerberos authentication against the Kafka -broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`service_name` | `string` | Kerberos service name. | | no -`realm` | `string` | Kerberos realm. | | no -`use_keytab` | `string` | Enables using keytab instead of password. | | no -`username` | `string` | Kerberos username to authenticate as. | | yes -`password` | `secret` | Kerberos password to authenticate with. | | no -`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no -`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no - -When `use_keytab` is `false`, the `password` argument is required. When -`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is -used for authentication instead. At most one of `password` or `keytab_file` -must be provided. +{{< docs/shared lookup="reference/components/otelcol-kafka-authentication-kerberos.md" source="alloy" version="" >}} ### metadata block -The `metadata` block configures how to retrieve and store metadata from the -Kafka broker. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no - -If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka` -maintains a full set of metadata for all topics rather than the minimal set -that has been necessary so far. Including the full set of metadata is more -convenient for users but can consume a substantial amount of memory if you have -many topics and partitions. - -Retrieving metadata may fail if the Kafka broker is starting up at the same -time as the `otelcol.receiver.kafka` component. The [`retry` child -block][retry] can be provided to customize retry behavior. +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata.md" source="alloy" version="" >}} ### retry block -The `retry` block configures how to retry retrieving metadata when retrieval -fails. - -The following arguments are supported: - -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no -`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no +{{< docs/shared lookup="reference/components/otelcol-kafka-metadata-retry.md" source="alloy" version="" >}} ### autocommit block diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md new file mode 100644 index 0000000000..ff0e469c63 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-kerberos.md @@ -0,0 +1,24 @@ +--- +description: Shared content, otelcol Kafka Kerberos authentication +headless: true +--- + +The `kerberos` block configures Kerberos authentication against the Kafka +broker. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`service_name` | `string` | Kerberos service name. | | no +`realm` | `string` | Kerberos realm. | | no +`use_keytab` | `string` | Enables using keytab instead of password. | | no +`username` | `string` | Kerberos username to authenticate as. | | yes +`password` | `secret` | Kerberos password to authenticate with. | | no +`config_file` | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`). | | no +`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no + +When `use_keytab` is `false`, the `password` argument is required. When +`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is +used for authentication instead. At most one of `password` or `keytab_file` +must be provided. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md new file mode 100644 index 0000000000..bf6d51962a --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-plaintext.md @@ -0,0 +1,13 @@ +--- +description: Shared content, otelcol Kafka plaintext authentication +headless: true +--- + +The `plaintext` block configures plain text authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for plain text authentication. | | yes +`password` | `secret` | Password to use for plain text authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md new file mode 100644 index 0000000000..5061b9d5cb --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl-aws_msk.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol Kafka SASL AWS_MSK authentication +headless: true +--- + +The `aws_msk` block configures extra parameters for SASL authentication when +using the `AWS_MSK_IAM` mechanism. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`region` | `string` | AWS region the MSK cluster is based in. | | yes +`broker_addr` | `string` | MSK address to connect to for authentication. | | yes diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md new file mode 100644 index 0000000000..a15d2b59f5 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication-sasl.md @@ -0,0 +1,26 @@ +--- +description: Shared content, otelcol Kafka SASL authentication +headless: true +--- + +The `sasl` block configures SASL authentication against Kafka brokers. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`username` | `string` | Username to use for SASL authentication. | | yes +`password` | `secret` | Password to use for SASL authentication. | | yes +`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes +`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no + +The `mechanism` argument can be set to one of the following strings: + +* `"PLAIN"` +* `"AWS_MSK_IAM"` +* `"SCRAM-SHA-256"` +* `"SCRAM-SHA-512"` + +When `mechanism` is set to `"AWS_MSK_IAM"`, the `aws_msk` child block must also be provided. + +The `version` argument can be set to either `0` or `1`. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-authentication.md b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md new file mode 100644 index 0000000000..f0df3d7078 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-authentication.md @@ -0,0 +1,8 @@ +--- +description: Shared content, otelcol Kafka authentication +headless: true +--- + +The `authentication` block holds the definition of different authentication +mechanisms to use when connecting to Kafka brokers. It doesn't support any +arguments and is configured fully through inner blocks. diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md new file mode 100644 index 0000000000..f0ca24f486 --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata-retry.md @@ -0,0 +1,14 @@ +--- +description: Shared content, otelcol Kafka metadata retry +headless: true +--- + +The `retry` block configures how to retry retrieving metadata when retrieval +fails. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no +`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no diff --git a/docs/sources/shared/reference/components/otelcol-kafka-metadata.md b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md new file mode 100644 index 0000000000..09de4bc85c --- /dev/null +++ b/docs/sources/shared/reference/components/otelcol-kafka-metadata.md @@ -0,0 +1,22 @@ +--- +description: Shared content, otelcol Kafka metadata +headless: true +--- + +The `metadata` block configures how to retrieve and store metadata from the +Kafka broker. + +The following arguments are supported: + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no + +If the `include_all_topics` argument is `true`, +a full set of metadata for all topics is maintained rather than the minimal set +that has been necessary so far. Including the full set of metadata is more +convenient for users but can consume a substantial amount of memory if you have +many topics and partitions. + +Retrieving metadata may fail if the Kafka broker is starting up at the same +time as the Alloy component. The `retry` child block can be provided to customize retry behavior. diff --git a/internal/component/all/all.go b/internal/component/all/all.go index f852e62341..760153c3fd 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -67,6 +67,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/connector/servicegraph" // Import otelcol.connector.servicegraph _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanlogs" // Import otelcol.connector.spanlogs _ "github.com/grafana/alloy/internal/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics + _ "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" // Import otelcol.exporter.kafka _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loadbalancing" // Import otelcol.exporter.loadbalancing _ "github.com/grafana/alloy/internal/component/otelcol/exporter/logging" // Import otelcol.exporter.logging _ "github.com/grafana/alloy/internal/component/otelcol/exporter/loki" // Import otelcol.exporter.loki diff --git a/internal/component/otelcol/config_kafka.go b/internal/component/otelcol/config_kafka.go new file mode 100644 index 0000000000..1178e85e4c --- /dev/null +++ b/internal/component/otelcol/config_kafka.go @@ -0,0 +1,155 @@ +package otelcol + +import ( + "time" + + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" +) + +// KafkaAuthenticationArguments configures how to authenticate to the Kafka broker. +type KafkaAuthenticationArguments struct { + Plaintext *KafkaPlaintextArguments `alloy:"plaintext,block,optional"` + SASL *KafkaSASLArguments `alloy:"sasl,block,optional"` + TLS *TLSClientArguments `alloy:"tls,block,optional"` + Kerberos *KafkaKerberosArguments `alloy:"kerberos,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaAuthenticationArguments) Convert() map[string]interface{} { + auth := make(map[string]interface{}) + + if args.Plaintext != nil { + conv := args.Plaintext.Convert() + auth["plain_text"] = &conv + } + if args.SASL != nil { + conv := args.SASL.Convert() + auth["sasl"] = &conv + } + if args.TLS != nil { + auth["tls"] = args.TLS.Convert() + } + if args.Kerberos != nil { + conv := args.Kerberos.Convert() + auth["kerberos"] = &conv + } + + return auth +} + +// KafkaPlaintextArguments configures plaintext authentication against the Kafka +// broker. +type KafkaPlaintextArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaPlaintextArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + } +} + +// KafkaSASLArguments configures SASL authentication against the Kafka broker. +type KafkaSASLArguments struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` + Mechanism string `alloy:"mechanism,attr"` + Version int `alloy:"version,attr,optional"` + AWSMSK KafkaAWSMSKArguments `alloy:"aws_msk,block,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaSASLArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "username": args.Username, + "password": string(args.Password), + "mechanism": args.Mechanism, + "version": args.Version, + "aws_msk": args.AWSMSK.Convert(), + } +} + +// KafkaAWSMSKArguments exposes additional SASL authentication measures required to +// use the AWS_MSK_IAM mechanism. +type KafkaAWSMSKArguments struct { + Region string `alloy:"region,attr"` + BrokerAddr string `alloy:"broker_addr,attr"` +} + +// Convert converts args into the upstream type. +func (args KafkaAWSMSKArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "region": args.Region, + "broker_addr": args.BrokerAddr, + } +} + +// KafkaKerberosArguments configures Kerberos authentication against the Kafka +// broker. +type KafkaKerberosArguments struct { + ServiceName string `alloy:"service_name,attr,optional"` + Realm string `alloy:"realm,attr,optional"` + UseKeyTab bool `alloy:"use_keytab,attr,optional"` + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr,optional"` + ConfigPath string `alloy:"config_file,attr,optional"` + KeyTabPath string `alloy:"keytab_file,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaKerberosArguments) Convert() map[string]interface{} { + return map[string]interface{}{ + "service_name": args.ServiceName, + "realm": args.Realm, + "use_keytab": args.UseKeyTab, + "username": args.Username, + "password": string(args.Password), + "config_file": args.ConfigPath, + "keytab_file": args.KeyTabPath, + } +} + +// KafkaMetadataArguments configures how the Alloy component will +// retrieve metadata from the Kafka broker. +type KafkaMetadataArguments struct { + IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` + Retry KafkaMetadataRetryArguments `alloy:"retry,block,optional"` +} + +func (args *KafkaMetadataArguments) SetToDefault() { + *args = KafkaMetadataArguments{ + IncludeAllTopics: true, + Retry: KafkaMetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + } +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataArguments) Convert() kafkaexporter.Metadata { + return kafkaexporter.Metadata{ + Full: args.IncludeAllTopics, + Retry: args.Retry.Convert(), + } +} + +// KafkaMetadataRetryArguments configures how to retry retrieving metadata from the +// Kafka broker. Retrying is useful to avoid race conditions when the Kafka +// broker is starting at the same time as the Alloy component. +type KafkaMetadataRetryArguments struct { + MaxRetries int `alloy:"max_retries,attr,optional"` + Backoff time.Duration `alloy:"backoff,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args KafkaMetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { + return kafkaexporter.MetadataRetry{ + Max: args.MaxRetries, + Backoff: args.Backoff, + } +} diff --git a/internal/component/otelcol/exporter/kafka/kafka.go b/internal/component/otelcol/exporter/kafka/kafka.go new file mode 100644 index 0000000000..01260863dd --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka.go @@ -0,0 +1,178 @@ +// Package kafka provides an otelcol.exporter.kafka component. +package kafka + +import ( + "time" + + "github.com/IBM/sarama" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.exporter.kafka", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := kafkaexporter.NewFactory() + return exporter.New(opts, fact, args.(Arguments), exporter.TypeAll) + }, + }) +} + +// Arguments configures the otelcol.exporter.kafka component. +type Arguments struct { + ProtocolVersion string `alloy:"protocol_version,attr"` + Brokers []string `alloy:"brokers,attr,optional"` + ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` + ClientID string `alloy:"client_id,attr,optional"` + Topic string `alloy:"topic,attr,optional"` + TopicFromAttribute string `alloy:"topic_from_attribute,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + PartitionTracesByID bool `alloy:"partition_traces_by_id,attr,optional"` + PartitionMetricsByResourceAttributes bool `alloy:"partition_metrics_by_resource_attributes,attr,optional"` + Timeout time.Duration `alloy:"timeout,attr,optional"` + + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + Retry otelcol.RetryArguments `alloy:"retry_on_failure,block,optional"` + Queue otelcol.QueueArguments `alloy:"sending_queue,block,optional"` + Producer Producer `alloy:"producer,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` +} + +// Producer defines configuration for producer +type Producer struct { + // Maximum message bytes the producer will accept to produce. + MaxMessageBytes int `alloy:"max_message_bytes,attr,optional"` + + // RequiredAcks Number of acknowledgements required to assume that a message has been sent. + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#RequiredAcks + // The options are: + // 0 -> NoResponse. doesn't send any response + // 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default ) + // -1 -> WaitForAll. waits for all in-sync replicas to commit before responding. + RequiredAcks int `alloy:"required_acks,attr,optional"` + + // Compression Codec used to produce messages + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec + // The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd' + Compression string `alloy:"compression,attr,optional"` + + // The maximum number of messages the producer will send in a single + // broker request. Defaults to 0 for unlimited. Similar to + // `queue.buffering.max.messages` in the JVM producer. + FlushMaxMessages int `alloy:"flush_max_messages,attr,optional"` +} + +// Convert converts args into the upstream type. +func (args Producer) Convert() kafkaexporter.Producer { + return kafkaexporter.Producer{ + MaxMessageBytes: args.MaxMessageBytes, + RequiredAcks: sarama.RequiredAcks(args.RequiredAcks), + Compression: args.Compression, + FlushMaxMessages: args.FlushMaxMessages, + } +} + +var ( + _ syntax.Validator = (*Arguments)(nil) + _ syntax.Defaulter = (*Arguments)(nil) + _ exporter.Arguments = (*Arguments)(nil) +) + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{ + Encoding: "otlp_proto", + Brokers: []string{"localhost:9092"}, + ClientID: "sarama", + Timeout: 5 * time.Second, + Metadata: otelcol.KafkaMetadataArguments{ + IncludeAllTopics: true, + Retry: otelcol.KafkaMetadataRetryArguments{ + MaxRetries: 3, + Backoff: 250 * time.Millisecond, + }, + }, + Producer: Producer{ + MaxMessageBytes: 1000000, + RequiredAcks: 1, + Compression: "none", + FlushMaxMessages: 0, + }, + } + args.Retry.SetToDefault() + args.Queue.SetToDefault() + args.DebugMetrics.SetToDefault() +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + otelCfg, err := args.Convert() + if err != nil { + return err + } + kafkaCfg := otelCfg.(*kafkaexporter.Config) + return kafkaCfg.Validate() +} + +// Convert implements exporter.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + input := make(map[string]interface{}) + input["auth"] = args.Authentication.Convert() + + var result kafkaexporter.Config + err := mapstructure.Decode(input, &result) + if err != nil { + return nil, err + } + + result.Brokers = args.Brokers + result.ResolveCanonicalBootstrapServersOnly = args.ResolveCanonicalBootstrapServersOnly + result.ProtocolVersion = args.ProtocolVersion + result.ClientID = args.ClientID + result.Topic = args.Topic + result.TopicFromAttribute = args.TopicFromAttribute + result.Encoding = args.Encoding + result.PartitionTracesByID = args.PartitionTracesByID + result.PartitionMetricsByResourceAttributes = args.PartitionMetricsByResourceAttributes + result.TimeoutSettings = exporterhelper.TimeoutSettings{ + Timeout: args.Timeout, + } + result.Metadata = args.Metadata.Convert() + result.BackOffConfig = *args.Retry.Convert() + result.QueueSettings = *args.Queue.Convert() + result.Producer = args.Producer.Convert() + + return &result, nil +} + +// Extensions implements exporter.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements exporter.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// DebugMetricsConfig implements exporter.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/exporter/kafka/kafka_test.go b/internal/component/otelcol/exporter/kafka/kafka_test.go new file mode 100644 index 0000000000..85d2154a46 --- /dev/null +++ b/internal/component/otelcol/exporter/kafka/kafka_test.go @@ -0,0 +1,240 @@ +package kafka_test + +import ( + "testing" + "time" + + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/syntax" + "github.com/mitchellh/mapstructure" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/stretchr/testify/require" +) + +func TestArguments_UnmarshalAlloy(t *testing.T) { + tests := []struct { + testName string + cfg string + expected map[string]interface{} + }{ + { + testName: "Defaults", + cfg: ` + protocol_version = "2.0.0" + `, + expected: map[string]interface{}{ + "brokers": []string{"localhost:9092"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": false, + "client_id": "sarama", + "topic": "", + "topic_from_attribute": "", + "encoding": "otlp_proto", + "partition_traces_by_id": false, + "partition_metrics_by_resource_attributes": false, + "timeout": 5 * time.Second, + "authentication": map[string]interface{}{}, + + "metadata": map[string]interface{}{ + "full": true, + "retry": map[string]interface{}{ + "max": 3, + "backoff": 250 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 5 * time.Second, + "randomization_factor": 0.5, + "multiplier": 1.5, + "max_interval": 30 * time.Second, + "max_elapsed_time": 5 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 10, + "queue_size": 1000, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 1000000, + "required_acks": 1, + "compression": "none", + "flush_max_messages": 0, + }, + }, + }, + { + testName: "Explicit", + cfg: ` + protocol_version = "2.0.0" + brokers = ["redpanda:123"] + resolve_canonical_bootstrap_servers_only = true + client_id = "my-client" + topic = "my-topic" + topic_from_attribute = "my-attr" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "12s" + + authentication { + plaintext { + username = "user" + password = "pass" + } + } + + metadata { + include_all_topics = false + retry { + max_retries = 5 + backoff = "511ms" + } + } + + retry_on_failure { + enabled = true + initial_interval = "10s" + randomization_factor = 0.1 + multiplier = 2.0 + max_interval = "61s" + max_elapsed_time = "11m" + } + + sending_queue { + enabled = true + num_consumers = 11 + queue_size = 1001 + } + + producer { + max_message_bytes = 2000001 + required_acks = 0 + compression = "gzip" + flush_max_messages = 101 + } + `, + expected: map[string]interface{}{ + "brokers": []string{"redpanda:123"}, + "protocol_version": "2.0.0", + "resolve_canonical_bootstrap_servers_only": true, + "client_id": "my-client", + "topic": "my-topic", + "topic_from_attribute": "my-attr", + "encoding": "otlp_json", + "partition_traces_by_id": true, + "partition_metrics_by_resource_attributes": true, + "timeout": 12 * time.Second, + "auth": map[string]interface{}{ + "plain_text": map[string]interface{}{ + "username": "user", + "password": "pass", + }, + }, + + "metadata": map[string]interface{}{ + "full": false, + "retry": map[string]interface{}{ + "max": 5, + "backoff": 511 * time.Millisecond, + }, + }, + "retry_on_failure": map[string]interface{}{ + "enabled": true, + "initial_interval": 10 * time.Second, + "randomization_factor": 0.1, + "multiplier": 2.0, + "max_interval": 61 * time.Second, + "max_elapsed_time": 11 * time.Minute, + }, + "sending_queue": map[string]interface{}{ + "enabled": true, + "num_consumers": 11, + "queue_size": 1001, + }, + "producer": map[string]interface{}{ + "max_message_bytes": 2000001, + "required_acks": 0, + "compression": "gzip", + "flush_max_messages": 101, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var expected kafkaexporter.Config + err := mapstructure.Decode(tc.expected, &expected) + require.NoError(t, err) + + var args kafka.Arguments + err = syntax.Unmarshal([]byte(tc.cfg), &args) + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*kafkaexporter.Config) + + require.Equal(t, expected, *actual) + }) + } +} + +func TestDebugMetricsConfig(t *testing.T) { + tests := []struct { + testName string + agentCfg string + expected otelcolCfg.DebugMetricsArguments + }{ + { + testName: "default", + agentCfg: ` + protocol_version = "2.0.0" + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_false", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = false + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: false, + Level: otelcolCfg.LevelDetailed, + }, + }, + { + testName: "explicit_true", + agentCfg: ` + protocol_version = "2.0.0" + debug_metrics { + disable_high_cardinality_metrics = true + } + `, + expected: otelcolCfg.DebugMetricsArguments{ + DisableHighCardinalityMetrics: true, + Level: otelcolCfg.LevelDetailed, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args kafka.Arguments + require.NoError(t, syntax.Unmarshal([]byte(tc.agentCfg), &args)) + _, err := args.Convert() + require.NoError(t, err) + + require.Equal(t, tc.expected, args.DebugMetricsConfig()) + }) + } +} diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index b6ff5917e8..186f381f4c 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -11,9 +11,7 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/receiver" "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/syntax/alloytypes" "github.com/mitchellh/mapstructure" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" otelcomponent "go.opentelemetry.io/collector/component" otelextension "go.opentelemetry.io/collector/extension" @@ -44,11 +42,11 @@ type Arguments struct { ResolveCanonicalBootstrapServersOnly bool `alloy:"resolve_canonical_bootstrap_servers_only,attr,optional"` - Authentication AuthenticationArguments `alloy:"authentication,block,optional"` - Metadata MetadataArguments `alloy:"metadata,block,optional"` - AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` - MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` - HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` + Authentication otelcol.KafkaAuthenticationArguments `alloy:"authentication,block,optional"` + Metadata otelcol.KafkaMetadataArguments `alloy:"metadata,block,optional"` + AutoCommit AutoCommitArguments `alloy:"autocommit,block,optional"` + MessageMarking MessageMarkingArguments `alloy:"message_marking,block,optional"` + HeaderExtraction HeaderExtraction `alloy:"header_extraction,block,optional"` // DebugMetrics configures component internal metrics. Optional. DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` @@ -142,153 +140,6 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { return args.Output } -// AuthenticationArguments configures how to authenticate to the Kafka broker. -type AuthenticationArguments struct { - Plaintext *PlaintextArguments `alloy:"plaintext,block,optional"` - SASL *SASLArguments `alloy:"sasl,block,optional"` - TLS *otelcol.TLSClientArguments `alloy:"tls,block,optional"` - Kerberos *KerberosArguments `alloy:"kerberos,block,optional"` -} - -// Convert converts args into the upstream type. -func (args AuthenticationArguments) Convert() map[string]interface{} { - auth := make(map[string]interface{}) - - if args.Plaintext != nil { - conv := args.Plaintext.Convert() - auth["plain_text"] = &conv - } - if args.SASL != nil { - conv := args.SASL.Convert() - auth["sasl"] = &conv - } - if args.TLS != nil { - auth["tls"] = args.TLS.Convert() - } - if args.Kerberos != nil { - conv := args.Kerberos.Convert() - auth["kerberos"] = &conv - } - - return auth -} - -// PlaintextArguments configures plaintext authentication against the Kafka -// broker. -type PlaintextArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` -} - -// Convert converts args into the upstream type. -func (args PlaintextArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - } -} - -// SASLArguments configures SASL authentication against the Kafka broker. -type SASLArguments struct { - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr"` - Mechanism string `alloy:"mechanism,attr"` - Version int `alloy:"version,attr,optional"` - AWSMSK AWSMSKArguments `alloy:"aws_msk,block,optional"` -} - -// Convert converts args into the upstream type. -func (args SASLArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "username": args.Username, - "password": string(args.Password), - "mechanism": args.Mechanism, - "version": args.Version, - "aws_msk": args.AWSMSK.Convert(), - } -} - -// AWSMSKArguments exposes additional SASL authentication measures required to -// use the AWS_MSK_IAM mechanism. -type AWSMSKArguments struct { - Region string `alloy:"region,attr"` - BrokerAddr string `alloy:"broker_addr,attr"` -} - -// Convert converts args into the upstream type. -func (args AWSMSKArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "region": args.Region, - "broker_addr": args.BrokerAddr, - } -} - -// KerberosArguments configures Kerberos authentication against the Kafka -// broker. -type KerberosArguments struct { - ServiceName string `alloy:"service_name,attr,optional"` - Realm string `alloy:"realm,attr,optional"` - UseKeyTab bool `alloy:"use_keytab,attr,optional"` - Username string `alloy:"username,attr"` - Password alloytypes.Secret `alloy:"password,attr,optional"` - ConfigPath string `alloy:"config_file,attr,optional"` - KeyTabPath string `alloy:"keytab_file,attr,optional"` -} - -// Convert converts args into the upstream type. -func (args KerberosArguments) Convert() map[string]interface{} { - return map[string]interface{}{ - "service_name": args.ServiceName, - "realm": args.Realm, - "use_keytab": args.UseKeyTab, - "username": args.Username, - "password": string(args.Password), - "config_file": args.ConfigPath, - "keytab_file": args.KeyTabPath, - } -} - -// MetadataArguments configures how the otelcol.receiver.kafka component will -// retrieve metadata from the Kafka broker. -type MetadataArguments struct { - IncludeAllTopics bool `alloy:"include_all_topics,attr,optional"` - Retry MetadataRetryArguments `alloy:"retry,block,optional"` -} - -func (args *MetadataArguments) SetToDefault() { - *args = MetadataArguments{ - IncludeAllTopics: true, - Retry: MetadataRetryArguments{ - MaxRetries: 3, - Backoff: 250 * time.Millisecond, - }, - } -} - -// Convert converts args into the upstream type. -func (args MetadataArguments) Convert() kafkaexporter.Metadata { - return kafkaexporter.Metadata{ - Full: args.IncludeAllTopics, - Retry: args.Retry.Convert(), - } -} - -// MetadataRetryArguments configures how to retry retrieving metadata from the -// Kafka broker. Retrying is useful to avoid race conditions when the Kafka -// broker is starting at the same time as the otelcol.receiver.kafka component. -type MetadataRetryArguments struct { - MaxRetries int `alloy:"max_retries,attr,optional"` - Backoff time.Duration `alloy:"backoff,attr,optional"` -} - -// Convert converts args into the upstream type. -func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry { - return kafkaexporter.MetadataRetry{ - Max: args.MaxRetries, - Backoff: args.Backoff, - } -} - // AutoCommitArguments configures how to automatically commit updated topic // offsets back to the Kafka broker. type AutoCommitArguments struct { diff --git a/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go new file mode 100644 index 0000000000..0c735ee45e --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_kafkaexporter.go @@ -0,0 +1,72 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol/exporter/kafka" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, kafkaExporterConverter{}) +} + +type kafkaExporterConverter struct{} + +func (kafkaExporterConverter) Factory() component.Factory { return kafkaexporter.NewFactory() } + +func (kafkaExporterConverter) InputComponentName() string { + return "otelcol.exporter.kafka" +} + +func (kafkaExporterConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toKafkaExporter(cfg.(*kafkaexporter.Config)) + block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "kafka"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toKafkaExporter(cfg *kafkaexporter.Config) *kafka.Arguments { + return &kafka.Arguments{ + Brokers: cfg.Brokers, + ProtocolVersion: cfg.ProtocolVersion, + ResolveCanonicalBootstrapServersOnly: cfg.ResolveCanonicalBootstrapServersOnly, + ClientID: cfg.ClientID, + Topic: cfg.Topic, + TopicFromAttribute: cfg.TopicFromAttribute, + Encoding: cfg.Encoding, + PartitionTracesByID: cfg.PartitionTracesByID, + PartitionMetricsByResourceAttributes: cfg.PartitionMetricsByResourceAttributes, + Timeout: cfg.Timeout, + + Authentication: toKafkaAuthentication(encodeMapstruct(cfg.Authentication)), + Metadata: toKafkaMetadata(cfg.Metadata), + Retry: toRetryArguments(cfg.BackOffConfig), + Queue: toQueueArguments(cfg.QueueSettings), + Producer: toKafkaProducer(cfg.Producer), + + DebugMetrics: common.DefaultValue[kafka.Arguments]().DebugMetrics, + } +} + +func toKafkaProducer(cfg kafkaexporter.Producer) kafka.Producer { + return kafka.Producer{ + MaxMessageBytes: cfg.MaxMessageBytes, + Compression: cfg.Compression, + RequiredAcks: int(cfg.RequiredAcks), + FlushMaxMessages: cfg.FlushMaxMessages, + } +} diff --git a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go index 7888c04f4a..654227f81c 100644 --- a/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_kafkareceiver.go @@ -76,8 +76,8 @@ func toKafkaReceiver(state *State, id component.InstanceID, cfg *kafkareceiver.C } } -func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { - return kafka.AuthenticationArguments{ +func toKafkaAuthentication(cfg map[string]any) otelcol.KafkaAuthenticationArguments { + return otelcol.KafkaAuthenticationArguments{ Plaintext: toKafkaPlaintext(encodeMapstruct(cfg["plain_text"])), SASL: toKafkaSASL(encodeMapstruct(cfg["sasl"])), TLS: toKafkaTLSClientArguments(encodeMapstruct(cfg["tls"])), @@ -85,23 +85,23 @@ func toKafkaAuthentication(cfg map[string]any) kafka.AuthenticationArguments { } } -func toKafkaPlaintext(cfg map[string]any) *kafka.PlaintextArguments { +func toKafkaPlaintext(cfg map[string]any) *otelcol.KafkaPlaintextArguments { if cfg == nil { return nil } - return &kafka.PlaintextArguments{ + return &otelcol.KafkaPlaintextArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), } } -func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { +func toKafkaSASL(cfg map[string]any) *otelcol.KafkaSASLArguments { if cfg == nil { return nil } - return &kafka.SASLArguments{ + return &otelcol.KafkaSASLArguments{ Username: cfg["username"].(string), Password: alloytypes.Secret(cfg["password"].(string)), Mechanism: cfg["mechanism"].(string), @@ -110,12 +110,12 @@ func toKafkaSASL(cfg map[string]any) *kafka.SASLArguments { } } -func toKafkaAWSMSK(cfg map[string]any) kafka.AWSMSKArguments { +func toKafkaAWSMSK(cfg map[string]any) otelcol.KafkaAWSMSKArguments { if cfg == nil { - return kafka.AWSMSKArguments{} + return otelcol.KafkaAWSMSKArguments{} } - return kafka.AWSMSKArguments{ + return otelcol.KafkaAWSMSKArguments{ Region: cfg["region"].(string), BrokerAddr: cfg["broker_addr"].(string), } @@ -136,12 +136,12 @@ func toKafkaTLSClientArguments(cfg map[string]any) *otelcol.TLSClientArguments { return &res } -func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { +func toKafkaKerberos(cfg map[string]any) *otelcol.KafkaKerberosArguments { if cfg == nil { return nil } - return &kafka.KerberosArguments{ + return &otelcol.KafkaKerberosArguments{ ServiceName: cfg["service_name"].(string), Realm: cfg["realm"].(string), UseKeyTab: cfg["use_keytab"].(bool), @@ -152,15 +152,15 @@ func toKafkaKerberos(cfg map[string]any) *kafka.KerberosArguments { } } -func toKafkaMetadata(cfg kafkaexporter.Metadata) kafka.MetadataArguments { - return kafka.MetadataArguments{ +func toKafkaMetadata(cfg kafkaexporter.Metadata) otelcol.KafkaMetadataArguments { + return otelcol.KafkaMetadataArguments{ IncludeAllTopics: cfg.Full, Retry: toKafkaRetry(cfg.Retry), } } -func toKafkaRetry(cfg kafkaexporter.MetadataRetry) kafka.MetadataRetryArguments { - return kafka.MetadataRetryArguments{ +func toKafkaRetry(cfg kafkaexporter.MetadataRetry) otelcol.KafkaMetadataRetryArguments { + return otelcol.KafkaMetadataRetryArguments{ MaxRetries: cfg.Max, Backoff: cfg.Backoff, } diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy index b98eabaf2f..bdb45cfc7b 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy @@ -11,8 +11,8 @@ otelcol.receiver.kafka "default" { sasl { username = "fakeusername" password = "fakepassword" - mechanism = "somemechanism" - version = 5 + mechanism = "AWS_MSK_IAM" + version = 1 aws_msk { region = "us-east-1" @@ -33,14 +33,56 @@ otelcol.receiver.kafka "default" { } output { - metrics = [otelcol.exporter.otlp.default.input] - logs = [otelcol.exporter.otlp.default.input] - traces = [otelcol.exporter.otlp.default.input] + metrics = [otelcol.exporter.kafka.default.input] + logs = [otelcol.exporter.kafka.default.input] + traces = [otelcol.exporter.kafka.default.input] } } -otelcol.exporter.otlp "default" { - client { - endpoint = "database:4317" +otelcol.exporter.kafka "default" { + protocol_version = "2.0.0" + brokers = ["redpanda:9092"] + resolve_canonical_bootstrap_servers_only = true + client_id = "otelcol" + topic_from_attribute = "my_topic" + encoding = "otlp_json" + partition_traces_by_id = true + partition_metrics_by_resource_attributes = true + timeout = "11s" + + authentication { + plaintext { + username = "fakeusername" + password = "fakepassword" + } + + sasl { + username = "fakeusername" + password = "fakepassword" + mechanism = "SCRAM-SHA-256" + version = 1 + + aws_msk { + region = "us-east-1" + broker_addr = "broker:9092" + } + } + + tls { + insecure = true + } + + kerberos { + service_name = "someservice" + realm = "myrealm" + username = "fakeusername" + password = "fakepassword" + } + } + + producer { + max_message_bytes = 1000001 + compression = "snappy" + flush_max_messages = 11 } } diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml index fb8455bbf5..d5f563c3de 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.yaml +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.yaml @@ -9,8 +9,8 @@ receivers: sasl: username: fakeusername password: fakepassword - mechanism: somemechanism - version: 5 + mechanism: AWS_MSK_IAM + version: 1 aws_msk: region: us-east-1 broker_addr: broker:9092 @@ -24,20 +24,52 @@ receivers: exporters: - otlp: - endpoint: database:4317 + kafka: + brokers: redpanda:9092 + protocol_version: 2.0.0 + resolve_canonical_bootstrap_servers_only: true + client_id: otelcol + topic_from_attribute: my_topic + partition_traces_by_id: true + partition_metrics_by_resource_attributes: true + timeout: 11s + encoding: otlp_json + auth: + plain_text: + username: fakeusername + password: fakepassword + sasl: + username: fakeusername + password: fakepassword + mechanism: SCRAM-SHA-256 + version: 1 + aws_msk: + region: us-east-1 + broker_addr: broker:9092 + tls: + insecure: true + kerberos: + username: fakeusername + password: fakepassword + service_name: someservice + realm: myrealm + producer: + max_message_bytes: 1000001 + compression: snappy + required_acks: 0 + flush_max_messages: 11 service: pipelines: metrics: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] logs: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] traces: receivers: [kafka] processors: [] - exporters: [otlp] + exporters: [kafka] diff --git a/internal/tools/docs_generator/docs_updated_test.go b/internal/tools/docs_generator/docs_updated_test.go index 65358f908a..2a67fd3dda 100644 --- a/internal/tools/docs_generator/docs_updated_test.go +++ b/internal/tools/docs_generator/docs_updated_test.go @@ -64,12 +64,12 @@ func runForGenerator(t *testing.T, g generator.DocsGenerator) { } actual, err := g.Read() - require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./docs'", g.Name()) + require.NoError(t, err, "failed to read existing generated docs for %q, try running 'go generate ./internal/tools/docs_generator/", g.Name()) require.Contains( t, actual, strings.TrimSpace(generated), - "outdated docs detected when running %q, try updating with 'go generate ./docs'", + "outdated docs detected when running %q, try updating with 'go generate ./internal/tools/docs_generator/", g.Name(), ) }