Skip to content

Commit

Permalink
Fix invalid kafka user agent (#30933)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiliuTo committed Sep 21, 2022
1 parent bb73895 commit f17a602
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 340 deletions.
1 change: 1 addition & 0 deletions sdk/spring/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
#### Bugs Fixed
- Fix bug: Cannot configure "azure" authorization client. [#30354](https://github.com/Azure/azure-sdk-for-java/issues/30354).
- Fix parameter `requested_token_use` missing when using On behalf of process [#30359](https://github.com/Azure/azure-sdk-for-java/issues/30359).
- Fix the invalid user agent for Apache Kafka [#30574](https://github.com/Azure/azure-sdk-for-java/pull/30933).
- Fix Kafka `OAuth2AuthenticateCallbackHandler` cannot work with Kafka refreshing login mechanism [#30719](https://github.com/Azure/azure-sdk-for-java/issues/30719).
- Fix the cloud type cannot be configured for a consumer/producer/processor of Service Bus / Event Hubs bug [#30936](https://github.com/Azure/azure-sdk-for-java/issues/30936).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,35 @@
import com.azure.spring.cloud.autoconfigure.context.AzureGlobalProperties;
import com.azure.spring.cloud.service.implementation.kafka.AzureKafkaProperties;
import com.azure.spring.cloud.service.implementation.kafka.KafkaOAuth2AuthenticateCallbackHandler;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.azure.spring.cloud.service.implementation.kafka.AzureKafkaPropertiesUtils.convertConfigMapToAzureProperties;
import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.AZURE_SPRING_EVENT_HUBS_KAFKA_OAUTH;
import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.VERSION;
import static com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils.copyPropertiesIgnoreNull;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
import static org.springframework.util.StringUtils.delimitedListToStringArray;

public final class AzureKafkaAutoconfigurationUtils {
public final class AzureKafkaConfigurationUtils {

public static final Map<String, String> KAFKA_OAUTH_CONFIGS;
public static final String SECURITY_PROTOCOL_CONFIG_SASL = SASL_SSL.name();
Expand All @@ -32,13 +43,14 @@ public final class AzureKafkaAutoconfigurationUtils {
public static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH =
KafkaOAuth2AuthenticateCallbackHandler.class.getName();

private static final Logger LOGGER = LoggerFactory.getLogger(AzureKafkaAutoconfigurationUtils.class);
//TODO(yiiuTo): add reference doc here for the log.
private static final Logger LOGGER = LoggerFactory.getLogger(AzureKafkaConfigurationUtils.class);
//TODO(yiliuTo): add reference doc here for the log.
private static final String LOG_OAUTH_AUTOCONFIGURATION_RECOMMENDATION = "Currently {} authentication mechanism is used, recommend to use Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication"
+ " which supports various Azure Identity credentials. To leverage the auto-configuration for OAuth2, you can just remove all your security, sasl and credential configurations of Kafka and Event Hubs."
+ " And configure Kafka bootstrap servers instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.";
private static final String LOG_OAUTH_AUTOCONFIGURATION_CONFIGURE = "Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication will be loaded to configure your Kafka security and sasl properties to support Azure Identity credentials.";
private static final String LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE = "OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.";
private static final String KAFKA_OAUTH2_USER_AGENT = "." + AZURE_SPRING_EVENT_HUBS_KAFKA_OAUTH;

static {
Map<String, String> configs = new HashMap<>();
Expand All @@ -49,7 +61,26 @@ public final class AzureKafkaAutoconfigurationUtils {
KAFKA_OAUTH_CONFIGS = Collections.unmodifiableMap(configs);
}

private AzureKafkaAutoconfigurationUtils() {
private AzureKafkaConfigurationUtils() {
}

/**
* Configure Spring Cloud Azure user-agent for Kafka client. This method is idempotent to avoid configuring UA repeatedly.
*/
public static synchronized void configureKafkaUserAgent() {
Method dataMethod = ReflectionUtils.findMethod(ApiVersionsRequest.class, "data");
if (dataMethod != null) {
ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build();
ApiVersionsRequestData apiVersionsRequestData = (ApiVersionsRequestData) ReflectionUtils.invokeMethod(dataMethod, apiVersionsRequest);
if (apiVersionsRequestData != null) {
String clientSoftwareName = apiVersionsRequestData.clientSoftwareName();
if (clientSoftwareName != null && !clientSoftwareName.contains(KAFKA_OAUTH2_USER_AGENT)) {
apiVersionsRequestData.setClientSoftwareName(apiVersionsRequestData.clientSoftwareName()
+ KAFKA_OAUTH2_USER_AGENT);
apiVersionsRequestData.setClientSoftwareVersion(VERSION);
}
}
}
}

/**
Expand All @@ -60,6 +91,10 @@ private AzureKafkaAutoconfigurationUtils() {
* @return whether we need to configure with Spring Cloud Azure MSI support or not.
*/
public static boolean needConfigureSaslOAuth(Map<String, Object> sourceProperties) {
return meetAzureBootstrapServerConditions(sourceProperties) && meetSaslOAuthConditions(sourceProperties);
}

private static boolean meetSaslOAuthConditions(Map<String, Object> sourceProperties) {
String securityProtocol = (String) sourceProperties.get(SECURITY_PROTOCOL_CONFIG);
String saslMechanism = (String) sourceProperties.get(SASL_MECHANISM);
if (securityProtocol == null || (SECURITY_PROTOCOL_CONFIG_SASL.equalsIgnoreCase(securityProtocol)
Expand All @@ -70,25 +105,47 @@ public static boolean needConfigureSaslOAuth(Map<String, Object> sourcePropertie
return false;
}

private static boolean meetAzureBootstrapServerConditions(Map<String, Object> sourceProperties) {
Object bootstrapServers = sourceProperties.get(BOOTSTRAP_SERVERS_CONFIG);
List<String> serverList;
if (bootstrapServers instanceof String) {
serverList = Arrays.asList(delimitedListToStringArray((String) bootstrapServers, ","));
} else if (bootstrapServers instanceof Iterable<?>) {
serverList = new ArrayList<>();
for (Object obj : (Iterable) bootstrapServers) {
if (obj instanceof String) {
serverList.add((String) obj);
} else {
LOGGER.debug("Kafka bootstrap server configuration doesn't meet passwordless requirements.");
return false;
}
}
} else {
LOGGER.debug("Kafka bootstrap server configuration doesn't meet passwordless requirements.");
return false;
}

return serverList.size() == 1 && serverList.get(0).endsWith(":9093");
}

/**
* Configure necessary OAuth properties for kafka properties and log for the changes.
* Configure necessary OAuth properties for kafka properties.
*
* @param propertiesToConfigure kafka properties to be customized
*/
public static void configureOAuthProperties(Map<String, String> propertiesToConfigure) {
propertiesToConfigure.putAll(AzureKafkaAutoconfigurationUtils.KAFKA_OAUTH_CONFIGS);
logConfigureOAuthProperties();
propertiesToConfigure.putAll(AzureKafkaConfigurationUtils.KAFKA_OAUTH_CONFIGS);
}

/**
* Configure necessary OAuth properties for kafka properties and log for the changes.
*/
public static void logConfigureOAuthProperties() {
LOGGER.warn(LOG_OAUTH_AUTOCONFIGURATION_CONFIGURE);
LOGGER.warn(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL);
LOGGER.warn(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_MECHANISM, SASL_MECHANISM_OAUTH);
LOGGER.warn(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_JAAS_CONFIG, SASL_JAAS_CONFIG_OAUTH);
LOGGER.warn(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_LOGIN_CALLBACK_HANDLER_CLASS,
LOGGER.info(LOG_OAUTH_AUTOCONFIGURATION_CONFIGURE);
LOGGER.debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL);
LOGGER.debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_MECHANISM, SASL_MECHANISM_OAUTH);
LOGGER.debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_JAAS_CONFIG, SASL_JAAS_CONFIG_OAUTH);
LOGGER.debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_LOGIN_CALLBACK_HANDLER_CLASS,
SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
// Licensed under the MIT License.
package com.azure.spring.cloud.autoconfigure.kafka;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import com.azure.core.credential.TokenCredential;
import com.azure.spring.cloud.autoconfigure.context.AzureGlobalProperties;
import com.azure.spring.cloud.core.implementation.credential.resolver.AzureTokenCredentialResolver;
import com.azure.spring.cloud.service.implementation.kafka.AzureKafkaProperties;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.requests.ApiVersionsRequest;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand All @@ -23,16 +20,15 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.ReflectionUtils;

import static com.azure.spring.cloud.autoconfigure.context.AzureContextUtils.DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaAutoconfigurationUtils.KAFKA_OAUTH_CONFIGS;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaAutoconfigurationUtils.buildAzureProperties;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaAutoconfigurationUtils.logConfigureOAuthProperties;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaAutoconfigurationUtils.needConfigureSaslOAuth;
import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.AZURE_SPRING_EVENT_HUBS_KAFKA_OAUTH;
import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.VERSION;

import static com.azure.spring.cloud.service.implementation.kafka.AzureKafkaPropertiesUtils.AZURE_TOKEN_CREDENTIAL;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaConfigurationUtils.KAFKA_OAUTH_CONFIGS;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaConfigurationUtils.buildAzureProperties;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaConfigurationUtils.configureKafkaUserAgent;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaConfigurationUtils.logConfigureOAuthProperties;
import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureKafkaConfigurationUtils.needConfigureSaslOAuth;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support. Provide Azure Identity-based
Expand All @@ -58,51 +54,42 @@ public class AzureEventHubsKafkaOAuth2AutoConfiguration {
this.tokenCredentialResolver = resolver;
this.defaultTokenCredential = defaultTokenCredential;
this.azureGlobalProperties = azureGlobalProperties;
configureKafkaUserAgent();
}

@Bean
DefaultKafkaConsumerFactoryCustomizer azureOAuth2KafkaConsumerFactoryCustomizer() {
Map<String, Object> updateConfigs = new HashMap<>();
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
configureOAuth2Properties(updateConfigs, consumerProperties);
if (needConfigureSaslOAuth(consumerProperties)) {
configureOAuth2Properties(consumerProperties, updateConfigs);
configureKafkaUserAgent();
}
return factory -> factory.updateConfigs(updateConfigs);
}

@Bean
DefaultKafkaProducerFactoryCustomizer azureOAuth2KafkaProducerFactoryCustomizer() {
Map<String, Object> updateConfigs = new HashMap<>();
Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
configureOAuth2Properties(updateConfigs, producerProperties);
if (needConfigureSaslOAuth(producerProperties)) {
configureOAuth2Properties(producerProperties, updateConfigs);
configureKafkaUserAgent();
}
return factory -> factory.updateConfigs(updateConfigs);
}

private void configureOAuth2Properties(Map<String, Object> updateConfigs, Map<String, Object> sourceKafkaProperties) {
if (needConfigureSaslOAuth(sourceKafkaProperties)) {
AzureKafkaProperties azureKafkaProperties = buildAzureProperties(sourceKafkaProperties,
azureGlobalProperties);
updateConfigs.put(AZURE_TOKEN_CREDENTIAL, resolveSpringCloudAzureTokenCredential(azureKafkaProperties));
updateConfigs.putAll(KAFKA_OAUTH_CONFIGS);
logConfigureOAuthProperties();
}

private void configureOAuth2Properties(Map<String, Object> sourceKafkaProperties, Map<String, Object> updateConfigs) {
AzureKafkaProperties azureKafkaProperties = buildAzureProperties(sourceKafkaProperties,
azureGlobalProperties);
updateConfigs.put(AZURE_TOKEN_CREDENTIAL, resolveSpringCloudAzureTokenCredential(azureKafkaProperties));
updateConfigs.putAll(KAFKA_OAUTH_CONFIGS);
logConfigureOAuthProperties();
}

private TokenCredential resolveSpringCloudAzureTokenCredential(AzureKafkaProperties azureKafkaProperties) {
TokenCredential tokenCredential = tokenCredentialResolver.resolve(azureKafkaProperties);
return tokenCredential == null ? defaultTokenCredential : tokenCredential;
}

private void configureKafkaUserAgent() {
Method dataMethod = ReflectionUtils.findMethod(ApiVersionsRequest.class, "data");
if (dataMethod != null) {
ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder().build();
ApiVersionsRequestData apiVersionsRequestData = (ApiVersionsRequestData) ReflectionUtils.invokeMethod(dataMethod, apiVersionsRequest);
if (apiVersionsRequestData != null) {
apiVersionsRequestData.setClientSoftwareName(apiVersionsRequestData.clientSoftwareName()
+ "/" + AZURE_SPRING_EVENT_HUBS_KAFKA_OAUTH);
apiVersionsRequestData.setClientSoftwareVersion(VERSION);
}
}
}

}
Loading

0 comments on commit f17a602

Please sign in to comment.