Skip to content

Commit

Permalink
feat: evolve worker distribution strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch authored and joelfoliveira committed Nov 17, 2023
1 parent d5a9c21 commit f0eb873
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 104 deletions.
Expand Up @@ -136,19 +136,19 @@ public interface IConsumerConfigurationBuilder
/// <summary>
/// Sets the strategy to choose a worker when a message arrives
/// </summary>
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
/// <param name="factory">A factory to create the instance</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
where T : class, IDistributionStrategy;
IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>(Factory<T> factory)
where T : class, IWorkerDistributionStrategy;

/// <summary>
/// Sets the strategy to choose a worker when a message arrives
/// </summary>
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy;
IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>()
where T : class, IWorkerDistributionStrategy;

/// <summary>
/// Configures the consumer for manual message completion.
Expand Down Expand Up @@ -191,4 +191,4 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
/// <returns></returns>
IConsumerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs);
}
}
}

Check warning on line 194 in src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

File is required to end with a single newline character
26 changes: 0 additions & 26 deletions src/KafkaFlow.Abstractions/IDistributionStrategy.cs

This file was deleted.

23 changes: 23 additions & 0 deletions src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs
@@ -0,0 +1,23 @@
namespace KafkaFlow;

using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Defines the strategy used to choose which worker receives each consumed message.
/// The consumer worker pool creates the strategy through the configured factory and initializes it with the pool's workers.
/// </summary>
public interface IWorkerDistributionStrategy
{
/// <summary>
/// Initializes the distribution strategy with the workers it may choose from.
/// This method is called when a consumer is started, before any worker is requested from the strategy.
/// </summary>
/// <param name="workers">Read-only list of the workers available to the strategy</param>
void Initialize(IReadOnlyList<IWorker> workers);

/// <summary>
/// Selects the worker that should process a message, based on the provided distribution strategy context.
/// </summary>
/// <param name="context">The distribution strategy context carrying the consumer name, topic, partition, raw message key and the consumer-stopped cancellation token.</param>
/// <returns>
/// The selected <see cref="IWorker"/> instance. An implementation may yield <c>null</c> once the consumer has stopped
/// (the built-in bytes-sum strategy does this), so callers should be prepared to handle a <c>null</c> result.
/// </returns>
ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context);
}
5 changes: 5 additions & 0 deletions src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
Expand Up @@ -7,4 +7,9 @@
<Description>Contains all KafkaFlow extendable interfaces</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Memory" Version="4.5.5" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
57 changes: 57 additions & 0 deletions src/KafkaFlow.Abstractions/WorkerDistributionContext.cs
@@ -0,0 +1,57 @@
namespace KafkaFlow;

using System;
using System.Threading;

