Skip to content

Commit

Permalink
Simplify concurrency for riders. Improve benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed Feb 15, 2023
1 parent afd8bb9 commit 864d694
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 27 deletions.
36 changes: 21 additions & 15 deletions src/MassTransit/Util/PartitionChannelExecutorPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,50 @@ namespace MassTransit.Util
public class PartitionChannelExecutorPool<T> :
IChannelExecutorPool<T>
{
readonly PartitionKeyProvider<T> _partitionKeyProvider;
readonly IHashGenerator _hashGenerator;
readonly ChannelExecutor[] _partitions;
readonly PartitionKeyProvider<T> _partitionKeyProvider;
readonly Lazy<ChannelExecutor>[] _partitions;

public PartitionChannelExecutorPool(PartitionKeyProvider<T> partitionKeyProvider, IHashGenerator hashGenerator, int concurrencyLimit,
int concurrentDeliveryLimit = 1)
{
_partitionKeyProvider = partitionKeyProvider;
_hashGenerator = hashGenerator;
_partitions = Enumerable.Range(0, concurrencyLimit)
.Select(x => new ChannelExecutor(concurrentDeliveryLimit))
.Select(x => new Lazy<ChannelExecutor>(() => new ChannelExecutor(concurrentDeliveryLimit)))
.ToArray();
}

public Task Push(T partition, Func<Task> handle, CancellationToken cancellationToken)
public async ValueTask DisposeAsync()
{
foreach (Lazy<ChannelExecutor> partition in _partitions)
{
if (partition.IsValueCreated)
await partition.Value.DisposeAsync().ConfigureAwait(false);
}
}

public Task Push(T partition, Func<Task> method, CancellationToken cancellationToken = default)
{
var executor = GetExecutor(partition);
return executor.Push(handle, cancellationToken);
var executor = GetChannelExecutor(partition);
return executor.Push(method, cancellationToken);
}

public Task Run(T partition, Func<Task> method, CancellationToken cancellationToken = default)
{
var executor = GetExecutor(partition);
var executor = GetChannelExecutor(partition);
return executor.Run(method, cancellationToken);
}

ChannelExecutor GetExecutor(T partition)
ChannelExecutor GetChannelExecutor(T partition)
{
if (_partitions.Length == 1)
return _partitions[0].Value;

var partitionKey = _partitionKeyProvider(partition);
var hash = partitionKey?.Length > 0 ? _hashGenerator.Hash(partitionKey) : 0;
var index = hash % _partitions.Length;
return _partitions[index];
}

public async ValueTask DisposeAsync()
{
foreach (var partition in _partitions)
await partition.DisposeAsync().ConfigureAwait(false);
return _partitions[index].Value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ public class EventHubDataReceiver :
Agent,
IEventHubDataReceiver
{
readonly CancellationTokenSource _checkpointTokenSource;
readonly EventProcessorClient _client;
readonly ReceiveEndpointContext _context;
readonly TaskCompletionSource<bool> _deliveryComplete;
readonly IReceivePipeDispatcher _dispatcher;
readonly EventProcessorClient _client;
readonly IChannelExecutorPool<ProcessEventArgs> _executorPool;
readonly CancellationTokenSource _checkpointTokenSource;
readonly IProcessorLockContext _lockContext;

public EventHubDataReceiver(ReceiveSettings receiveSettings, ReceiveEndpointContext context, ProcessorContext processorContext)
Expand Down Expand Up @@ -155,8 +155,8 @@ async Task ActiveAndActualAgentsCompleted(StopContext context)
class CombinedChannelExecutorPool :
IChannelExecutorPool<ProcessEventArgs>
{
readonly IChannelExecutorPool<ProcessEventArgs> _partitionExecutorPool;
readonly IChannelExecutorPool<ProcessEventArgs> _keyExecutorPool;
readonly IChannelExecutorPool<ProcessEventArgs> _partitionExecutorPool;

public CombinedChannelExecutorPool(IChannelExecutorPool<ProcessEventArgs> partitionExecutorPool, ReceiveSettings receiveSettings)
{
Expand Down
10 changes: 10 additions & 0 deletions tests/MassTransit.Benchmark/KafkaOptionSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,29 @@ public KafkaOptionSet()
{
Add<string>("h|host:", "The host name of the broker", x => Host = x);
Add<ushort>("pc|partitions:", "The number of partitions for the topic", x => PartitionCount = x);
Add<ushort>("kc|keys:", "The number of keys for the topic", x => KeysCount = x);
Add<ushort>("cc|consumers:", "The number of concurrent kafka consumers", x => ConcurrentConsumerLimit = x);
Add<string>("t|topic:", "The name of the topic to use", x => Topic = x);

Host = "localhost:9092";
Topic = "benchmark-topic";
PartitionCount = 2;
KeysCount = 20;
ConcurrentConsumerLimit = 1;
}

public string Host { get; set; }
public string Topic { get; set; }
public ushort PartitionCount { get; set; }
public ushort KeysCount { get; set; }
public ushort ConcurrentConsumerLimit { get; set; }

public void ShowOptions()
{
Console.WriteLine("Host: {0}", Host);
Console.WriteLine("Topic: {0}", Topic);
Console.WriteLine("Partitions Count: {0}", PartitionCount);
Console.WriteLine("Keys Count: {0}", KeysCount);
Console.WriteLine("Concurrent Consumers: {0}", ConcurrentConsumerLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ namespace MassTransitBenchmark.Latency;
using System.Threading.Tasks;
using BusOutbox;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using MassTransit;
using MassTransit.KafkaIntegration;
using MassTransit.Middleware;
using Microsoft.Extensions.DependencyInjection;


public class KafkaMessageLatencyTransport :
IMessageLatencyTransport
{
readonly IAdminClient _adminClient;
readonly ClientConfig _clientConfig;
readonly Murmur3UnsafeHashGenerator _hashGenerator;
readonly KafkaKeyResolver<int, LatencyTestMessage> _keyResolver;
readonly KafkaOptionSet _options;
readonly IPipe<KafkaSendContext<LatencyTestMessage>> _partitionPipe;
readonly IMessageLatencySettings _settings;
ITopicProducer<LatencyTestMessage> _producer;
ServiceProvider _provider;
Expand All @@ -21,15 +29,24 @@ public KafkaMessageLatencyTransport(KafkaOptionSet options, IMessageLatencySetti
{
_options = options;
_settings = settings;
_partitionPipe = GetPartitionPipe();
_keyResolver = GetKeyResolver();
_hashGenerator = new Murmur3UnsafeHashGenerator();
_clientConfig = new ClientConfig { BootstrapServers = _options.Host };

_adminClient = new AdminClientBuilder(new AdminClientConfig(_clientConfig))
.Build();
}

public Task Send(LatencyTestMessage message)
{
return _producer.Produce(message);
return _producer.Produce(message, _partitionPipe);
}

public async Task Start(Action<IReceiveEndpointConfigurator> callback, IReportConsumerMetric reportConsumerMetric)
{
await CreateTopic();

_provider = new ServiceCollection()
.AddTextLogger(Console.Out)
.AddSingleton(reportConsumerMetric)
Expand All @@ -41,20 +58,17 @@ public async Task Start(Action<IReceiveEndpointConfigurator> callback, IReportCo
{
r.AddConsumer<MessageLatencyConsumer>();
r.AddProducer<LatencyTestMessage>(_options.Topic);
r.AddProducer(_options.Topic, _keyResolver);
r.UsingKafka((context, k) =>
r.UsingKafka(_clientConfig, (context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<LatencyTestMessage>(_options.Topic, nameof(KafkaMessageLatencyTransport), e =>
{
e.PrefetchCount = _settings.PrefetchCount;
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.CreateIfMissing(p =>
{
p.NumPartitions = _options.PartitionCount;
});
if (_options.ConcurrentConsumerLimit > 0)
e.ConcurrentConsumerLimit = _options.ConcurrentConsumerLimit;
if (_settings.ConcurrencyLimit > 0)
e.ConcurrentMessageLimit = _settings.ConcurrencyLimit;
Expand All @@ -77,5 +91,72 @@ public async ValueTask DisposeAsync()
await _scope.DisposeAsync();

await _provider.StopHostedServices();

await DeleteTopic();

_adminClient.Dispose();
}

async Task DeleteTopic()
{
try
{
await _adminClient.DeleteTopicsAsync(new[] { _options.Topic });
}
catch (DeleteTopicsException)
{
}
}

async Task CreateTopic()
{
try
{
await _adminClient.CreateTopicsAsync(new[]
{
new TopicSpecification
{
Name = _options.Topic,
NumPartitions = _options.PartitionCount,
ReplicationFactor = 1
}
});
}
catch (CreateTopicsException)
{
}

// Adding some delays for kafka to settle
await Task.Delay(TimeSpan.FromMilliseconds(100));
}

KafkaKeyResolver<int, LatencyTestMessage> GetKeyResolver()
{
if (_options.KeysCount > 1)
{
return context =>
{
var partitionKey = context.Message.CorrelationId.ToByteArray();
var hash = partitionKey?.Length > 0 ? _hashGenerator.Hash(partitionKey) : 0;
return (int)(hash % _options.KeysCount);
};
}

return _ => 1;
}

IPipe<KafkaSendContext<LatencyTestMessage>> GetPartitionPipe()
{
if (_options.PartitionCount > 1)
{
return Pipe.Execute<KafkaSendContext<LatencyTestMessage>>(ctx =>
{
var partitionKey = ctx.Message.CorrelationId.ToByteArray();
var hash = partitionKey?.Length > 0 ? _hashGenerator.Hash(partitionKey) : 0;
ctx.Partition = (int)(hash % _options.PartitionCount);
});
}

return Pipe.Empty<KafkaSendContext<LatencyTestMessage>>();
}
}
17 changes: 17 additions & 0 deletions tests/MassTransit.Benchmark/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,20 @@ services:
- "SA_PASSWORD=Password12!"
ports:
- 1433:1433

redpanda:
image: docker.redpanda.com/vectorized/redpanda:latest
command:
- redpanda start
- --smp 1
- --overprovisioned
- --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr localhost:8082
ports:
- 8081:8081
- 8082:8082
- 9092:9092
- 9644:9644
- 29092:29092

0 comments on commit 864d694

Please sign in to comment.