Skip to content
Olivier Coanet edited this page Aug 17, 2022 · 11 revisions

Disruptor

The type you are probably the most interested in is Disruptor<T>. It contains a ring buffer and has convenient methods to register consumers. The ring buffer has a fixed capacity and cannot be resized, so you need to take a moment to think about the appropriate capacity for your use case.

// Size of the ring buffer, must be power of 2.
const int bufferSize = 1024;

// Create the disruptor
var disruptor = new Disruptor<SampleEvent>(() => new SampleEvent(), bufferSize);

// Add a consumer
disruptor.HandleEventsWith(new SampleEventHandler());

// Start the disruptor (start the consumer threads)
disruptor.Start();

var ringBuffer = disruptor.RingBuffer;

// Use the ring buffer to publish events

Events

The generic parameter of the disruptor is the event type, which is the type of the messages that will be produced and consumed. This type is a POCO, it has no required base type or interface. It must simply be a class in the Disruptor or a struct in the ValueDisruptor.

public class SampleEvent
{
    public int Id { get; set; }
    public double Value { get; set; }
    public DateTime TimestampUtc { get; set; }
}

The events are preallocated, so they must be mutable. It might be a good idea to create initialization methods to avoid making all setters public and to make explicit which field must be initialized.

public class SampleEvent
{
    public int Id { get; private set; }
    public double Value { get; private set; }
    public DateTime TimestampUtc { get; private set; }

    public void Initialize(int id, double value, DateTime timestampUtc)
    {
        Id = id;
        Value = value;
        TimestampUtc = timestampUtc;
    }
}

Consumers

To add a consumer, you need to create a type that implements IEventHandler<T>. There are also other event handler types.

public class SampleEventHandler : IEventHandler<SampleEvent>
{
    public void OnEvent(SampleEvent data, long sequence, bool endOfBatch)
    {
        Console.WriteLine($"Event: {data.Id} => {data.Value}");
    }
}

You need to register your handlers in the disruptor. You can configure chains, and even graphs of consumers.

Example 1: Consumer chain

In this setup, events will be processed by Handler1 and then Handler2.

flowchart LR
    subgraph Step 1
    Handler1
    end
    subgraph Step 2
    Handler2
    end
    Handler1 --> Handler2
// Configure a simple chain of consumers.
disruptor.HandleEventsWith(new Handler1()).Then(new Handler2());

Example 2: Consumer graph

In this setup, events will be processed by Handler1, then by Handler2A and Handler2B in parallel, and finally by Handler3. Handler3 will not start processing an event until it was processed by both Handler2A and Handler2B.

flowchart LR
    subgraph Step 1
    Handler1
    end
    subgraph Step 2
    Handler2A
    Handler2B
    end
    subgraph Step 3
    Handler3
    end
    Handler1 --> Handler2A
    Handler1 --> Handler2B
    Handler2A --> Handler3
    Handler2B --> Handler3
// Configure a graph of consumers.
var step1 = disruptor.HandleEventsWith(new Handler1());
var step2A = step1.Then(new Handler2A());
var step2B = step1.Then(new Handler2B());
step2A.And(step2B).Then(new Handler3());

Once your event handlers are setup, you can start your disruptor and then begin publishing events.

What happens if you start publishing events before registering event handlers?

It is incorrect: the handlers will only consume the events that are published after registration. Events published on a ring buffer without consumers are lost.

What happens if you start publishing events after registering handlers but before starting the disruptor?

It is possible, but at some point the ring buffer will get full and your publisher will get blocked by the backpressure.

Producers

The disruptor ring buffer is used to publish events. There are multiple publication options available.

// (1) Claim the next sequence
var sequence = _ringBuffer.Next();
try
{
    // (2) Get and configure the event for the sequence
    var data = _ringBuffer[sequence];
    data.Initialize(id, value, DateTime.UtcNow);
}
finally
{
    // (3) Publish the event
    _ringBuffer.Publish(sequence);
}

Publishing events involves multi-threading coordination between producers and consumers, but also among parallel producers. If your application has only a single producer, you should configure the disruptor accordingly to remove the unneeded coordination and benefit from a performance gain.

// Create a disruptor for single producer usage.
var producerType = ProducerType.Single;
var taskScheduler = TaskScheduler.Default;
var waitStrategy = new BlockingWaitStrategy();
var factory = () => new SampleEvent();

var disruptor = new Disruptor<SampleEvent>(factory, 1024, taskScheduler, producerType, waitStrategy);

Wait strategies

Coordination between producers and consumers is performed by the IWaitStrategy. There are multiple wait strategies available in the API and you can even provide your own implementation.