diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b4bb9be6acf..dca5bf958c9 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -214,6 +214,13 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + /* + * Enable SASL queue for background callbacks BEFORE registering OAuth callback. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections where rd_kafka_poll() is not called. + */ + rd_kafka_conf_enable_sasl_queue(ctx->conf, 1); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, ctx->conf, ctx->aws_msk_iam_cluster_arn, @@ -243,6 +250,27 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, return NULL; } +#ifdef FLB_HAVE_AWS_MSK_IAM + /* + * Enable SASL background callbacks for MSK IAM to ensure OAuth tokens + * are refreshed automatically even on idle connections. + */ + if (ctx->msk_iam) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s. " + "OAuth tokens may not refresh on idle connections.", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ctx->ins, "MSK IAM: SASL background callbacks enabled, " + "OAuth tokens will be refreshed automatically in background thread"); + } + } +#endif + #ifdef FLB_HAVE_AVRO_ENCODER /* Config AVRO */ tmp = flb_output_get_property("schema_str", ins); diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index 2722e26d223..9aa1444f1fb 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -130,6 +130,7 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { int ret = -1; flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { ret = get_creds_ec2(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_credentials_profile.c b/src/aws/flb_aws_credentials_profile.c index 48cb9299572..7ad7099ff45 100644 --- a/src/aws/flb_aws_credentials_profile.c +++ b/src/aws/flb_aws_credentials_profile.c @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path, if (flb_read_file(credentials_path, &buf, &size) < 0) { if (errno == ENOENT) { - AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", - credentials_path); + AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path); } else { flb_errno(); AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 554fac20353..155a41d3998 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -175,7 +175,7 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { struct flb_aws_provider_sts *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the STS provider"); - + if (try_lock_provider(provider)) { ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, @@ -480,6 +480,7 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { struct flb_aws_provider_eks *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the EKS provider"); + if (try_lock_provider(provider)) { ret = assume_with_web_identity(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf8af7d0cc8..c90c3c468d2 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -37,14 +38,20 @@ #include #include -/* Lightweight config - NO persistent AWS provider */ +/* + * OAuth token lifetime of 5 minutes (industry standard). + * Matches AWS Go SDK and Kafka Connect implementations. + */ +#define MSK_IAM_TOKEN_LIFETIME_SECONDS 300 + struct flb_aws_msk_iam { - struct flb_config *flb_config; /* For creating AWS provider on-demand */ + struct flb_config *flb_config; flb_sds_t region; flb_sds_t cluster_arn; + struct flb_tls *cred_tls; + struct flb_aws_provider *provider; }; -/* Utility functions - same as before */ static int to_encode(char c) { if ((c >= '0' && c <= '9') || @@ -162,12 +169,11 @@ static char *extract_region(const char *arn) return out; } -/* Stateless payload generator - creates AWS provider on demand */ +/* Payload generator - builds MSK IAM authentication payload */ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host) + const char *host, + struct flb_aws_credentials *creds) { - struct flb_aws_provider *temp_provider = NULL; - struct flb_aws_credentials *creds = NULL; flb_sds_t payload = NULL; int encode_result; char *p; @@ -205,46 +211,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, /* Validate inputs */ if (!config || !config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set or invalid"); return NULL; } if (!host || strlen(host) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: host is required"); - return NULL; - } - - flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", - host, config->region); - - /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (!temp_provider) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); + flb_error("[aws_msk_iam] host is required"); return NULL; } - if (temp_provider->provider_vtable->init(temp_provider) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - /* Get credentials */ - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); + if (!creds || !creds->access_key_id || !creds->secret_access_key) { + flb_error("[aws_msk_iam] invalid or incomplete credentials"); return NULL; } @@ -269,19 +246,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: Encode the action parameter */ action_enc = uri_encode_params("kafka-cluster:Connect", 21); if (!action_enc) { goto error; } - /* Build canonical query string with ACTION parameter first (alphabetical order) */ + /* Build canonical query string */ query = flb_sds_create_size(8192); if (!query) { goto error; } - /* note: Action must be FIRST in alphabetical order */ query = flb_sds_printf(&query, "Action=%s&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=%s" "&X-Amz-Date=%s&X-Amz-Expires=900", @@ -290,27 +265,23 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Add session token if present (before SignedHeaders alphabetically) */ + /* Add session token if present */ if (creds->session_token && flb_sds_len(creds->session_token) > 0) { session_token_enc = uri_encode_params(creds->session_token, flb_sds_len(creds->session_token)); if (!session_token_enc) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to encode session token"); goto error; } tmp = flb_sds_printf(&query, "&X-Amz-Security-Token=%s", session_token_enc); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append session token to query"); goto error; } query = tmp; } - /* Add SignedHeaders LAST (alphabetically after Security-Token) */ tmp = flb_sds_printf(&query, "&X-Amz-SignedHeaders=host"); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append SignedHeaders"); goto error; } query = tmp; @@ -321,10 +292,8 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: MSK IAM canonical request format - use SHA256 of empty string, not UNSIGNED-PAYLOAD */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) "", 0, empty_payload_hash, sizeof(empty_payload_hash)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash empty payload"); goto error; } @@ -338,17 +307,15 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, query, host, empty_payload_hex); flb_sds_destroy(empty_payload_hex); - empty_payload_hex = NULL; /* Prevent double-free */ + empty_payload_hex = NULL; if (!canonical) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to build canonical request"); goto error; } - /* Hash canonical request immediately */ + /* Hash canonical request */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) canonical, flb_sds_len(canonical), sha256_buf, sizeof(sha256_buf)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash canonical request"); goto error; } @@ -384,34 +351,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, len = strlen(datestamp); if (hmac_sha256_sign(key_date, (unsigned char *) key, flb_sds_len(key), (unsigned char *) datestamp, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign date"); goto error; } - /* Clean up key immediately after use - prevent double-free */ flb_sds_destroy(key); key = NULL; len = strlen(config->region); if (hmac_sha256_sign(key_region, key_date, 32, (unsigned char *) config->region, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign region"); goto error; } if (hmac_sha256_sign(key_service, key_region, 32, (unsigned char *) "kafka-cluster", 13) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign service"); goto error; } if (hmac_sha256_sign(key_signing, key_service, 32, (unsigned char *) "aws4_request", 12) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create signing key"); goto error; } if (hmac_sha256_sign(sig, key_signing, 32, (unsigned char *) string_to_sign, flb_sds_len(string_to_sign)) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign request"); goto error; } @@ -420,101 +381,42 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Append signature to query */ tmp = flb_sds_printf(&query, "&X-Amz-Signature=%s", hexsig); if (!tmp) { goto error; } query = tmp; - /* Build the complete presigned URL */ + /* Build complete presigned URL */ presigned_url = flb_sds_create_size(16384); if (!presigned_url) { goto error; } - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); + presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s&User-Agent=fluent-bit-msk-iam", + host, query); if (!presigned_url) { goto error; } - /* Base64 URL encode the presigned URL */ + /* Base64 URL encode */ url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ + encoded_len = ((url_len + 2) / 3) * 4 + 1; payload = flb_sds_create_size(encoded_len); if (!payload) { goto error; } - encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, - (const unsigned char*) presigned_url, url_len); - if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); - goto error; - } - flb_sds_len_set(payload, actual_encoded_len); - - /* Convert to Base64 URL encoding (replace + with -, / with _, remove padding =) */ - p = payload; - while (*p) { - if (*p == '+') { - *p = '-'; - } - else if (*p == '/') { - *p = '_'; - } - p++; - } - - /* Remove padding */ - len = flb_sds_len(payload); - while (len > 0 && payload[len-1] == '=') { - len--; - } - flb_sds_len_set(payload, len); - payload[len] = '\0'; - - /* Build the complete presigned URL */ - flb_sds_destroy(presigned_url); - presigned_url = flb_sds_create_size(16384); - if (!presigned_url) { - goto error; - } - - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); - if (!presigned_url) { - goto error; - } - - /* Add User-Agent parameter to the signed URL (like Go implementation) */ - tmp = flb_sds_printf(&presigned_url, "&User-Agent=fluent-bit-msk-iam"); - if (!tmp) { - goto error; - } - presigned_url = tmp; - - /* Base64 URL encode the presigned URL (RawURLEncoding - no padding like Go) */ - url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ - - flb_sds_destroy(payload); - payload = flb_sds_create_size(encoded_len); - if (!payload) { - goto error; - } - encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, (const unsigned char *) presigned_url, url_len); if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); goto error; } - /* Update the SDS length to match actual encoded length */ flb_sds_len_set(payload, actual_encoded_len); - /* Convert to Base64 URL encoding AND remove padding (RawURLEncoding like Go) */ + /* Convert to Base64 URL encoding and remove padding */ p = payload; while (*p) { if (*p == '+') { @@ -526,7 +428,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, p++; } - /* Remove ALL padding (RawURLEncoding) */ final_len = flb_sds_len(payload); while (final_len > 0 && payload[final_len-1] == '=') { final_len--; @@ -534,7 +435,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, flb_sds_len_set(payload, final_len); payload[final_len] = '\0'; - /* Clean up before successful return */ + /* Clean up */ flb_sds_destroy(credential); flb_sds_destroy(credential_enc); flb_sds_destroy(canonical); @@ -547,65 +448,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return payload; error: - /* Clean up everything - check for NULL to prevent double-free */ - if (credential) { - flb_sds_destroy(credential); - } - if (credential_enc) { - flb_sds_destroy(credential_enc); - } - if (canonical) { - flb_sds_destroy(canonical); - } - if (hexhash) { - flb_sds_destroy(hexhash); - } - if (string_to_sign) { - flb_sds_destroy(string_to_sign); - } - if (hexsig) { - flb_sds_destroy(hexsig); - } - if (query) { - flb_sds_destroy(query); - } - if (action_enc) { - flb_sds_destroy(action_enc); - } - if (presigned_url) { - flb_sds_destroy(presigned_url); - } - if (key) { /* Only destroy if not already destroyed */ - flb_sds_destroy(key); - } - if (payload) { - flb_sds_destroy(payload); - } - if (session_token_enc) { - flb_sds_destroy(session_token_enc); - } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } + if (credential) flb_sds_destroy(credential); + if (credential_enc) flb_sds_destroy(credential_enc); + if (canonical) flb_sds_destroy(canonical); + if (hexhash) flb_sds_destroy(hexhash); + if (string_to_sign) flb_sds_destroy(string_to_sign); + if (hexsig) flb_sds_destroy(hexsig); + if (query) flb_sds_destroy(query); + if (action_enc) flb_sds_destroy(action_enc); + if (presigned_url) flb_sds_destroy(presigned_url); + if (key) flb_sds_destroy(key); + if (payload) flb_sds_destroy(payload); + if (session_token_enc) flb_sds_destroy(session_token_enc); + if (empty_payload_hex) flb_sds_destroy(empty_payload_hex); return NULL; } - -/* Stateless callback - creates AWS provider on-demand for each refresh */ +/* OAuth token refresh callback */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -614,7 +478,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_sds_t payload = NULL; rd_kafka_resp_err_t err; char errstr[512]; - int64_t now; + time_t now; int64_t md_lifetime_ms; const char *s3_suffix = "-s3"; size_t arn_len; @@ -622,93 +486,91 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, struct flb_aws_msk_iam *config; struct flb_aws_credentials *creds = NULL; struct flb_kafka_opaque *kafka_opaque; - struct flb_aws_provider *temp_provider = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; if (!kafka_opaque || !kafka_opaque->msk_iam_ctx) { - flb_error("[aws_msk_iam] oauthbearer_token_refresh_cb: invalid opaque context"); + flb_error("[aws_msk_iam] invalid opaque context"); rd_kafka_oauthbearer_set_token_failure(rk, "invalid context"); return; } - flb_debug("[aws_msk_iam] running OAuth bearer token refresh callback"); - - /* get the msk_iam config (not persistent context!) */ config = kafka_opaque->msk_iam_ctx; - /* validate region (mandatory) */ if (!config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set"); rd_kafka_oauthbearer_set_token_failure(rk, "region not set"); return; } - /* Determine host endpoint */ + /* Determine MSK endpoint */ if (config->cluster_arn) { arn_len = strlen(config->cluster_arn); suffix_len = strlen(s3_suffix); if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); } } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); } - flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + flb_debug("[aws_msk_iam] OAuth token refresh callback triggered"); + + /* Refresh credentials */ + if (config->provider->provider_vtable->refresh(config->provider) < 0) { + flb_warn("[aws_msk_iam] credential refresh failed, will retry on next callback"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed"); + return; + } + + /* Get credentials */ + creds = config->provider->provider_vtable->get_credentials(config->provider); + if (!creds) { + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); + return; + } - /* Generate payload using stateless function - creates and destroys AWS provider internally */ - payload = build_msk_iam_payload(config, host); + /* Generate payload */ + payload = build_msk_iam_payload(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); + flb_aws_credentials_destroy(creds); rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); return; } - /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - } - } - + /* + * Set OAuth token with fixed 5-minute lifetime (AWS industry standard). + * librdkafka's background thread will automatically trigger a refresh callback + * at 80% of the token's lifetime (4 minutes) to ensure the token never expires, + * even on completely idle connections. + */ now = time(NULL); - md_lifetime_ms = (now + 900) * 1000; + md_lifetime_ms = ((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, md_lifetime_ms, - creds ? creds->access_key_id : "unknown", + creds->access_key_id, NULL, 0, errstr, sizeof(errstr)); + flb_aws_credentials_destroy(creds); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); } else { - flb_info("[aws_msk_iam] OAuth bearer token successfully set"); - } - - /* Clean up everything immediately - no memory leaks possible! */ - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); + flb_info("[aws_msk_iam] OAuth bearer token successfully set with %d second lifetime", + MSK_IAM_TOKEN_LIFETIME_SECONDS); } if (payload) { @@ -716,7 +578,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, } } -/* Register callback with lightweight config - keeps your current interface */ +/* Register OAuth callback */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, const char *cluster_arn, @@ -725,26 +587,21 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con struct flb_aws_msk_iam *ctx; char *region_str; - flb_info("[aws_msk_iam] registering OAuth callback with cluster ARN: %s", cluster_arn); - if (!cluster_arn) { flb_error("[aws_msk_iam] cluster ARN is required"); return NULL; } - /* Allocate lightweight config - NO AWS provider! */ ctx = flb_calloc(1, sizeof(struct flb_aws_msk_iam)); if (!ctx) { flb_errno(); return NULL; } - /* Store the flb_config for on-demand provider creation */ ctx->flb_config = config; ctx->cluster_arn = flb_sds_create(cluster_arn); if (!ctx->cluster_arn) { - flb_error("[aws_msk_iam] failed to create cluster ARN string"); flb_free(ctx); return NULL; } @@ -752,7 +609,7 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con /* Extract region */ region_str = extract_region(cluster_arn); if (!region_str || strlen(region_str) == 0) { - flb_error("[aws_msk_iam] failed to extract region from cluster ARN: %s", cluster_arn); + flb_error("[aws_msk_iam] failed to extract region from ARN"); flb_sds_destroy(ctx->cluster_arn); flb_free(ctx); if (region_str) flb_free(region_str); @@ -763,34 +620,76 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_free(region_str); if (!ctx->region) { - flb_error("[aws_msk_iam] failed to create region string"); flb_sds_destroy(ctx->cluster_arn); flb_free(ctx); return NULL; } - flb_info("[aws_msk_iam] extracted region: %s", ctx->region); + /* Create TLS instance */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + FLB_LOG_DEBUG, + NULL, NULL, NULL, NULL, NULL, NULL); + if (!ctx->cred_tls) { + flb_error("[aws_msk_iam] failed to create TLS instance"); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + /* Create AWS provider */ + ctx->provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + ctx->region, + NULL, NULL, + flb_aws_client_generator(), + NULL); + if (!ctx->provider) { + flb_error("[aws_msk_iam] failed to create AWS credentials provider"); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } - /* Set the callback and opaque */ + /* Initialize provider */ + ctx->provider->provider_vtable->sync(ctx->provider); + if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { + flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + ctx->provider->provider_vtable->async(ctx->provider); + + /* Register callback */ rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); flb_kafka_opaque_set(opaque, NULL, ctx); rd_kafka_conf_set_opaque(kconf, opaque); - flb_info("[aws_msk_iam] OAuth callback registered successfully"); - return ctx; } -/* Simple destroy - just config cleanup, no AWS provider to leak! */ +/* Destroy MSK IAM config */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { return; } - flb_info("[aws_msk_iam] destroying MSK IAM config"); + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } - /* NO AWS provider to destroy! */ + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + if (ctx->region) { flb_sds_destroy(ctx->region); }