Monitoring

entvex edited this page Jan 25, 2024 · 8 revisions

Consumers, producers, and readers all have states that can be monitored.

Monitoring the state is recommended because DotPulsar will retry/reconnect indefinitely in certain fault scenarios (after first switching to the 'Disconnected' state). Other fault conditions are considered final: they cause DotPulsar to throw an exception and move the consumer, producer, or reader to a final state, after which it should be disposed.

There are two ways to monitor the state:

  • Set a StateChangedHandler
  • Use OnStateChangeFrom/OnStateChangeTo or StateChangedFrom/StateChangedTo

Most users will, like the sample applications in the repository, just use the first option.

States

Let’s take a look at the different states before we dive into monitoring them.

When a topic is partitioned, one sub-consumer, sub-producer, or sub-reader is created per partition, each tracking its own internal state. A non-partitioned topic will therefore only have one, while a topic with three partitions will have three.

It’s important to keep in mind that the state of a single sub-consumer, sub-producer, or sub-reader can affect the overall state. For example, if a consumer is connected to a topic with three partitions and all three sub-consumers are in the Active state, the overall state is Active. But if one of them is Faulted, all of them are.

The state ReachedEndOfTopic can co-exist with other states.

Consumer states

  • Active (All is well)
  • Inactive (All is well. The subscription type is 'Failover' and we are not the active consumer)
  • Closed (The consumer or PulsarClient has been disposed)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • PartiallyConnected (Some of the sub-consumers are disconnected)
  • Faulted (An unrecoverable error has occurred)
  • ReachedEndOfTopic (No more messages will be delivered)
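
Consumer state (an X marks a state that at least one sub-consumer is in):

| Active | Inactive | Disconnected | Overall state      |
|--------|----------|--------------|--------------------|
| X      |          | X            | PartiallyConnected |
| X      | X        |              | Inactive           |
|        | X        | X            | PartiallyConnected |
| X      | X        | X            | PartiallyConnected |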
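
The consumer table can be read as a small aggregation rule: any Disconnected sub-consumer among others gives PartiallyConnected, while a mix of Active and Inactive gives Inactive. The sketch below illustrates that reading together with the Faulted rule described earlier. It is not DotPulsar's implementation: the `SketchState` enum and the `OverallStateSketch` class are hypothetical names, and Closed and ReachedEndOfTopic are left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Usage: two sub-consumers are Active and one is Disconnected.
var overall = OverallStateSketch.Combine(new[]
{
    SketchState.Active, SketchState.Disconnected, SketchState.Active
});
Console.WriteLine(overall); // prints "PartiallyConnected"

// Hypothetical enum for illustration only; DotPulsar's own type is ConsumerState.
public enum SketchState { Active, Inactive, Disconnected, PartiallyConnected, Faulted }

public static class OverallStateSketch
{
    // Combines the states of the sub-consumers into one overall state,
    // following the table in this section (an assumption, not library code).
    public static SketchState Combine(IReadOnlyCollection<SketchState> subStates)
    {
        if (subStates.Count == 0)
            throw new ArgumentException("At least one sub-consumer state is required.", nameof(subStates));

        // One faulted sub-consumer makes the whole consumer Faulted.
        if (subStates.Contains(SketchState.Faulted))
            return SketchState.Faulted;

        var distinct = subStates.Distinct().ToList();

        // All sub-consumers agree, so the overall state is that shared state.
        if (distinct.Count == 1)
            return distinct[0];

        // A mix that includes a lost connection is only partially connected.
        if (distinct.Contains(SketchState.Disconnected) || distinct.Contains(SketchState.PartiallyConnected))
            return SketchState.PartiallyConnected;

        // The remaining mix is Active + Inactive, which the table maps to Inactive.
        return SketchState.Inactive;
    }
}
```

The same shape would apply to the producer and reader tables, with Connected and WaitingForExclusive taking the place of Active and Inactive.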

Producer states

  • Closed (The producer or PulsarClient has been disposed)
  • Connected (All is well)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • PartiallyConnected (Some of the sub-producers are disconnected)
  • Faulted (An unrecoverable error has occurred)
  • WaitingForExclusive (The producer is connected but waiting for exclusive access)
  • Fenced (The producer has been fenced by the broker)
  • ReachedEndOfTopic (No more messages will be delivered)
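
Producer state (an X marks a state that at least one sub-producer is in):

| Connected | WaitingForExclusive | Disconnected | Overall state       |
|-----------|---------------------|--------------|---------------------|
| X         |                     | X            | PartiallyConnected  |
| X         | X                   |              | WaitingForExclusive |
|           | X                   | X            | PartiallyConnected  |
| X         | X                   | X            | PartiallyConnected  |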

If one sub-producer is fenced, then all sub-producers are fenced.

Reader states

  • Closed (The reader or PulsarClient has been disposed)
  • Connected (All is well)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • PartiallyConnected (Some of the sub-readers are disconnected)
  • Faulted (An unrecoverable error has occurred)
  • ReachedEndOfTopic (No more messages will be delivered)
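
Reader state (an X marks a state that at least one sub-reader is in):

| Connected | Disconnected | Overall state      |
|-----------|--------------|--------------------|
| X         | X            | PartiallyConnected |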

How to monitor the state using StateChangedHandler

This is the easiest option and can be used in three ways:

  • Provide an implementation of IHandleStateChanged
  • Provide a handler using Action<ConsumerStateChanged, CancellationToken>
  • Provide a handler using Func<ConsumerStateChanged, CancellationToken, ValueTask>

The initial state is always 'Disconnected', and the handler is called for the first time when the state moves away from 'Disconnected'. The handler is called for the last time when the state becomes final.

The sample applications (Consuming, Reading, and Producing) use the option with Action<ConsumerStateChanged, CancellationToken>.

await using var consumer = client.NewConsumer()
                                 .StateChangedHandler(Monitor)
                                 .SubscriptionName("MySubscription")
                                 .Topic("persistent://public/default/mytopic")
                                 .Create();

Monitoring consumer state

private static void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
{
    Console.WriteLine($"The consumer for topic '{stateChanged.Consumer.Topic}' changed state to '{stateChanged.ConsumerState}'");
}

Monitoring reader state

private static void Monitor(ReaderStateChanged stateChanged, CancellationToken cancellationToken)
{
    Console.WriteLine($"The reader for topic '{stateChanged.Reader.Topic}' changed state to '{stateChanged.ReaderState}'");
}

Monitoring producer state

private static void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
{
    Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}' changed state to '{stateChanged.ProducerState}'");
}
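
The handlers above all use the Action option. As a minimal sketch of the Func<ConsumerStateChanged, CancellationToken, ValueTask> option, an asynchronous handler could look like the following. The class and method names (`AsyncMonitoring`, `CreateMonitoredConsumer`, `MonitorAsync`) are hypothetical, and the sketch assumes the builder accepts the method group in the same way it accepts `Monitor` in the consumer example; the topic and subscription names are the ones used earlier on this page.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;

public static class AsyncMonitoring
{
    // Builds a consumer whose state changes are reported by an asynchronous handler.
    public static IConsumer CreateMonitoredConsumer(IPulsarClient client)
    {
        return client.NewConsumer()
                     .StateChangedHandler(MonitorAsync) // Func-based handler instead of the Action-based one
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();
    }

    // Returning ValueTask lets the handler await asynchronous work, here an asynchronous console write.
    private static async ValueTask MonitorAsync(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
    {
        await Console.Out.WriteLineAsync(
            $"The consumer for topic '{stateChanged.Consumer.Topic}' changed state to '{stateChanged.ConsumerState}'");
    }
}
```

The asynchronous form is mainly useful when reacting to a state change involves I/O, such as writing to a remote log or raising an alert.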

How to monitor the state using OnStateChange[From/To] or StateChanged[From/To]

There is a small difference between OnStateChange[From/To] and StateChanged[From/To].

  • OnStateChangeFrom and OnStateChangeTo will return the new state.
  • StateChangedFrom and StateChangedTo will return an object containing the new state and a reference to the Reader, Consumer, or Producer that changed state.

Monitoring the state is easy: call StateChangedFrom or StateChangedTo on a consumer, producer, or reader. Let's see how:

var stateChanged = await consumer.StateChangedFrom(ConsumerState.Active, cancellationToken);

The variable 'stateChanged' will tell us both which consumer changed state and to which state. Some states are final, meaning the state can no longer change. For consumers, 'Closed', 'Faulted', and 'ReachedEndOfTopic' are final states. When the consumer enters a final state, all monitoring tasks are completed. So if we are waiting for the consumer to go to 'Disconnected' and it is closed instead, the task will complete and return 'Closed'.
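
For comparison, OnStateChangeTo returns only the new state. The sketch below waits for a consumer to become 'Active' and, because a final state also completes the task, checks which state was actually returned. The class and method names (`StateWaiting`, `WaitUntilActive`) are hypothetical, and the call is assumed to take the same arguments as StateChangedFrom above.

```csharp
using System.Threading;
using System.Threading.Tasks;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;

public static class StateWaiting
{
    // Returns true if the consumer became Active, false if the wait ended in another (final) state.
    public static async ValueTask<bool> WaitUntilActive(IConsumer consumer, CancellationToken cancellationToken)
    {
        // Only the state is returned; there is no reference to the consumer in the result.
        ConsumerState state = await consumer.OnStateChangeTo(ConsumerState.Active, cancellationToken);

        // A final state such as 'Closed' or 'Faulted' also completes the task, so compare explicitly.
        return state == ConsumerState.Active;
    }
}
```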

Monitoring consumer state

private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
    var state = ConsumerState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await consumer.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ConsumerState;

        Console.WriteLine($"The consumer for topic '{stateChanged.Consumer.Topic}' changed state to '{stateChanged.ConsumerState}'");

        if (consumer.IsFinalState(state))
            return;
    }
}

Monitoring reader state

private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
    var state = ReaderState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await reader.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ReaderState;

        Console.WriteLine($"The reader for topic '{stateChanged.Reader.Topic}' changed state to '{stateChanged.ReaderState}'");

        if (reader.IsFinalState(state))
            return;
    }
}

Monitoring producer state

private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
    var state = ProducerState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await producer.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ProducerState;

        Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}' changed state to '{stateChanged.ProducerState}'");

        if (producer.IsFinalState(state))
            return;
    }
}

Delay the monitor using DelayedStateMonitor

The DelayedStateMonitor extension method sets up a delayed state monitor for the Consumer, Reader, or Producer. This is the recommended way of ignoring short disconnects that are expected when working with a distributed system.

    _ = consumer.DelayedStateMonitor(       // Recommended way of ignoring the short disconnects expected when working with a distributed system
        ConsumerState.Active,               // Operational state
        TimeSpan.FromSeconds(5),            // The amount of time allowed in non-operational state before we act
        _logger.ConsumerLostConnection,     // Invoked if we are NOT back in operational state after 5 seconds
        _logger.ConsumerRegainedConnection, // Invoked when we are in operational state again
        cancellationToken);
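
To make the effect of the delay concrete, here is a small stand-alone simulation of the idea. It does not call DotPulsar. It assumes that the "regained" callback only fires after the "lost" callback has fired, and that a disconnect shorter than the delay triggers neither. The `DelayedMonitorSketch` class, its `Simulate` method, and the timeline values are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Usage: a 2-second blip is ignored, an 8-second outage is reported.
var changes = new (int Seconds, bool Operational)[]
{
    (10, false), (12, true), // short disconnect: 2 s < 5 s, no callbacks
    (20, false), (28, true), // long disconnect: 8 s >= 5 s, both callbacks
};

foreach (var line in DelayedMonitorSketch.Simulate(changes, delaySeconds: 5))
    Console.WriteLine(line);
// prints:
// lost at 25s
// regained at 28s

public static class DelayedMonitorSketch
{
    // Replays a timeline of state changes and lists the notifications a delayed monitor would raise.
    // An outage that is still open at the end of the timeline is not reported in this sketch.
    public static List<string> Simulate(IEnumerable<(int Seconds, bool Operational)> changes, int delaySeconds)
    {
        var notifications = new List<string>();
        int? leftAt = null; // time at which the operational state was left, if we are currently outside it

        foreach (var (seconds, operational) in changes)
        {
            if (!operational)
            {
                // Keep the first departure time when moving between non-operational states.
                leftAt ??= seconds;
                continue;
            }

            // Back in the operational state: report only if the outage lasted at least the delay.
            if (leftAt is int left && seconds - left >= delaySeconds)
            {
                notifications.Add($"lost at {left + delaySeconds}s");
                notifications.Add($"regained at {seconds}s");
            }

            leftAt = null;
        }

        return notifications;
    }
}
```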