---
aliases:
- /docs/grafana-cloud/agent/flow/reference/components/otelcol.receiver.kafka/
- /docs/grafana-cloud/monitor-infrastructure/agent/flow/reference/components/otelcol.receiver.kafka/
- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/otelcol.receiver.kafka/
- /docs/grafana-cloud/send-data/agent/flow/reference/components/otelcol.receiver.kafka/
canonical: https://grafana.com/docs/agent/latest/flow/reference/components/otelcol.receiver.kafka/
description: Learn about otelcol.receiver.kafka
title: otelcol.receiver.kafka
---
# otelcol.receiver.kafka
`otelcol.receiver.kafka` accepts telemetry data from a Kafka broker and
forwards it to other `otelcol.*` components.
> **NOTE**: `otelcol.receiver.kafka` is a wrapper over the upstream
> OpenTelemetry Collector `kafka` receiver from the `otelcol-contrib`
> distribution. Bug reports or feature requests will be redirected to the
> upstream repository, if necessary.
Multiple `otelcol.receiver.kafka` components can be specified by giving them
different labels.
## Usage
```river
otelcol.receiver.kafka "LABEL" {
  brokers          = ["BROKER_ADDR"]
  protocol_version = "PROTOCOL_VERSION"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}
```
## Arguments
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`brokers` | `array(string)` | Kafka brokers to connect to. | | yes
`protocol_version` | `string` | Kafka protocol version to use. | | yes
`topic` | `string` | Kafka topic to read from. | | no
`encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no
`group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no
`client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no
`initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no
`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `false` | no
If `topic` is not set, different topics will be used for different telemetry signals:
* Metrics will be received from an `otlp_metrics` topic.
* Traces will be received from an `otlp_spans` topic.
* Logs will be received from an `otlp_logs` topic.
If `topic` is set to a specific value, set only the signal type that corresponds to the data stored in that topic in the `output` block.
For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces.
If it contains only metrics, then `otelcol.receiver.kafka` should be configured to output only metrics.
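
The single-topic case above can be sketched as follows. This is a minimal illustration, assuming a topic named `my_telemetry` that contains only OTLP metrics. The broker address, protocol version, and the `otelcol.exporter.otlp.default` component are assumptions for the sake of the example; the exporter is expected to be defined elsewhere in the configuration.

```river
otelcol.receiver.kafka "metrics_only" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  // Assumption: this topic holds only OTLP metrics.
  topic = "my_telemetry"

  output {
    // Logs and traces are intentionally left unset.
    metrics = [otelcol.exporter.otlp.default.input]
  }
}
```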
The `encoding` argument determines how to decode messages read from Kafka.
`encoding` must be one of the following strings:
* `"otlp_proto"`: Decode messages as OTLP protobuf.
* `"jaeger_proto"`: Decode messages as a single Jaeger protobuf span.
* `"jaeger_json"`: Decode messages as a single Jaeger JSON span.
* `"zipkin_proto"`: Decode messages as a list of Zipkin protobuf spans.
* `"zipkin_json"`: Decode messages as a list of Zipkin JSON spans.
* `"zipkin_thrift"`: Decode messages as a list of Zipkin Thrift spans.
* `"raw"`: Copy the log message bytes into the body of a log record.
* `"text"`: Decode the log message as text and insert it into the body of a log record.
By default, UTF-8 is used to decode. A different encoding can be chosen by using `text_<ENCODING>`. For example, `text_utf-8` or `text_shift_jis`.
* `"json"`: Decode the JSON payload and insert it into the body of a log record.
* `"azure_resource_logs"`: The payload is converted from Azure Resource Logs format to an OTLP log.
`"otlp_proto"` must be used to read all telemetry types from Kafka; other
encodings are signal-specific.
`initial_offset` must be either `"latest"` or `"earliest"`.
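
As an illustration of a signal-specific encoding combined with `initial_offset`, the following sketch reads Jaeger protobuf spans and starts from the earliest available offset when no offset was previously committed. The topic name `jaeger_spans`, the broker address, and the `otelcol.exporter.otlp.default` component are hypothetical and would need to match your environment.

```river
otelcol.receiver.kafka "jaeger" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  // Hypothetical topic that carries Jaeger protobuf spans.
  topic    = "jaeger_spans"
  encoding = "jaeger_proto"

  // Only applies when the consumer group has no committed offset.
  initial_offset = "earliest"

  output {
    // jaeger_proto is a trace encoding, so only traces are forwarded.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```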
## Blocks
The following blocks are supported inside the definition of
`otelcol.receiver.kafka`:
Hierarchy | Block | Description | Required
--------- | ----- | ----------- | --------
authentication | [authentication][] | Configures authentication for connecting to Kafka brokers. | no
authentication > plaintext | [plaintext][] | Authenticates against Kafka brokers with plaintext. | no
authentication > sasl | [sasl][] | Authenticates against Kafka brokers with SASL. | no
authentication > sasl > aws_msk | [aws_msk][] | Additional SASL parameters when using AWS_MSK_IAM. | no
authentication > tls | [tls][] | Configures TLS for connecting to the Kafka brokers. | no
authentication > kerberos | [kerberos][] | Authenticates against Kafka brokers with Kerberos. | no
metadata | [metadata][] | Configures how to retrieve metadata from Kafka brokers. | no
metadata > retry | [retry][] | Configures how to retry metadata retrieval. | no
autocommit | [autocommit][] | Configures how to automatically commit updated topic offsets back to the Kafka brokers. | no
message_marking | [message_marking][] | Configures when Kafka messages are marked as read. | no
header_extraction | [header_extraction][] | Extract headers from Kafka records. | no
debug_metrics | [debug_metrics][] | Configures the metrics which this component generates to monitor its state. | no
output | [output][] | Configures where to send received telemetry data. | yes
The `>` symbol indicates deeper levels of nesting. For example,
`authentication > tls` refers to a `tls` block defined inside an
`authentication` block.
[authentication]: #authentication-block
[plaintext]: #plaintext-block
[sasl]: #sasl-block
[aws_msk]: #aws_msk-block
[tls]: #tls-block
[kerberos]: #kerberos-block
[metadata]: #metadata-block
[retry]: #retry-block
[autocommit]: #autocommit-block
[message_marking]: #message_marking-block
[header_extraction]: #header_extraction-block
[debug_metrics]: #debug_metrics-block
[output]: #output-block
### authentication block
The `authentication` block holds the definition of different authentication
mechanisms to use when connecting to Kafka brokers. It doesn't support any
arguments and is configured fully through inner blocks.
### plaintext block
The `plaintext` block configures `PLAIN` authentication against Kafka brokers.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for `PLAIN` authentication. | | yes
`password` | `secret` | Password to use for `PLAIN` authentication. | | yes
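
A minimal sketch of `PLAIN` authentication, showing how the `plaintext` block nests inside `authentication`. The username, the `KAFKA_PASSWORD` environment variable, the broker address, and the `otelcol.exporter.otlp.default` component are assumptions made for the example.

```river
otelcol.receiver.kafka "plain_auth" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    plaintext {
      username = "kafka-user"
      // Read the password from the environment rather than hard-coding it.
      password = env("KAFKA_PASSWORD")
    }
  }

  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```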
### sasl block
The `sasl` block configures SASL authentication against Kafka brokers.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`username` | `string` | Username to use for SASL authentication. | | yes
`password` | `secret` | Password to use for SASL authentication. | | yes
`mechanism` | `string` | SASL mechanism to use when authenticating. | | yes
`version` | `number` | Version of the SASL Protocol to use when authenticating. | `0` | no
The `mechanism` argument can be set to one of the following strings:
* `"PLAIN"`
* `"AWS_MSK_IAM"`
* `"SCRAM-SHA-256"`
* `"SCRAM-SHA-512"`
When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block][aws_msk] must also be provided.
The `version` argument can be set to either `0` or `1`.
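
The following sketch configures SASL with the `SCRAM-SHA-512` mechanism. The credentials, the `KAFKA_PASSWORD` environment variable, the broker address, and the `otelcol.exporter.otlp.default` component are illustrative assumptions.

```river
otelcol.receiver.kafka "scram" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    sasl {
      username  = "kafka-user"
      password  = env("KAFKA_PASSWORD")
      mechanism = "SCRAM-SHA-512"
      // version is left unset here, so the default of 0 applies.
    }
  }

  output {
    logs = [otelcol.exporter.otlp.default.input]
  }
}
```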
### aws_msk block
The `aws_msk` block configures extra parameters for SASL authentication when
using the `AWS_MSK_IAM` mechanism.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`region` | `string` | AWS region the MSK cluster is based in. | | yes
`broker_addr` | `string` | MSK address to connect to for authentication. | | yes
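
A hedged sketch of the `AWS_MSK_IAM` mechanism with its `aws_msk` child block. The region, the broker hostname, the environment variable names, and the `otelcol.exporter.otlp.default` component are all hypothetical. Because the `sasl` table lists `username` and `password` as required, the sketch still sets them; which values are appropriate for IAM authentication is not covered here.

```river
otelcol.receiver.kafka "msk" {
  // Hypothetical MSK broker address.
  brokers          = ["b-1.example.kafka.us-east-1.amazonaws.com:9098"]
  protocol_version = "2.0.0"

  authentication {
    sasl {
      // Listed as required in the sasl block table.
      username  = env("KAFKA_USERNAME")
      password  = env("KAFKA_PASSWORD")
      mechanism = "AWS_MSK_IAM"

      aws_msk {
        region      = "us-east-1"
        broker_addr = "b-1.example.kafka.us-east-1.amazonaws.com:9098"
      }
    }
  }

  output {
    metrics = [otelcol.exporter.otlp.default.input]
  }
}
```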
### tls block
The `tls` block configures TLS settings used for connecting to the Kafka
brokers. If the `tls` block isn't provided, TLS won't be used for
communication.
{{< docs/shared lookup="flow/reference/components/otelcol-tls-config-block.md" source="agent" version="<AGENT_VERSION>" >}}
### kerberos block
The `kerberos` block configures Kerberos authentication against the Kafka
broker.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`service_name` | `string` | Kerberos service name. | | no
`realm` | `string` | Kerberos realm. | | no
`use_keytab` | `bool` | Enables using keytab instead of password. | | no
`username` | `string` | Kerberos username to authenticate as. | | yes
`password` | `secret` | Kerberos password to authenticate with. | | no
`config_file` | `string` | Path to the Kerberos configuration file (for example, `/etc/krb5.conf`). | | no
`keytab_file` | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). | | no
When `use_keytab` is `false`, the `password` argument is required. When
`use_keytab` is `true`, the file pointed to by the `keytab_file` argument is
used for authentication instead. Provide at most one of `password` or
`keytab_file`.
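
A minimal sketch of keytab-based Kerberos authentication. The service name, realm, username, broker address, and the `otelcol.exporter.otlp.default` component are assumptions; the file paths reuse the examples from the table above.

```river
otelcol.receiver.kafka "kerberos" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    kerberos {
      service_name = "kafka"
      realm        = "EXAMPLE.COM"
      username     = "kafka-user"
      config_file  = "/etc/krb5.conf"

      // Authenticate with a keytab, so no password is set.
      use_keytab  = true
      keytab_file = "/etc/security/kafka.keytab"
    }
  }

  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```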
### metadata block
The `metadata` block configures how to retrieve and store metadata from the
Kafka broker.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true` | no
If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka`
maintains a full set of metadata for all topics rather than the minimal set
that has been necessary so far. Including the full set of metadata is more
convenient for users but can consume a substantial amount of memory if you have
many topics and partitions.
Retrieving metadata may fail if the Kafka broker is starting up at the same
time as the `otelcol.receiver.kafka` component. The [`retry` child
block][retry] can be provided to customize retry behavior.
### retry block
The `retry` block configures how to retry retrieving metadata when retrieval
fails.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`max_retries` | `number` | How many times to reattempt retrieving metadata. | `3` | no
`backoff` | `duration` | Time to wait between retries. | `"250ms"` | no
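
As a sketch of how `metadata` and its `retry` child block fit together, the following configuration keeps only the minimal metadata set and retries more patiently than the defaults, which may help when the broker starts at the same time as the receiver. The specific values, the broker address, and the `otelcol.exporter.otlp.default` component are illustrative assumptions.

```river
otelcol.receiver.kafka "patient" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  metadata {
    // Reduce memory use on clusters with many topics and partitions.
    include_all_topics = false

    retry {
      // Retry up to 10 times, waiting 1 second between attempts.
      max_retries = 10
      backoff     = "1s"
    }
  }

  output {
    metrics = [otelcol.exporter.otlp.default.input]
  }
}
```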
### autocommit block
The `autocommit` block configures how to automatically commit updated topic
offsets back to the Kafka brokers.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`enable` | `bool` | Enable autocommitting updated topic offsets. | `true` | no
`interval` | `duration` | How frequently to autocommit. | `"1s"` | no
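
A short sketch that keeps autocommit enabled but commits offsets less frequently than the default. The 5-second interval, the broker address, and the `otelcol.exporter.otlp.default` component are assumptions chosen for illustration.

```river
otelcol.receiver.kafka "slow_commit" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  autocommit {
    enable   = true
    // Commit updated offsets every 5 seconds instead of every second.
    interval = "5s"
  }

  output {
    logs = [otelcol.exporter.otlp.default.input]
  }
}
```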
### message_marking block
The `message_marking` block configures when Kafka messages are marked as read.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`after_execution` | `bool` | Mark messages after forwarding telemetry data to other components. | `false` | no
`include_unsuccessful` | `bool` | Whether failed forwards should be marked as read. | `false` | no
By default, a Kafka message is marked as read immediately after it is retrieved
from the Kafka broker. If the `after_execution` argument is true, messages are
only marked as read after the telemetry data is forwarded to components
specified in [the `output` block][output].
When `after_execution` is true, messages are only marked as read when they are
decoded successfully and components where the data was forwarded did not return
an error. If the `include_unsuccessful` argument is true, messages are marked
as read even if decoding or forwarding failed. Setting `include_unsuccessful`
has no effect if `after_execution` is `false`.
> **WARNING**: Setting `after_execution` to `true` and `include_unsuccessful`
> to `false` can block the entire Kafka partition if message processing returns
> a permanent error, such as failing to decode.
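
The combination described above can be sketched as follows: messages are marked as read only after forwarding, and failed messages are marked as well so that a permanent error does not block the partition. The broker address and the `otelcol.exporter.otlp.default` component are assumptions.

```river
otelcol.receiver.kafka "mark_after" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  message_marking {
    // Mark messages as read only after they have been forwarded to the
    // components listed in the output block.
    after_execution = true

    // Also mark messages whose decoding or forwarding failed.
    include_unsuccessful = true
  }

  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```

With `include_unsuccessful` set to `true`, failed messages are marked as read regardless, so this setting trades redelivery of failed messages for continued progress through the partition.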
### header_extraction block
The `header_extraction` block configures how to extract headers from Kafka records.
The following arguments are supported:
Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`extract_headers` | `bool` | Enables attaching header fields to resource attributes. | `false` | no
`headers` | `list(string)` | A list of headers to extract from the Kafka record. | `[]` | no
Regular expressions are not allowed in the `headers` argument. Only exact matching will be performed.
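
A minimal sketch of header extraction. The header names `tenant_id` and `source` are hypothetical, as are the broker address and the `otelcol.exporter.otlp.default` component. Each name must match a Kafka record header exactly.

```river
otelcol.receiver.kafka "with_headers" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  header_extraction {
    extract_headers = true
    // Exact header names only; regular expressions are not supported.
    headers = ["tenant_id", "source"]
  }

  output {
    logs = [otelcol.exporter.otlp.default.input]
  }
}
```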
### debug_metrics block
{{< docs/shared lookup="flow/reference/components/otelcol-debug-metrics-block.md" source="agent" version="<AGENT_VERSION>" >}}
### output block
{{< docs/shared lookup="flow/reference/components/output-block.md" source="agent" version="<AGENT_VERSION>" >}}
## Exported fields
`otelcol.receiver.kafka` does not export any fields.
## Component health
`otelcol.receiver.kafka` is only reported as unhealthy if given an invalid
configuration.
## Debug information
`otelcol.receiver.kafka` does not expose any component-specific debug
information.
## Example
This example forwards read telemetry data through a batch processor before
finally sending it to an OTLP-capable endpoint:
```river
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}

otelcol.exporter.otlp "default" {
  client {
    endpoint = env("OTLP_ENDPOINT")
  }
}
```
<!-- START GENERATED COMPATIBLE COMPONENTS -->
## Compatible components
`otelcol.receiver.kafka` can accept arguments from the following components:
- Components that export [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-exporters)
{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}
<!-- END GENERATED COMPATIBLE COMPONENTS -->