/// <summary>
/// Represents a strategy context for distributing workers based on specific message and consumer details.
/// </summary>
/// <remarks>
/// The type is an immutable, stack-only value: every property is assigned once in the constructor.
/// Declaring it <c>readonly</c> lets the compiler enforce that immutability and skip defensive copies
/// when the context is passed by <c>in</c> or accessed through a read-only reference.
/// </remarks>
public readonly ref struct WorkerDistributionContext
{
    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerDistributionContext"/> struct.
    /// </summary>
    /// <param name="consumerName">Name of the consumer.</param>
    /// <param name="topic">Topic associated with the message.</param>
    /// <param name="partition">Partition of the topic.</param>
    /// <param name="rawMessageKey">Raw key of the message; may be <c>null</c> when no key is supplied.</param>
    /// <param name="consumerStoppedCancellationToken">A cancellation token that is cancelled when the consumer has stopped</param>
    public WorkerDistributionContext(
        string consumerName,
        string topic,
        int partition,
        ReadOnlyMemory<byte>? rawMessageKey,
        CancellationToken consumerStoppedCancellationToken)
    {
        this.ConsumerName = consumerName;
        this.Topic = topic;
        this.Partition = partition;
        this.RawMessageKey = rawMessageKey;
        this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken;
    }

    /// <summary>
    /// Gets the name of the consumer.
    /// </summary>
    public string ConsumerName { get; }

    /// <summary>
    /// Gets the topic associated with the message.
    /// </summary>
    public string Topic { get; }

    /// <summary>
    /// Gets the partition number of the topic.
    /// </summary>
    public int Partition { get; }

    /// <summary>
    /// Gets the raw key of the message, or <c>null</c> when the context was created without a key.
    /// </summary>
    public ReadOnlyMemory<byte>? RawMessageKey { get; }

    /// <summary>
    /// Gets the cancellation token that is cancelled when the consumer has stopped
    /// </summary>
    public CancellationToken ConsumerStoppedCancellationToken { get; }
}
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Expand Up @@ -20,7 +20,7 @@ internal class ConsumerConfiguration : IConsumerConfiguration
TimeSpan workersCountEvaluationInterval,
int bufferSize,
TimeSpan workerStopTimeout,
Factory<IDistributionStrategy> distributionStrategyFactory,
Factory<IWorkerDistributionStrategy> distributionStrategyFactory,
IReadOnlyList<MiddlewareConfiguration> middlewaresConfigurations,
bool autoMessageCompletion,
bool noStoreOffsets,
Expand Down Expand Up @@ -69,7 +69,7 @@ internal class ConsumerConfiguration : IConsumerConfiguration
"The value must be greater than 0");
}

public Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
public Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }

public IReadOnlyList<MiddlewareConfiguration> MiddlewaresConfigurations { get; }

Expand Down
10 changes: 5 additions & 5 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Expand Up @@ -36,7 +36,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private ConsumerInitialState initialState = ConsumerInitialState.Running;
private int statisticsInterval;

private Factory<IDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
private Factory<IWorkerDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5);

private ConsumerCustomFactory customFactory = (consumer, _) => consumer;
Expand Down Expand Up @@ -157,15 +157,15 @@ public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout)
return this;
}

public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
where T : class, IDistributionStrategy
public IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>(Factory<T> factory)
where T : class, IWorkerDistributionStrategy
{
this.distributionStrategyFactory = factory;
return this;
}

public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
where T : class, IDistributionStrategy
public IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>()
where T : class, IWorkerDistributionStrategy
{
this.DependencyConfigurator.AddTransient<T>();
this.distributionStrategyFactory = resolver => resolver.Resolve<T>();
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Expand Up @@ -13,7 +13,7 @@ public interface IConsumerConfiguration
/// <summary>
/// Gets the consumer worker distribution strategy
/// </summary>
Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }

/// <summary>
/// Gets the consumer middlewares configurations
Expand Down
16 changes: 11 additions & 5 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Expand Up @@ -14,15 +14,15 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private readonly IDependencyResolver consumerDependencyResolver;
private readonly IMiddlewareExecutor middlewareExecutor;
private readonly ILogHandler logHandler;
private readonly Factory<IDistributionStrategy> distributionStrategyFactory;
private readonly Factory<IWorkerDistributionStrategy> distributionStrategyFactory;
private readonly IOffsetCommitter offsetCommitter;

private readonly Event workerPoolStoppedSubject;

private TaskCompletionSource<object> startedTaskSource = new();
private List<IConsumerWorker> workers = new();

private IDistributionStrategy distributionStrategy;
private IWorkerDistributionStrategy distributionStrategy;
private IOffsetManager offsetManager;

public ConsumerWorkerPool(
Expand Down Expand Up @@ -85,7 +85,7 @@ public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions, int
.ConfigureAwait(false);

this.distributionStrategy = this.distributionStrategyFactory(this.consumerDependencyResolver);
this.distributionStrategy.Init(this.workers.AsReadOnly());
this.distributionStrategy.Initialize(this.workers.AsReadOnly());

this.startedTaskSource.TrySetResult(null);
}
Expand Down Expand Up @@ -130,7 +130,13 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
await this.startedTaskSource.Task.ConfigureAwait(false);

