Skip to content

ConsumerMethod

Brian Lehnen edited this page Apr 8, 2026 · 2 revisions

Consuming Method Expressions

You may consume method expressions in a couple of different ways. Either via dedicated worker threads, or via a pool of shared threads between multiple queues ConsumerMethodAsync

Creating the queue

You'll need the following

  • The transport you're connecting to, determined by the transport init module you specify when creating the container.
  • The connection string to the transport
  • The name of the queue
var queueConnection = new QueueConnection(queueName, connectionString);
using (var schedulerContainer = new SchedulerContainer())
{
    using (var scheduler = schedulerContainer.CreateTaskScheduler())
    {
        var factory = schedulerContainer.CreateTaskFactory(scheduler);
        scheduler.Start();
        using (var queueContainer = new QueueContainer<QueueInit>())
        {
            using (var queue = queueContainer.CreateConsumerMethodQueueScheduler(queueConnection, factory))
            {
                queue.Configuration.Worker.WorkerCount = 4;

                var notifications = new ConsumerQueueNotifications(
                    (n) => logger.LogError($"Processing has failed {Environment.NewLine}{n.Error}"),
                    (n) => logger.LogError($"Processing has failed to dequeue a message {Environment.NewLine}{n.Error}"),
                    (n) => logger.LogError($"Processing has failed {Environment.NewLine}{n.MessageId}{Environment.NewLine}{n.Error}"),
                    (n) => logger.LogError($"Processing has triggered a poison message {Environment.NewLine}{n.MessageId}{Environment.NewLine}{n.Error}"),
                    (n) => logger.LogWarning($"Processing has triggered a rollback {Environment.NewLine}{n.MessageId}{Environment.NewLine}{n.Error}"),
                    (n) => logger.LogInformation($"Processing completed {n.MessageId}"));

                queue.Start(notifications);
                Console.WriteLine("Processing messages - press any key to stop");
                Console.ReadKey((true));
            }
        }
    }
}

Unlike the typed-message consumer, no delegate for message processing is used. Instead, the lambda expression is executed. All types must be resolvable, or errors will be thrown.

You may want to check for the following conditions in your code

  • The queue shutting down. A cancel token is provided for this.
    • If the transport supports rollback, you may throw an operation canceled exception to requeue the message

For example, here is how you can check to see if cancellation is requested, and also force a requeue. Note that we are verifying that the transport supports rollbacks first.

You will have to pass the IWorkerNotifications instance to your method expression in order to check these flags.

i.e.

var result = queue.Send((m, w) => Console.WriteLine(w.TransportSupportsRollback));
if (w.TransportSupportsRollback && w.MessageCancellation.Token.IsCancellationRequested)
{
	logger.LogDebug("Cancel has been requested - aborting");
	w.MessageCancellation.Token.ThrowIfCancellationRequested();
}
Stopping the queue

To stop the queue, call dispose on it.

queue.Dispose();

Calling dispose on the queue container will dispose all queues created by that container as well.

Dispose is a blocking operation. Depending on your configuration settings and how quickly your message consuming code responds to cancels, it may take a while to return.

For a complete working example, see SQLServerConsumerLinq in the samples repository.

Clone this wiki locally