diff --git a/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs b/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs new file mode 100644 index 000000000..21953f160 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs @@ -0,0 +1,26 @@ +using System.Collections.Generic; + +namespace KafkaFlow.Authentication; + +/// +/// Authentication handler for OAuth Bearer. +/// +public interface IOAuthBearerAuthenticator +{ + /// + /// Set SASL/OAUTHBEARER token and metadata. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon + /// success. The extension keys must not include the reserved key "`auth`", and all extension keys and values must conform to the required + /// format as per https://tools.ietf.org/html/rfc7628#section-3.1. + /// + /// The mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1 + /// When the token expires, in terms of the number of milliseconds since the epoch + /// The mandatory Kafka principal name associated with the token + /// Optional SASL extensions dictionary, to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1 + void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary extensions = null); + + /// + /// SASL/OAUTHBEARER token refresh failure indicator. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon failure. + /// + /// Mandatory human readable error reason for failing to acquire a token + void SetTokenFailure(string error); +} diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs index 26d64c25e..3b4177c05 100644 --- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs +++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs @@ -1,12 +1,12 @@ -namespace KafkaFlow.Configuration -{ - /// SaslOauthbearerMethod enum values - public enum SaslOauthbearerMethod - { - /// Default - Default, - - /// Oidc - Oidc, - } -} +namespace KafkaFlow.Configuration +{ + /// SaslOauthbearerMethod enum values + public enum SaslOauthbearerMethod + { + /// Default + Default, + + /// Oidc + Oidc, + } +} diff --git a/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs b/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs index f6b614289..2c0654282 100644 --- a/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs +++ b/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs @@ -1,3 +1,6 @@ +using KafkaFlow.Authentication; +using System; + namespace KafkaFlow.Configuration; /// @@ -249,4 +252,9 @@ public class SecurityInformation /// importance: low /// public string SaslOauthbearerScope { get; set; } -} \ No newline at end of file + + /// + /// Gets or sets the OAuthBearerTokenRefreshHandler for custom OAuth authentication. + /// + public Action OAuthBearerTokenRefreshHandler { get; set; } +} diff --git a/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs b/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs new file mode 100644 index 000000000..7aa57f3a1 --- /dev/null +++ b/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; +using Confluent.Kafka; + +namespace KafkaFlow.Authentication; + +internal readonly struct OAuthBearerAuthenticator : IOAuthBearerAuthenticator +{ + private readonly IClient _client; + + public OAuthBearerAuthenticator(IClient client) + { + _client = client; + } + + public void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary extensions = null) + { + _client.OAuthBearerSetToken(tokenValue, lifetimeMs, principalName, extensions); + } + + public void SetTokenFailure(string error) + { + _client.OAuthBearerSetTokenFailure(error); + } +} diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 28e7381f0..7a4b84620 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Confluent.Kafka; using Confluent.Kafka.Admin; +using KafkaFlow.Authentication; using KafkaFlow.Configuration; namespace KafkaFlow.Clusters; @@ -32,8 +33,22 @@ public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration config.ReadSecurityInformationFrom(configuration); - return new AdminClientBuilder(config) - .Build(); + var adminClientBuilder = new AdminClientBuilder(config); + + var security = configuration.GetSecurityInformation(); + + if (security?.OAuthBearerTokenRefreshHandler != null) + { + var handler = security.OAuthBearerTokenRefreshHandler; + + adminClientBuilder.SetOAuthBearerTokenRefreshHandler((client, _) => + { + var authenticator = new OAuthBearerAuthenticator(client); + handler(authenticator); + }); + } + + return adminClientBuilder.Build(); }); } diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs index c1b3fee25..6e35340a0 100644 --- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs +++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs @@ -14,6 +14,8 @@ public class ClusterConfiguration private readonly List _producers = new(); private readonly List _consumers = new(); private readonly ReadOnlyCollection _topicsToCreateIfNotExist; + private SecurityInformation _securityInformation; + private bool _securityInformationLoaded; /// /// Initializes a new instance of the class. @@ -103,5 +105,14 @@ public class ClusterConfiguration /// Gets the kafka security information /// /// - public SecurityInformation GetSecurityInformation() => _securityInformationHandler?.Invoke(); + public SecurityInformation GetSecurityInformation() + { + if (!_securityInformationLoaded) + { + _securityInformation = _securityInformationHandler?.Invoke(); + _securityInformationLoaded = true; + } + + return _securityInformation; + } } diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 8dad2ab5d..099b5b482 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; +using KafkaFlow.Authentication; using KafkaFlow.Configuration; namespace KafkaFlow.Consumers; @@ -235,25 +236,35 @@ private void EnsureConsumer() var kafkaConfig = this.Configuration.GetKafkaConfig(); - var consumerBuilder = new ConsumerBuilder(kafkaConfig); - - _consumer = - consumerBuilder - .SetPartitionsAssignedHandler( - (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions)) - .SetPartitionsRevokedHandler( - (consumer, partitions) => - { - this.Assignment = new List(); - this.Subscription = new List(); - _currentPartitionsOffsets.Clear(); - _flowManager.Stop(); - - _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); - }) - .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error))) - .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics))) - .Build(); + var consumerBuilder = new ConsumerBuilder(kafkaConfig) + .SetPartitionsAssignedHandler( + (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions)) + .SetPartitionsRevokedHandler( + (consumer, partitions) => + { + this.Assignment = new List(); + this.Subscription = new List(); + _currentPartitionsOffsets.Clear(); + _flowManager.Stop(); + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + }) + .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error))) + .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics))); + + var security = this.Configuration.ClusterConfiguration.GetSecurityInformation(); + + if (security?.OAuthBearerTokenRefreshHandler != null) + { + var handler = security.OAuthBearerTokenRefreshHandler; + + consumerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) => + { + var authenticator = new OAuthBearerAuthenticator(client); + handler(authenticator); + }); + } + + _consumer = consumerBuilder.Build(); if (this.Configuration.Topics.Any()) { diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index f696214f4..13161360f 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -2,6 +2,7 @@ using System.Text; using System.Threading.Tasks; using Confluent.Kafka; +using KafkaFlow.Authentication; using KafkaFlow.Configuration; namespace KafkaFlow.Producers; @@ -261,6 +262,19 @@ private static void FillContextWithResultMetadata(IMessageContext context, Deliv } }); + var security = _configuration.Cluster.GetSecurityInformation(); + + if (security?.OAuthBearerTokenRefreshHandler != null) + { + var handler = security.OAuthBearerTokenRefreshHandler; + + producerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) => + { + var authenticator = new OAuthBearerAuthenticator(client); + handler(authenticator); + }); + } + return _producer = _configuration.CustomFactory( producerBuilder.Build(), _producerDependencyScope.Resolver);