Skip to content

Commit

Permalink
Added the ability to change the prefetch count of the queue system.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jroland committed Apr 23, 2012
1 parent 98a756d commit 1cd6461
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
10 changes: 7 additions & 3 deletions Source/EasyNetQ/RabbitBus.cs
Expand Up @@ -32,7 +32,7 @@ public class RabbitBus : IBus, IRawByteBus

// prefetchCount determines how many messages will be allowed in the local in-memory queue
// setting to zero makes this infinite, but risks an out-of-memory exception.
private const int prefetchCount = 1000;
private readonly ushort _prefetchCount;

public RabbitBus(
SerializeType serializeType,
Expand All @@ -41,7 +41,8 @@ public class RabbitBus : IBus, IRawByteBus
IConnectionFactory connectionFactory,
IEasyNetQLogger logger,
Func<string> getCorrelationId,
IConventions conventions = null)
IConventions conventions = null,
ushort prefetchCount = 1000)
{
if(serializeType == null)
{
Expand All @@ -64,6 +65,8 @@ public class RabbitBus : IBus, IRawByteBus
throw new ArgumentNullException("getCorrelationId");
}

_prefetchCount = prefetchCount;

// Use default conventions if none were supplied.
if (conventions == null)
conventions = new Conventions();
Expand All @@ -74,6 +77,7 @@ public class RabbitBus : IBus, IRawByteBus
this.serializer = serializer;
this.getCorrelationId = getCorrelationId;
this.conventions = conventions;


connection = new PersistentConnection(connectionFactory, logger);
connection.Connected += OnConnected;
Expand Down Expand Up @@ -217,7 +221,7 @@ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
modelList.Add( channel );
DeclarePublishExchange(channel, exchangeName);
channel.BasicQos(0, prefetchCount, false);
channel.BasicQos(0, _prefetchCount, false);
var queue = channel.QueueDeclare(
queueName, // queue
Expand Down
15 changes: 9 additions & 6 deletions Source/EasyNetQ/RabbitHutch.cs
Expand Up @@ -24,9 +24,9 @@ public static class RabbitHutch
/// <returns>
/// A new RabbitBus instance.
/// </returns>
public static IBus CreateBus(string connectionString)
public static IBus CreateBus(string connectionString, ushort prefetchCount = 1000)
{
return CreateBus(connectionString, new ConsoleLogger());
return CreateBus(connectionString, new ConsoleLogger(), prefetchCount);
}

/// <summary>
Expand All @@ -45,7 +45,7 @@ public static IBus CreateBus(string connectionString)
/// <returns>
/// A new RabbitBus instance.
/// </returns>
public static IBus CreateBus(string connectionString, IEasyNetQLogger logger)
public static IBus CreateBus(string connectionString, IEasyNetQLogger logger, ushort prefetchCount = 1000)
{
if(connectionString == null)
{
Expand All @@ -63,7 +63,8 @@ public static IBus CreateBus(string connectionString, IEasyNetQLogger logger)
connectionValues.VirtualHost,
connectionValues.UserName,
connectionValues.Password,
logger);
logger,
prefetchCount);
}

/// <summary>
Expand All @@ -87,7 +88,7 @@ public static IBus CreateBus(string connectionString, IEasyNetQLogger logger)
/// <returns>
/// A new RabbitBus instance.
/// </returns>
public static IBus CreateBus(string hostName, string virtualHost, string username, string password, IEasyNetQLogger logger)
public static IBus CreateBus(string hostName, string virtualHost, string username, string password, IEasyNetQLogger logger, ushort prefetchCount = 1000)
{
if(hostName == null)
{
Expand Down Expand Up @@ -128,7 +129,9 @@ public static IBus CreateBus(string hostName, string virtualHost, string usernam
new QueueingConsumerFactory(logger, consumerErrorStrategy),
connectionFactory,
logger,
CorrelationIdGenerator.GetCorrelationId);
CorrelationIdGenerator.GetCorrelationId,
null,
prefetchCount);
}

/// <summary>
Expand Down

0 comments on commit 1cd6461

Please sign in to comment.