Skip to content

hongliyu2002/Orleans.EventStore

Repository files navigation

EventStore Providers for Microsoft Orleans

Event Sourcing

Silo Configuration

// Connection string of the EventStoreDB node used by the log-consistency provider.
var eventStoreConnectionString = "esdb://123.60.184.85:2113?tls=false";

// Register the EventStore-based log-consistency provider under the log store name,
// then chain a memory grain storage registered under the snapshot store name.
silo.AddEventStoreBasedLogConsistencyProvider(
        Constants.LogConsistencyStoreName,
        options => options.ClientSettings = EventStoreClientSettings.Create(eventStoreConnectionString))
    .AddMemoryGrainStorage(Constants.LogSnapshotStoreName);

Persistence

Silo Configuration

// Connection string of the EventStoreDB node that backs the grain storage provider.
var eventStoreConnectionString = "esdb://123.60.184.85:2113?tls=false";
// Register EventStore as the grain storage provider named Constants.StateStoreName.
silo.AddEventStoreGrainStorage(Constants.StateStoreName,
        options =>
        {
            options.ClientSettings = EventStoreClientSettings.Create(eventStoreConnectionString);
        });

Using EventStoreDB as a grain storage provider has an interesting property: every change to the grain state is kept in EventStore, much like in a time-series database. By subscribing to the stream of these state changes, the data can be continuously propagated to a separate state database, such as SQL Server, which makes it easy to implement certain CQRS functionality.

Streaming

Rewindable streams are now supported!

/// <summary>
/// Joins a chat channel and subscribes to its message stream, starting from the given sequence version.
/// </summary>
/// <param name="username">Name to join under; "(anonymous)" is used when <c>IsNullOrWhiteSpace()</c> reports it as blank.</param>
/// <param name="channel">Key of the channel grain; "(channel unknown)" is used when blank.</param>
/// <param name="version">Value wrapped in an <c>EventSequenceTokenV2</c> and passed as the subscription's starting token.</param>
/// <remarks>
/// Any exception is caught and its message is shown in a message box; <c>_joined</c> is set only when every step succeeds.
/// </remarks>
private async Task JoinChannel(string? username, string? channel, long version)
{
    try
    {
        // Fall back to placeholder names when the caller supplied nothing usable.
        if (username.IsNullOrWhiteSpace())
        {
            _currentUsername = "(anonymous)";
        }
        else
        {
            _currentUsername = username;
        }

        if (channel.IsNullOrWhiteSpace())
        {
            _currentChannel = "(channel unknown)";
        }
        else
        {
            _currentChannel = channel;
        }

        // Resolve the channel grain, join it, and look up the stream it hands back.
        _channelGrain = _clusterClient.GetGrain<IChannelGrain>(_currentChannel);
        _streamId = await _channelGrain.Join(_currentUsername!);
        _stream = _streamProvider.GetStream<ChatMessage>(_streamId);

        // Supplying an explicit sequence token lets the subscription start from a chosen point in the stream.
        var observer = new StreamObserver(MessagesListBox);
        var startToken = new EventSequenceTokenV2(version);
        _subscription = await _stream.SubscribeAsync(observer, startToken);

        _joined = true;
    }
    catch (Exception ex)
    {
        MessageBox.Show(this, ex.Message, "Error occurred, Please try again...");
    }
}

Silo configuration:

// Enable streaming on the silo, then register the EventStore stream provider.
silo.AddStreaming();
silo.AddEventStoreStreams(Constants.StreamProviderName, streams =>
{
    var connectionString = "esdb://123.60.184.85:2113?tls=false";

    // EventStore connection, provider name and the list of queues.
    streams.ConfigureEventStore(builder =>
    {
        builder.Configure(settings =>
        {
            settings.ClientSettings = EventStoreClientSettings.Create(connectionString);
            settings.Name = "ChatRoomV2";
            settings.Queues = new List<string>
            {
                "ChatRoomV2-11111",
                "ChatRoomV2-22222",
                "ChatRoomV2-33333",
                "ChatRoomV2-44444"
            };
        });
    });

    // Receiver: persistent-subscription checkpoint settings and prefetch count.
    streams.ConfigureReceiver(builder =>
    {
        builder.Configure(settings =>
        {
            var checkpointEvery = TimeSpan.FromMinutes(1);
            settings.SubscriptionSettings = new PersistentSubscriptionSettings(
                checkPointAfter: checkpointEvery,
                checkPointLowerBound: 1);
            settings.PrefetchCount = 50;
        });
    });

    // Cache pressure: slow-consumer flow-control threshold and pressure window size.
    streams.ConfigureCachePressuring(builder =>
    {
        builder.Configure(settings =>
        {
            settings.SlowConsumingMonitorFlowControlThreshold = 0.5;
            settings.SlowConsumingMonitorPressureWindowSize = TimeSpan.FromMinutes(30);
        });
    });

    // Checkpointer: uses the same EventStore connection, with a 30-second persist interval.
    streams.UseEventStoreCheckpointer(builder =>
    {
        builder.Configure(settings =>
        {
            settings.ClientSettings = EventStoreClientSettings.Create(connectionString);
            settings.PersistInterval = TimeSpan.FromSeconds(30);
        });
    });

    streams.ConfigureStreamPubSub();
    streams.UseConsistentRingQueueBalancer();
});

Cluster Client Configuration:

// Enable streaming on the cluster client, then register the EventStore stream provider.
client.AddStreaming();
client.AddEventStoreStreams(Constants.StreamProviderName, configurator =>
{
    // Declared here so the snippet is self-contained, matching the silo configuration,
    // which declares the connection string inside its own configurator lambda.
    var eventStoreConnectionString = "esdb://123.60.184.85:2113?tls=false";
    configurator.ConfigureEventStore(optionsBuilder =>
    {
        optionsBuilder.Configure(options =>
        {
            options.ClientSettings = EventStoreClientSettings.Create(eventStoreConnectionString);
            // Same provider name and queue list as in the silo configuration.
            options.Name = "ChatRoomV2";
            options.Queues = new List<string>
            {
                "ChatRoomV2-11111",
                "ChatRoomV2-22222",
                "ChatRoomV2-33333",
                "ChatRoomV2-44444"
            };
        });
    });
    configurator.ConfigureStreamPubSub();
});

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages