Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make prefetch configurable #1188

Merged
merged 1 commit into from Jun 25, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+10 −9
Diff settings

Always

Just for now

Make prefetch configurable

  • Loading branch information...
DeonHeyns committed Jun 25, 2018
commit 5f1cde554250af24329197dacde082e634e57967
@@ -1,5 +1,4 @@

using RabbitMQ.Client;
using RabbitMQ.Client;
using RabbitMQ.Util;

namespace ServiceStack.RabbitMq
@@ -8,7 +7,7 @@ public class RabbitMqBasicConsumer : DefaultBasicConsumer
{
readonly SharedQueue<BasicGetResult> queue;

public RabbitMqBasicConsumer(IModel model)
public RabbitMqBasicConsumer(IModel model)
: this(model, new SharedQueue<BasicGetResult>()) { }

public RabbitMqBasicConsumer(IModel model, SharedQueue<BasicGetResult> queue)
@@ -15,6 +15,10 @@ public class RabbitMqProducer : IMessageProducer, IOneWayClient
public Action OnPublishedCallback { get; set; }
public Action<string, IBasicProperties, IMessage> PublishMessageFilter { get; set; }
public Action<string, BasicGetResult> GetMessageFilter { get; set; }
//http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
//http://www.rabbitmq.com/amqp-0-9-1-reference.html
public uint PrefetchCount { get; set; } = 20;
public ushort PrefetchSize { get; set; } = 0;

private IConnection connection;
public IConnection Connection
@@ -37,9 +41,7 @@ public IModel Channel
if (channel == null || !channel.IsOpen)
{
channel = Connection.OpenChannel();
//http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
//http://www.rabbitmq.com/amqp-0-9-1-reference.html
channel.BasicQos(prefetchCount: 20, prefetchSize: 0, global: false);
channel.BasicQos(PrefetchCount, PrefetchSize, global: false);
}
return channel;
}
@@ -123,9 +125,9 @@ public virtual void PublishMessage(string exchange, string routingKey, IBasicPro
{
try
{
// In case of server named queues (client declared queue with channel.declare()), assume queue already exists
// In case of server named queues (client declared queue with channel.declare()), assume queue already exists
//(redeclaration would result in error anyway since queue was marked as exclusive) and publish to default exchange
if (routingKey.IsServerNamedQueue())
if (routingKey.IsServerNamedQueue())
{
Channel.BasicPublish("", routingKey, basicProperties, body);
}
@@ -200,7 +202,7 @@ public virtual void Dispose()
catch (Exception ex)
{
Log.Error("Error trying to dispose RabbitMqProducer model", ex);
}
}
channel = null;
}
if (connection != null)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.