From 78273abc90bff866cac2ff193872fb0ca1c0d3ee Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 2 Jul 2025 15:18:14 -0600 Subject: [PATCH 1/3] pipeline: input: kafka: document AWS MSK IAM Signed-off-by: Eduardo Silva --- pipeline/inputs/kafka.md | 84 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/pipeline/inputs/kafka.md b/pipeline/inputs/kafka.md index 123bc9105..db7575822 100644 --- a/pipeline/inputs/kafka.md +++ b/pipeline/inputs/kafka.md @@ -1,6 +1,8 @@ -# Kafka +# Kafka Consumer -The _Kafka_ input plugin subscribes to one or more Kafka topics to collect messages from an [Apache Kafka](https://kafka.apache.org/) service. +The _Kafka_ input plugin enables Fluent Bit to consume messages directly from one or more [Apache Kafka](https://kafka.apache.org/) topics. By subscribing to specified topics, this plugin efficiently collects and forwards Kafka messages for further processing within your Fluent Bit pipeline. + +Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access. This plugin uses the official [librdkafka C library](https://github.com/edenhill/librdkafka) as a built-in dependency. @@ -18,6 +20,15 @@ This plugin uses the official [librdkafka C library](https://github.com/edenhill | `rdkafka.{property}` | `{property}` can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | _none_ | | `threaded` | Indicates whether to run this input in its own [thread](../../administration/multithreading.md#inputs). | `false` | + +## Configuration parameters for AWS MSK clusters based on IAM authentication + +| Property | Description | Type | Required | +|----------|-------------|------|----------| +| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) | +| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (when `aws_msk_iam` is true) | + + ## Get started To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file as shown below. @@ -30,7 +41,7 @@ The Kafka plugin can read parameters through the `-p` argument (property): $ fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic ``` -### Configuration file +### Configuration file (recommended) In your main configuration file append the following: @@ -44,7 +55,7 @@ pipeline: brokers: 192.168.1.3:9092 topics: some-topic poll_ms: 100 - + outputs: - name: stdout match: '*' @@ -83,13 +94,13 @@ pipeline: topics: fb-source poll_ms: 100 format: json - + filters: - name: lua match: '*' script: kafka.lua call: modify_kafka_message - + outputs: - name: kafka brokers: kafka-broker:9092 @@ -128,4 +139,63 @@ Since the payload will be in JSON format, the plugin is configured to parse the Every message received is then processed with `kafka.lua` and sent back to the `fb-sink` topic of the same broker. -The example can be executed locally with `make start` in the `examples/kafka_filter` directory (`docker/compose` is used). \ No newline at end of file +The example can be executed locally with `make start` in the `examples/kafka_filter` directory (`docker/compose` is used). + +## AWS MSK IAM Authentication + +*Available since Fluent Bit v4.0.4* + +Fluent Bit supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This allows you to securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control. + +### Prerequisites + +**Build Requirements** + +If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support: + +- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment. + +**Runtime Requirements** +- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup). +- **AWS Credentials:** Provide credentials using any supported AWS method: + - IAM roles (recommended for EC2, ECS, or EKS) + - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) + - AWS credentials file (`~/.aws/credentials`) + - Instance metadata service (IMDS) + + Note these credentials are discovery by default when `aws_msk_iam` flag is enabled. + +- **IAM Permissions:** The credentials must allow access to the target MSK cluster (see example policy below). + +### Configuration Parameters + +| Property | Description | Type | Required | +|---------------------------|-----------------------------------------------------|---------|-------------------------------| +| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) | +| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (if `aws_msk_iam` is true)| + +### Example AWS IAM Policy + +> **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security. + +The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": [ + "kafka-cluster:*", + "kafka-cluster:DescribeCluster", + "kafka-cluster:ReadData", + "kafka-cluster:DescribeTopic", + "kafka-cluster:Connect" + ], + "Resource": "*" + } + ] +} +``` \ No newline at end of file From 78fb7edaaeb1ad1adc700a0d7071d21851c9f5e8 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 2 Jul 2025 15:28:19 -0600 Subject: [PATCH 2/3] pipeline: input: kafka: adjust details for AWS MSK IAM Signed-off-by: Eduardo Silva --- pipeline/inputs/kafka.md | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pipeline/inputs/kafka.md b/pipeline/inputs/kafka.md index db7575822..27ed258f5 100644 --- a/pipeline/inputs/kafka.md +++ b/pipeline/inputs/kafka.md @@ -20,15 +20,6 @@ This plugin uses the official [librdkafka C library](https://github.com/edenhill | `rdkafka.{property}` | `{property}` can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | _none_ | | `threaded` | Indicates whether to run this input in its own [thread](../../administration/multithreading.md#inputs). | `false` | - -## Configuration parameters for AWS MSK clusters based on IAM authentication - -| Property | Description | Type | Required | -|----------|-------------|------|----------| -| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) | -| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (when `aws_msk_iam` is true) | - - ## Get started To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file as shown below. @@ -174,6 +165,23 @@ If you are compiling Fluent Bit from source, ensure the following requirements a | `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) | | `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (if `aws_msk_iam` is true)| + +### Configuration Example + +```yaml +pipeline: + inputs: + - name: kafka + brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098 + topics: my-topic + aws_msk_iam: true + aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3 + + outputs: + - name: stdout + match: '*' +``` + ### Example AWS IAM Policy > **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security. From 3529804973e897c5fc13fced948f39854d8132e5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 2 Jul 2025 15:28:35 -0600 Subject: [PATCH 3/3] pipeline: output: kafka: document AWS MSK IAM support Signed-off-by: Eduardo Silva --- pipeline/outputs/kafka.md | 108 ++++++++++++++++++++++++++++++++------ 1 file changed, 93 insertions(+), 15 deletions(-) diff --git a/pipeline/outputs/kafka.md b/pipeline/outputs/kafka.md index 4599b62da..9e5399f31 100644 --- a/pipeline/outputs/kafka.md +++ b/pipeline/outputs/kafka.md @@ -1,24 +1,26 @@ -# Kafka +# Kafka Producer -Kafka output plugin allows to ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin use the official [librdkafka C library](https://github.com/edenhill/librdkafka) \(built-in dependency\) +Kafka output plugin, producer, allows to ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin use the official [librdkafka C library](https://github.com/edenhill/librdkafka) \(built-in dependency\) + +Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access. ## Configuration Parameters | Key | Description | default | | :--- | :--- | :--- | -| format | Specify data format, options available: json, msgpack, raw. | json | -| message\_key | Optional key to store the message | | -| message\_key\_field | If set, the value of Message\_Key\_Field in the record will indicate the message key. If not set nor found in the record, Message\_Key will be used \(if set\). | | -| timestamp\_key | Set the key to store the record timestamp | @timestamp | -| timestamp\_format | Specify timestamp format, should be 'double', '[iso8601](https://en.wikipedia.org/wiki/ISO_8601)' (seconds precision) or 'iso8601_ns' (fractional seconds precision) | double | -| brokers | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | | -| topics | Single entry or list of topics separated by comma \(,\) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic\_Key will be used. | fluent-bit | -| topic\_key | If multiple Topics exists, the value of Topic\_Key in the record will indicate the topic to use. E.g: if Topic\_Key is _router_ and the record is {"key1": 123, "router": "route\_2"}, Fluent Bit will use topic _route\_2_. Note that if the value of Topic\_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | | -| dynamic\_topic | adds unknown topics \(found in Topic\_Key\) to Topics. So in Topics only a default topic needs to be configured | Off | -| queue\_full\_retries | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The `queue_full_retries` option set the number of local retries to enqueue the data. The default value is 10 times, the interval between each retry is 1 second. Setting the `queue_full_retries` value to `0` set's an unlimited number of retries. | 10 | -| rdkafka.{property} | `{property}` can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | | -| raw\_log\_key | When using the raw format and set, the value of raw\_log\_key in the record will be send to kafka as the payload. | | -| workers | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` | +| 'format' | Specify data format, options available: json, msgpack, raw. | json | +| 'message_key' | Optional key to store the message | | +| 'message_key_field' | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | | +| 'timestamp_key' | Set the key to store the record timestamp | @timestamp | +| 'timestamp_format' | Specify timestamp format, should be 'double', '[iso8601](https://en.wikipedia.org/wiki/ISO_8601)' (seconds precision) or 'iso8601_ns' (fractional seconds precision) | double | +| 'brokers' | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | | +| 'topics' | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic_Key will be used. | fluent-bit | +| 'topic_key' | If multiple Topics exists, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is _router_ and the record is {"key1": 123, "router": "route_2"}, Fluent Bit will use topic _route_2_. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | | +| 'dynamic_topic' | adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured | Off | +| 'queue_full_retries' | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The `queue_full_retries` option set the number of local retries to enqueue the data. The default value is 10 times, the interval between each retry is 1 second. Setting the `queue_full_retries` value to `0` set's an unlimited number of retries. | 10 | +| 'rdkafka.{property}' | '{property}' can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | | +| 'raw_log_key' | When using the raw format and set, the value of raw_log_key in the record will be send to kafka as the payload. | | +| 'workers' | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` | > Setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to 1 are examples of recommended settings of librdfkafka properties. @@ -49,6 +51,8 @@ In your main configuration file append the following _Input_ & _Output_ sections Topics test ``` + + ### Avro Support Fluent-bit comes with support for avro encoding for the out_kafka plugin. @@ -141,3 +145,77 @@ key, and the _payloadkey_ value as the payload. Raw_Log_Key payloadkey Message_Key_Field msgkey ``` + +## AWS MSK IAM Authentication + +*Available since Fluent Bit v4.0.4* + +Fluent Bit supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This allows you to securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control. + +### Prerequisites + +**Build Requirements** +If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support: + +- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment. + +**Runtime Requirements** +- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup). +- **AWS Credentials:** Provide credentials using any supported AWS method: + - IAM roles (recommended for EC2, ECS, or EKS) + - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) + - AWS credentials file (`~/.aws/credentials`) + - Instance metadata service (IMDS) + + Note these credentials are discovery by default when `aws_msk_iam` flag is enabled. + +- **IAM Permissions:** The credentials must allow access to the target MSK cluster (see example policy below). + +### Configuration Parameters + +| Property | Description | Type | Required | +|---------------------------|-----------------------------------------------------|---------|-------------------------------| +| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) | +| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (if `aws_msk_iam` is true)| + +### Configuration Example + +```yaml +pipeline: + inputs: + - name: random + + outputs: + - name: kafka + match: "*" + brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098 + topics: my-topic + aws_msk_iam: true + aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3 +``` + +### Example AWS IAM Policy + +> **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security. + +The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": [ + "kafka-cluster:*", + "kafka-cluster:DescribeCluster", + "kafka-cluster:ReadData", + "kafka-cluster:DescribeTopic", + "kafka-cluster:Connect" + ], + "Resource": "*" + } + ] +} +```