var worker = (IConsumerWorker)await this.distributionStrategy
.GetWorkerAsync(message.Message.Key, stopCancellationToken)
.GetWorkerAsync(
new WorkerDistributionContext(
this.consumer.Configuration.ConsumerName,
message.Topic,
message.Partition.Value,
message.Message.Key,
stopCancellationToken))
.ConfigureAwait(false);

if (worker is null)
Expand Down Expand Up @@ -166,4 +172,4 @@ private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> messag
return context;
}
}
}
}
@@ -1,44 +1,42 @@
namespace KafkaFlow.Consumers.DistributionStrategies
namespace KafkaFlow.Consumers.DistributionStrategies;

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

/// <summary>
/// This strategy sums all bytes in the partition key and applies a mod operator with the total number of workers; the resulting number is the ID of the worker to be chosen.
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered to the same worker, so message order is guaranteed.
/// Set an optimal message buffer value to avoid idle workers (it will depend on how many messages with the same partition key are consumed).
/// </summary>
public class BytesSumDistributionStrategy : IWorkerDistributionStrategy
{
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
private IReadOnlyList<IWorker> workers;

/// <summary>
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed)
/// </summary>
public class BytesSumDistributionStrategy : IDistributionStrategy
/// <inheritdoc />
public void Initialize(IReadOnlyList<IWorker> workers)
{
private IReadOnlyList<IWorker> workers;
this.workers = workers;
}

/// <inheritdoc />
public void Init(IReadOnlyList<IWorker> workers)
/// <inheritdoc />
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context)
{
if (context.RawMessageKey is null || this.workers.Count == 1)
{
this.workers = workers;
return new ValueTask<IWorker>(this.workers[0]);
}

/// <inheritdoc />
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
{
if (partitionKey is null || this.workers.Count == 1)
{
return Task.FromResult(this.workers[0]);
}
var bytesSum = 0;

var bytesSum = 0;

for (int i = 0; i < partitionKey.Length; i++)
{
bytesSum += partitionKey[i];
}

return Task.FromResult(
cancellationToken.IsCancellationRequested
? null
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
for (var i = 0; i < context.RawMessageKey.Value.Length; i++)
{
bytesSum += context.RawMessageKey.Value.Span[i];
}

return new ValueTask<IWorker>(
context.ConsumerStoppedCancellationToken.IsCancellationRequested
? null
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
}
}
@@ -1,32 +1,30 @@
namespace KafkaFlow.Consumers.DistributionStrategies
namespace KafkaFlow.Consumers.DistributionStrategies;

using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

/// <summary>
/// This strategy chooses the first free worker to process the message. When a worker finishes processing, it notifies the worker pool that it is free to get a new message.
/// This is the fastest and most resource-friendly strategy (the message buffer is not used), but messages with the same partition key can be delivered to different workers, so there is no message order guarantee.
/// </summary>
public class FreeWorkerDistributionStrategy : IWorkerDistributionStrategy
{
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();

/// <summary>
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee
/// </summary>
public class FreeWorkerDistributionStrategy : IDistributionStrategy
/// <inheritdoc />
public void Initialize(IReadOnlyList<IWorker> workers)
{
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();

/// <inheritdoc />
public void Init(IReadOnlyList<IWorker> workers)
foreach (var worker in workers)
{
foreach (var worker in workers)
{
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
this.freeWorkers.Writer.TryWrite(worker);
}
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
this.freeWorkers.Writer.TryWrite(worker);
}
}

/// <inheritdoc />
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
{
return this.freeWorkers.Reader.ReadAsync(cancellationToken).AsTask();
}
/// <inheritdoc />
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context)
{
return this.freeWorkers.Reader.ReadAsync(context.ConsumerStoppedCancellationToken);
}
}
}

0 comments on commit f0eb873

Please sign in to comment.