Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6e1d53c
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
09aa64a
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
1f9b74a
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
ca35c22
fix: initialize AWS provider in sync mode for MSK IAM
kalavt Nov 26, 2025
4c19da4
fix(aws): force credential refresh in provider refresh functions
kalavt Nov 27, 2025
9aa3ebc
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 27, 2025
35bcf13
fix(aws): Minor leak on empty_payload_hex when canonical request buil…
kalavt Nov 27, 2025
d45dab6
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
8434f7d
fix(aws): AWS MSK IAM authentication failures caused by stale credent…
kalavt Nov 27, 2025
05ecb6d
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
862a4ec
fix(aws): AWS MSK IAM authentication failures on low traffic and Miss…
kalavt Nov 28, 2025
6dde002
fix(aws): Fix potential overflow in md_lifetime_ms on 32‑bit time_t
kalavt Nov 28, 2025
f343778
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
b34bff6
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
3bbbde2
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
8892291
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {

/*
* Enable SASL queue for background callbacks BEFORE registering OAuth callback.
* This allows librdkafka to handle OAuth token refresh in a background thread,
* which is essential for idle connections where rd_kafka_poll() is not called.
*/
rd_kafka_conf_enable_sasl_queue(ctx->conf, 1);

ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
ctx->conf,
ctx->aws_msk_iam_cluster_arn,
Expand Down Expand Up @@ -243,6 +250,27 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
return NULL;
}

#ifdef FLB_HAVE_AWS_MSK_IAM
/*
* Enable SASL background callbacks for MSK IAM to ensure OAuth tokens
* are refreshed automatically even on idle connections.
*/
if (ctx->msk_iam) {
rd_kafka_error_t *error;
error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
if (error) {
flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s. "
"OAuth tokens may not refresh on idle connections.",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
}
else {
flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, "
"OAuth tokens will be refreshed automatically in background thread");
}
}
#endif

#ifdef FLB_HAVE_AVRO_ENCODER
/* Config AVRO */
tmp = flb_output_get_property("schema_str", ins);
Expand Down
1 change: 1 addition & 0 deletions src/aws/flb_aws_credentials_ec2.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) {
int ret = -1;

flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider");

if (try_lock_provider(provider)) {
ret = get_creds_ec2(implementation);
unlock_provider(provider);
Expand Down
3 changes: 1 addition & 2 deletions src/aws/flb_aws_credentials_profile.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path,

if (flb_read_file(credentials_path, &buf, &size) < 0) {
if (errno == ENOENT) {
AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist",
credentials_path);
AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path);
} else {
flb_errno();
AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s",
Expand Down
3 changes: 2 additions & 1 deletion src/aws/flb_aws_credentials_sts.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ int refresh_fn_sts(struct flb_aws_provider *provider) {
struct flb_aws_provider_sts *implementation = provider->implementation;

flb_debug("[aws_credentials] Refresh called on the STS provider");

if (try_lock_provider(provider)) {
ret = sts_assume_role_request(implementation->sts_client,
&implementation->creds, implementation->uri,
Expand Down Expand Up @@ -480,6 +480,7 @@ int refresh_fn_eks(struct flb_aws_provider *provider) {
struct flb_aws_provider_eks *implementation = provider->implementation;

flb_debug("[aws_credentials] Refresh called on the EKS provider");

if (try_lock_provider(provider)) {
ret = assume_with_web_identity(implementation);
unlock_provider(provider);
Expand Down
Loading