Skip to content

Commit

Permalink
Kafka: Added possibility for SASL Authorization with KafkaCluster (#3953
Browse files Browse the repository at this point in the history
)

* Implemented SASL Kafka Auth

* Added new SASL Properties to ProducerConfig

* Added 2 headers

---------

Co-authored-by: Yannick Laubscher <yannick.laubscher@swissteach.ch>
  • Loading branch information
Snotax and yannicklaubscherswt committed Apr 25, 2023
1 parent 598d456 commit 86de5d3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 10 deletions.
31 changes: 31 additions & 0 deletions src/activities/Elsa.Activities.Kafka/Configuration/KafkaOptions.cs
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Elsa.Activities.Kafka.Configuration
{
public class KafkaOptions
{
/// <summary>
/// Username for BasicAuth Connection
/// </summary>
public string? SaslUsername { get; set; }

/// <summary>
/// Password for BasicAuth Connection
/// </summary>
public string? SaslPassword { get; set; }

/// <summary>
/// Mechanism to be used for SASL authentication
/// </summary>
public Confluent.Kafka.SaslMechanism SaslMechanism { get; set; }

/// <summary>
/// Security protocol to be used for communication with brokers
/// </summary>
public Confluent.Kafka.SecurityProtocol SecurityProtocol { get; set; }
}
}
Expand Up @@ -19,10 +19,13 @@ namespace Elsa.Activities.Kafka.Extensions
{
public static class ServiceCollectionExtensions
{
public static ElsaOptionsBuilder AddKafkaActivities(this ElsaOptionsBuilder options)
public static ElsaOptionsBuilder AddKafkaActivities(this ElsaOptionsBuilder options, KafkaOptions? kafkaOptions = null)
{
kafkaOptions = kafkaOptions ?? new KafkaOptions();

options.Services
.AddSingleton<BusClientFactory>()
.AddSingleton(kafkaOptions)
.AddSingleton<IMessageReceiverClientFactory>(sp => sp.GetRequiredService<BusClientFactory>())
.AddSingleton<IMessageSenderClientFactory>(sp => sp.GetRequiredService<BusClientFactory>())
.AddSingleton<IWorkerManager, WorkerManager>()
Expand Down
Expand Up @@ -11,7 +11,13 @@ public class BusClientFactory : IMessageReceiverClientFactory, IMessageSenderCli
private readonly IDictionary<int, IClient> _senders = new Dictionary<int, IClient>();

private readonly SemaphoreSlim _semaphore = new(1);


private readonly KafkaOptions _options;
public BusClientFactory(KafkaOptions options)
{
_options = options;
}

public async Task<IClient> GetReceiverAsync(KafkaConfiguration configuration, CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);
Expand All @@ -21,7 +27,7 @@ public async Task<IClient> GetReceiverAsync(KafkaConfiguration configuration, Ca
if (_receivers.TryGetValue(configuration.GetHashCode(), out var messageReceiver))
return messageReceiver;

var newMessageReceiver = new Client(configuration);
var newMessageReceiver = new Client(configuration, _options);
_receivers.Add(configuration.GetHashCode(), newMessageReceiver);
return newMessageReceiver;
}
Expand Down Expand Up @@ -57,7 +63,7 @@ public async Task<IClient> GetSenderAsync(KafkaConfiguration configuration, Canc
if (_senders.TryGetValue(configuration.GetHashCode(), out var messageSender))
return messageSender;

var newMessageSender = new Client(configuration);
var newMessageSender = new Client(configuration, _options);
_senders.Add(configuration.GetHashCode(), newMessageSender);
return newMessageSender;
}
Expand Down
16 changes: 13 additions & 3 deletions src/activities/Elsa.Activities.Kafka/Services/Client.cs
Expand Up @@ -17,10 +17,12 @@ public class Client : IClient
private IConsumer<Ignore, byte[]>? _consumer;
private Func<KafkaMessageEvent, Task>? _messageHandler;
private Func<Exception, Task>? _errHandler;
private readonly KafkaOptions _kafkaOptions;

public Client(KafkaConfiguration configuration)
public Client(KafkaConfiguration configuration, KafkaOptions options)
{
Configuration = configuration;
_kafkaOptions = options;
}

public KafkaConfiguration Configuration { get; }
Expand All @@ -40,6 +42,10 @@ public Task StartProcessing(string topic, string group)
BootstrapServers = Configuration.ConnectionString,
GroupId = group,
AutoOffsetReset = Configuration.AutoOffsetReset,
SaslMechanism = _kafkaOptions.SaslMechanism,
SaslUsername = _kafkaOptions.SaslUsername,
SecurityProtocol = _kafkaOptions.SecurityProtocol,
SaslPassword = _kafkaOptions.SaslPassword,
};

_consumer = new ConsumerBuilder<Ignore, byte[]>(consumerConfig).Build();
Expand All @@ -54,7 +60,11 @@ public async Task PublishMessage(string message)
{
var producerConfig = new ProducerConfig()
{
BootstrapServers = Configuration.ConnectionString
BootstrapServers = Configuration.ConnectionString,
SaslMechanism = _kafkaOptions.SaslMechanism,
SaslPassword = _kafkaOptions.SaslPassword,
SaslUsername = _kafkaOptions.SaslUsername,
SecurityProtocol = _kafkaOptions.SecurityProtocol,
};

using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();
Expand All @@ -71,7 +81,7 @@ public async Task Dispose()
{
_consumer.Unsubscribe();
_consumer.Close();
}
}
}
catch (Exception e)
{
Expand Down
Expand Up @@ -22,16 +22,19 @@ public class WorkerManager : IWorkerManager
private readonly ICollection<Worker> _workers;
private readonly ILogger _logger;
private readonly IBookmarkSerializer _bookmarkSerializer;

private readonly KafkaOptions _kafkaOptions;

public WorkerManager(
IServiceProvider serviceProvider,
ILogger<WorkerManager> logger,
IBookmarkSerializer bookmarkSerializer)
IBookmarkSerializer bookmarkSerializer,
KafkaOptions? kafkaOptions)
{
_serviceProvider = serviceProvider;
_logger = logger;
_bookmarkSerializer = bookmarkSerializer;
_workers = new List<Worker>();
_kafkaOptions = kafkaOptions ?? new KafkaOptions();
}

public async Task CreateWorkersAsync(IReadOnlyCollection<Trigger> triggers, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -116,7 +119,7 @@ private async Task GetOrCreateWorkerAsync(string tag, KafkaConfiguration configu
_serviceProvider,
configuration.Topic,
configuration.Group ?? "",
new Client(configuration),
new Client(configuration, _kafkaOptions),
(Func<Worker, IClient, Task>)(async (w, c) => await RemoveAndRespawnWorkerAsync(w, c, tag, configuration)));

_logger.LogDebug("Created worker for {QueueOrTopic}", worker.Topic);
Expand Down

0 comments on commit 86de5d3

Please sign in to comment.