Add KafkaEventAggregator (#49)
meghasemim1999 committed Oct 26, 2023
1 parent c3a367a commit 9a8efe2
Showing 13 changed files with 394 additions and 49 deletions.
3 changes: 2 additions & 1 deletion Source/BSN.Commons/Infrastructure/Kafka/IKafkaConsumer.cs
@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace BSN.Commons.Infrastructure.Kafka
Source/BSN.Commons/Infrastructure/Kafka/IKafkaConsumerFactory.cs
@@ -1,10 +1,12 @@
namespace BSN.Commons.Infrastructure.Kafka
using System;

namespace BSN.Commons.Infrastructure.Kafka
{
/// <summary>
/// Represents a factory class for IKafkaConsumer classes
/// </summary>
/// <typeparam name="T">the type which the created consumer classes are based on</typeparam>
public interface IKafkaConsumerFactory<T>
public interface IKafkaConsumerFactory<T> : IDisposable
{
/// <summary>
/// Creates a IKafkaConsumer
11 changes: 9 additions & 2 deletions Source/BSN.Commons/Infrastructure/Kafka/IKafkaProducer.cs
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace BSN.Commons.Infrastructure.Kafka
{
@@ -26,7 +27,13 @@ public interface IKafkaProducer<T>
/// </summary>
/// <param name="message">The message that is being produced</param>
/// <exception cref="KafkaProduceException{T}"></exception>
/// <returns>Whether the produce operation was successful or not</returns>
void ProduceAsync(T message);
Task ProduceAsync(T message);

/// <summary>
/// Produces the message to Kafka, raising exceptions for errors
/// </summary>
/// <param name="message">The message that is being produced</param>
/// <exception cref="KafkaProduceException{T}"></exception>
void Produce(T message);
}
}
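
As a minimal usage sketch (not taken from this commit), a caller might exercise the revised contract as follows, assuming an IKafkaProducer<string> obtained from a producer factory; the payloads and the SendAsync wrapper are illustrative names only:

using System;
using System.Threading.Tasks;
using BSN.Commons.Infrastructure.Kafka;

public static class ProducerUsageSketch
{
    // 'producer' would typically come from IKafkaProducerFactory<string>.Create(topic).
    public static async Task SendAsync(IKafkaProducer<string> producer)
    {
        try
        {
            // Synchronous variant added by this commit: produces and raises on error.
            producer.Produce("order-created");

            // ProduceAsync now returns a Task, so completion or failure can be awaited.
            await producer.ProduceAsync("order-shipped");
        }
        catch (KafkaProduceException<string> e)
        {
            // Both methods surface produce failures as KafkaProduceException<T>.
            Console.WriteLine($"Produce failed: {e.Message}");
        }
    }
}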
Source/BSN.Commons/Infrastructure/Kafka/IKafkaProducerFactory.cs
@@ -1,10 +1,12 @@
namespace BSN.Commons.Infrastructure.Kafka
using System;

namespace BSN.Commons.Infrastructure.Kafka
{
/// <summary>
/// Represents a factory class for IKafkaProducer classes
/// </summary>
/// <typeparam name="T">the type which the created producer classes are based on</typeparam>
public interface IKafkaProducerFactory<T>
public interface IKafkaProducerFactory<T> : IDisposable
{
/// <summary>
/// Creates a IKafkaProducer
Source/BSN.Commons/Infrastructure/Kafka/IKafkaProducerOptions.cs
@@ -9,10 +9,5 @@ public interface IKafkaProducerOptions
/// List of Kafka bootstrap servers separated by ;
/// </summary>
string BootstrapServers { get; }

/// <summary>
/// Represents the Size that a Receiving message can have in Bytes
/// </summary>
string ReceiveMessageMaxBytes { get; }
}
}
9 changes: 7 additions & 2 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumer.cs
@@ -7,8 +7,8 @@ namespace BSN.Commons.Infrastructure.Kafka
/// <inheritdoc />
public class KafkaConsumer<T> : IKafkaConsumer<T>
{
/// <param name="consumer">the consumer engine provided by creator for example factory class</param>
public KafkaConsumer(IConsumer<Null, T> consumer)
/// <param name="consumer">the consumer engine provided by the factory class</param>
internal KafkaConsumer(IConsumer<Null, T> consumer)
{
_consumer = consumer;
}
@@ -23,6 +23,11 @@ public Task<T> ConsumeAsync(CancellationToken ct = default)
return result.Message.Value;
});
}

internal void Dispose()
{
_consumer.Dispose();
}

private readonly IConsumer<Null, T> _consumer;
}
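
A rough consumption-loop sketch (not part of the commit) built on the ConsumeAsync signature above; the consumer instance is assumed to come from IKafkaConsumerFactory<string>.Create, and the logging is illustrative:

using System;
using System.Threading;
using System.Threading.Tasks;
using BSN.Commons.Infrastructure.Kafka;

public static class ConsumerUsageSketch
{
    public static async Task PumpAsync(IKafkaConsumer<string> consumer, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // ConsumeAsync yields the message value; cancellation flows through the token.
            string message = await consumer.ConsumeAsync(ct);
            Console.WriteLine($"Received: {message}");
        }
    }
}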
59 changes: 45 additions & 14 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumerFactory.cs
@@ -1,4 +1,6 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System.Collections.Generic;

namespace BSN.Commons.Infrastructure.Kafka
{
@@ -9,30 +9,59 @@ public class KafkaConsumerFactory<T> : IKafkaConsumerFactory<T>
/// <param name="options">Default Options for KafkaConsumers</param>
public KafkaConsumerFactory(IKafkaConsumerOptions options)
{
_defaultConsumerConfig = new ConsumerConfig
_defaultConsumerOptions = options;
_consumers = new Dictionary<string, KafkaConsumer<T>>();
}

/// <summary>
/// Creates a new Kafka consumer with the specified topic and group id.
/// The returned consumer is thread safe and can be used in multi-threaded environments.
/// For multiple calls with the same topic and group id, the same consumer will be returned.
/// </summary>
/// <param name="topic"></param>
/// <param name="groupId"></param>
/// <returns></returns>
public IKafkaConsumer<T> Create(string topic, string groupId)
{
string consumerKey = topic + ":" + groupId;

if (_consumers.ContainsKey(consumerKey))
{
return _consumers[consumerKey];
}

var config = new ConsumerConfig()
{
BootstrapServers = options.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest
BootstrapServers = _defaultConsumerOptions.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
};

// Here we did this because the ReceiveMessageMaxBytes in ProducerConfig type
// is int and can not accept high values that we expect
_defaultConsumerConfig.Set("receive.message.max.bytes", options.ReceiveMessageMaxBytes);
}
config.Set("receive.message.max.bytes", _defaultConsumerOptions.ReceiveMessageMaxBytes);

// Here Null means that the key in the Kafka message is null;
// this helps equal distribution of messages across the Kafka cluster
var consumerEngine = new ConsumerBuilder<Null, T>(config).Build();
consumerEngine.Subscribe(topic);

var consumer = new KafkaConsumer<T>(consumerEngine);

_consumers.Add(consumerKey, consumer);

return consumer;
}

/// <inheritdoc />
public IKafkaConsumer<T> Create(string topic, string groupId)
public void Dispose()
{
lock (this)
foreach (var consumer in _consumers)
{
var config = new ConsumerConfig(_defaultConsumerConfig) { GroupId = groupId };
var consumer = new ConsumerBuilder<Null, T>(config).Build();
consumer.Subscribe(topic);

return new KafkaConsumer<T>(consumer);
consumer.Value.Dispose();
}
}

private readonly ConsumerConfig _defaultConsumerConfig;

private readonly Dictionary<string, KafkaConsumer<T>> _consumers;
private readonly IKafkaConsumerOptions _defaultConsumerOptions;
}
}
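
To illustrate the caching and disposal behaviour introduced above, a hedged sketch follows; the broker list, max-bytes value, topic and group id are placeholders, and the comment on the equality check reflects the factory's documented contract rather than verified behaviour:

using BSN.Commons.Infrastructure.Kafka;

public static class ConsumerFactorySketch
{
    public static void Demo()
    {
        // Placeholder option values for illustration only.
        var options = new KafkaConsumerOptions("broker1:9092;broker2:9092", "1000000000");

        using (var factory = new KafkaConsumerFactory<string>(options))
        {
            var first = factory.Create("orders", "billing-service");
            var second = factory.Create("orders", "billing-service");

            // Same topic and group id => the factory returns the cached consumer.
            bool sameInstance = ReferenceEquals(first, second); // expected: true
        } // Dispose() walks the cache and disposes each underlying consumer engine.
    }
}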
Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumerOptions.cs
@@ -3,8 +3,11 @@
/// <inheritdoc />
public class KafkaConsumerOptions : IKafkaConsumerOptions
{
/// <param name="bootstrapServers"><see cref="BootstrapServers"/></param>
/// <param name="receiveMessageMaxBytes"><see cref="ReceiveMessageMaxBytes"/></param>
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConsumerOptions"/> class.
/// </summary>
/// <param name="bootstrapServers">List of Kafka bootstrap servers separated by ;</param>
/// <param name="receiveMessageMaxBytes">The maximum size, in bytes, that a received message can have</param>
public KafkaConsumerOptions(string bootstrapServers, string receiveMessageMaxBytes)
{
BootstrapServers = bootstrapServers;
23 changes: 19 additions & 4 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaProducer.cs
@@ -1,4 +1,5 @@
using Confluent.Kafka;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace BSN.Commons.Infrastructure.Kafka
{
@@ -8,14 +9,14 @@ public class KafkaProducer<T> : IKafkaProducer<T>
/// <param name="producer">the producer engine provided by creator for example factory class</param>
/// <param name="topic">Represents the topic which the producer will produce on,
/// Read https://dattell.com/data-architecture-blog/what-is-a-kafka-topic/ for further info.</param>
public KafkaProducer(IProducer<Null, T> producer, string topic)
internal KafkaProducer(IProducer<Null, T> producer, string topic)
{
_producer = producer;
_topic = topic;
}

/// <inheritdoc />
public async void ProduceAsync(T message)
public async Task ProduceAsync(T message)
{
var messageObject = new Message<Null, T> { Value = message };

@@ -29,8 +30,22 @@ public async void ProduceAsync(T message)
}
}

private readonly string _topic;
/// <inheritdoc />
public void Produce(T message)
{
var messageObject = new Message<Null, T> { Value = message };

try
{
_producer.Produce(_topic, messageObject);
}
catch (ProduceException<Null, T> e)
{
throw new KafkaProduceException<T>(e.Error.Reason);
}
}

private readonly string _topic;
private readonly IProducer<Null, T> _producer;
}
}
30 changes: 22 additions & 8 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaProducerFactory.cs
@@ -1,4 +1,5 @@
using Confluent.Kafka;
using System.Collections.Generic;
using Confluent.Kafka;

namespace BSN.Commons.Infrastructure.Kafka
{
@@ -14,19 +15,32 @@ public KafkaProducerFactory(IKafkaProducerOptions options)
BootstrapServers = options.BootstrapServers,
};

// Here we did this because the ReceiveMessageMaxBytes in ProducerConfig type
// is int and can not accept high values that we expect
producerConfig.Set("receive.message.max.bytes", options.ReceiveMessageMaxBytes);

_sharedProducer = new ProducerBuilder<Null, T>(producerConfig).Build();
_sharedProducerEngine = new ProducerBuilder<Null, T>(producerConfig).Build();
_producers = new Dictionary<string, KafkaProducer<T>>();
}

/// <inheritdoc />
public IKafkaProducer<T> Create(string topic)
{
return new KafkaProducer<T>(_sharedProducer, topic);
if (_producers.ContainsKey(topic))
{
return _producers[topic];
}

var producer = new KafkaProducer<T>(_sharedProducerEngine, topic);

_producers.Add(topic, producer);

return producer;
}

/// <inheritdoc />
public void Dispose()
{
_sharedProducerEngine?.Dispose();
}

private readonly IProducer<Null, T> _sharedProducer;
private readonly IProducer<Null, T> _sharedProducerEngine;
private readonly Dictionary<string, KafkaProducer<T>> _producers;
}
}
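
A similar hedged sketch for the producer side: a single shared engine serves all topics, Create caches one wrapper per topic, and disposing the factory releases the engine. Broker and topic names are placeholders; note that KafkaProducerOptions now takes only the bootstrap servers.

using BSN.Commons.Infrastructure.Kafka;

public static class ProducerFactorySketch
{
    public static void Demo()
    {
        // Placeholder broker list for illustration only.
        var options = new KafkaProducerOptions("broker1:9092;broker2:9092");

        using (var factory = new KafkaProducerFactory<string>(options))
        {
            // Both wrappers share the same underlying producer engine.
            var orderProducer = factory.Create("orders");
            var invoiceProducer = factory.Create("invoices");

            // A repeated Create for an existing topic returns the cached wrapper.
            var sameProducer = factory.Create("orders"); // same instance as orderProducer

            orderProducer.Produce("order-created");
        } // Dispose() releases the shared producer engine.
    }
}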
12 changes: 5 additions & 7 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaProducerOptions.cs
@@ -3,18 +3,16 @@
/// <inheritdoc />
public class KafkaProducerOptions : IKafkaProducerOptions
{
/// <param name="bootstrapServers"><see cref="BootstrapServers"/></param>
/// <param name="receiveMessageMaxBytes"><see cref="ReceiveMessageMaxBytes"/></param>
public KafkaProducerOptions(string bootstrapServers, string receiveMessageMaxBytes)
/// <summary>
/// Initializes a new instance of the <see cref="KafkaProducerOptions"/> class.
/// </summary>
/// <param name="bootstrapServers"></param>
public KafkaProducerOptions(string bootstrapServers)
{
BootstrapServers = bootstrapServers;
ReceiveMessageMaxBytes = receiveMessageMaxBytes;
}

/// <inheritdoc />
public string BootstrapServers { get; }

/// <inheritdoc />
public string ReceiveMessageMaxBytes { get; }
}
}
Source/BSN.Commons/Infrastructure/MessageBroker/Kafka/KafkaConnectionOptions.cs
@@ -0,0 +1,40 @@
namespace BSN.Commons.Infrastructure.MessageBroker.Kafka
{
/// <summary>
/// Represents options for configuring a Kafka connection.
/// </summary>
public class KafkaConnectionOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConnectionOptions"/> class.
/// </summary>
/// <param name="bootstrapServers"></param>
/// <param name="consumerGroupId"></param>
/// <param name="receiveMessageMaxBytes"></param>
public KafkaConnectionOptions(string bootstrapServers, string consumerGroupId, string receiveMessageMaxBytes)
{
BootstrapServers = bootstrapServers;
ConsumerGroupId = consumerGroupId;
ReceiveMessageMaxBytes = receiveMessageMaxBytes;
}

/// <summary>
/// These servers are used to bootstrap the initial connection to the Kafka cluster.
/// It is a list of host/port pairs separated by commas,
/// for example "broker1:9092,broker2:9092".
/// </summary>
public string BootstrapServers { get; }

/// <summary>
/// This is a unique string that identifies the consumer group this consumer belongs to.
/// Each message sent to a topic is delivered to one consumer instance within each subscribing consumer group.
/// This is useful for parallelism, fault tolerance, and scalability.
/// </summary>
public string ConsumerGroupId { get; }

/// <summary>
/// The maximum number of bytes in a message batch.
/// </summary>
public string ReceiveMessageMaxBytes { get; }
}
}
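
For illustration only, constructing the new options type might look like the following; every value is a placeholder, and the comma-separated broker list follows the XML documentation above (the older producer/consumer options document a semicolon-separated list):

using BSN.Commons.Infrastructure.MessageBroker.Kafka;

// Placeholder values for illustration only.
var connectionOptions = new KafkaConnectionOptions(
    bootstrapServers: "broker1:9092,broker2:9092",  // comma-separated host/port pairs
    consumerGroupId: "billing-service",             // consumer group this consumer belongs to
    receiveMessageMaxBytes: "1000000000");          // maximum message batch size, in bytes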
