Skip to content

Consumer

Daniel Blankensteiner edited this page Feb 21, 2021 · 3 revisions

Consumer

Need to get messages from Pulsar and acknowledge them, so that Pulsar can keep track of your progress? Then a consumer is what we want.

Creating a consumer

Start with creating a client.

When creating a consumer we have these options:

  • ConsumerName - Set the consumer name. This is optional.
  • InitialPosition - Set initial position for the subscription. The default is 'Latest'.
  • PriorityLevel - Set the priority level for the shared subscription consumer. The default is 0.
  • MessagePrefetchCount - Number of messages that will be prefetched. The default is 1000.
  • ReadCompacted - Whether to read from the compacted topic. The default is 'false'.
  • SubscriptionName - Set the subscription name for this consumer. This is required.
  • SubscriptionType - Set the subscription type for this consumer. The default is 'Exclusive'.
  • Topic - Set the topic for this consumer. This is required.

Only the subscription name and topic are required.

Create a consumer using the builder

var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();

Create a consumer without the builder

var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
var consumer = client.CreateConsumer(options);

Receiving messages

We can either 'Receive' a single message or have an IAsyncEnumerable for 'Messages'.

Receive a single message

If we just want a single message.

var message = await consumer.Receive();
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));

Receive messages as an IAsyncEnumerable

There's an extension method in "DotPulsar.Extensions" for reading messages as an IAsyncEnumerable.

await foreach (var message in consumer.Messages())
{
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}

The message has these properties:

  • MessageId - The id of the message.
  • Data - The data payload as a ReadOnlySequence.
  • EventTime - Set the event time of the message.
  • Key - Set the key of the message for routing policy.
  • KeyBytes - Set the key of the message for routing policy.
  • OrderingKey - Set the ordering key of the message for message dispatch in SubscriptionType.KeyShared mode.
  • ProducerName - The name of the producer that produced the message.
  • PublishTime - When the message was published.
  • SequenceId - Set the sequence id of the message.

Acknowledging messages

We have two options when acknowledging messages.

Single acknowledgment

To just acknowledge a single message:

await consumer.Acknowledge(message);

Cumulative acknowledgment

To acknowledge all messages up to and include then provided message:

await consumer.AcknowledgeCumulative(message);

Please do note: We can't do this if the subscription is shared!

Seeking

Need to reset the cursor? What's easy:

await consumer.Seek(MessageId.Earliest); // Provide the message-id to reset to.

Please do note: When seeking we will shortly be disconnected, as that's how Pulsar handles seek requests.

Unsubscribing

As easy as:

await consumer.Unsubscribe();

Please do note: We can't use the consumer after unsubscribing and therefore should dispose it.

Monitoring consumer state

Monitoring the state is recommended and described here.

Disposing the consumer

The Consumer implements IAsyncDisposable and should be disposed when it's no longer needed. Disposing the client will dispose all consumers.