Skip to content

Commit

Permalink
Defaulting the prefetch count to num threads
Browse files Browse the repository at this point in the history
Fixes #1
  • Loading branch information
andreasohlund committed Feb 6, 2015
1 parent 287f9f0 commit 4b8e93e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public class ConnectionConfigurationTests
}

[Test]
public void Should_default_the_prefetch_count() {
public void Should_not_default_the_prefetch_count() {
connectionString = ("host=localhost");
connectionConfiguration = parser.Parse(connectionString);
Assert.AreEqual(ConnectionConfiguration.DefaultPrefetchCount, connectionConfiguration.PrefetchCount);
Assert.AreEqual(0, connectionConfiguration.PrefetchCount);
}

[Test]
Expand Down
2 changes: 0 additions & 2 deletions src/NServiceBus.RabbitMQ/Config/ConnectionConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ConnectionConfiguration : IConnectionConfiguration
{
public const ushort DefaultHeartBeatInSeconds = 5;
public const int DefaultDequeueTimeout = 1;
public const ushort DefaultPrefetchCount = 1;
public const ushort DefaultPort = 5672;
public static TimeSpan DefaultWaitTimeForConfirms = TimeSpan.FromSeconds(30);
IDictionary<string, object> clientProperties = new Dictionary<string, object>();
Expand Down Expand Up @@ -47,7 +46,6 @@ public ConnectionConfiguration()
Password = "guest";
RequestedHeartbeat = DefaultHeartBeatInSeconds;
DequeueTimeout = DefaultDequeueTimeout;
PrefetchCount = DefaultPrefetchCount;
MaxWaitTimeForConfirms = DefaultWaitTimeForConfirms;
RetryDelay = TimeSpan.FromSeconds(10);
SetDefaultClientProperties();
Expand Down
16 changes: 14 additions & 2 deletions src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ public void Init(Address address, TransactionSettings transactionSettings, Func<

public void Start(int maximumConcurrencyLevel)
{
if (receiveOptions.DefaultPrefetchCount > 0)
{
actualPrefetchCount = receiveOptions.DefaultPrefetchCount;
}
else
{
actualPrefetchCount = Convert.ToUInt16(maximumConcurrencyLevel);

Logger.InfoFormat("No prefetch count configured, defaulting to {0} (the configured concurrency level)", actualPrefetchCount);
}


var secondaryReceiveSettings = receiveOptions.GetSettings(workQueue);

actualConcurrencyLevel = maximumConcurrencyLevel + secondaryReceiveSettings.MaximumConcurrencyLevel;
Expand Down Expand Up @@ -127,7 +139,7 @@ void ConsumeMessages(object state)

using (var channel = connection.CreateModel())
{
channel.BasicQos(0, receiveOptions.PrefetchCount, false);
channel.BasicQos(0, actualPrefetchCount, false);

var consumer = new QueueingBasicConsumer(channel);

Expand Down Expand Up @@ -256,7 +268,7 @@ void Purge()
int actualConcurrencyLevel;
readonly IManageRabbitMqConnections connectionManager;
readonly ReceiveOptions receiveOptions;

ushort actualPrefetchCount;


class ConsumeParams
Expand Down
6 changes: 3 additions & 3 deletions src/NServiceBus.RabbitMQ/ReceiveOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ namespace NServiceBus.Transports.RabbitMQ
class ReceiveOptions
{
public MessageConverter Converter { get; private set; }
public ushort PrefetchCount { get; private set; }
public ushort DefaultPrefetchCount { get; private set; }
public int DequeueTimeout { get; private set; }
public bool PurgeOnStartup { get; private set; }
public string ConsumerTag { get; private set; }

public ReceiveOptions(Func<string, SecondaryReceiveSettings> getSecondaryReceiveSettings,
MessageConverter converter,
ushort prefetchCount,
ushort defaultPrefetchCount,
int dequeueTimeout,
bool purgeOnStartup,
string consumerTag)
{
Converter = converter;
PrefetchCount = prefetchCount;
DefaultPrefetchCount = defaultPrefetchCount;
DequeueTimeout = dequeueTimeout;
PurgeOnStartup = purgeOnStartup;
ConsumerTag = consumerTag;
Expand Down

0 comments on commit 4b8e93e

Please sign in to comment.