Skip to content

Commit

Permalink
Make gossip available as a mem stream: $mem-gossip
Browse files Browse the repository at this point in the history
Works very similarly to the $mem-node-state stream. Factored out a helper class.
Both mem streams are in the same logical log because the SubscriptionsService
currently supports one physical log and one inmemory log.
  • Loading branch information
timothycoleman committed Feb 1, 2024
1 parent a0bbea0 commit 8023d6a
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Core.Cluster;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services;
using EventStore.Core.Services.Storage.InMemory;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Services.Storage.InMemory;

public class GossipListenerServiceTests {
private readonly GossipListenerService _sut;
private readonly ChannelReader<Message> _channelReader;
private readonly Guid _nodeId = Guid.NewGuid();

private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

public GossipListenerServiceTests() {
var channel = Channel.CreateUnbounded<Message>();
_channelReader = channel.Reader;
_sut = new GossipListenerService(
nodeId: _nodeId,
publisher: new EnvelopePublisher(new ChannelEnvelope(channel)),
new InMemoryLog());
}

[Fact]
public async Task notify_state_change() {
static int random() => Random.Shared.Next(65000);

var member = MemberInfo.ForVNode(
instanceId: Guid.NewGuid(),
timeStamp: DateTime.Now,
state: Data.VNodeState.DiscoverLeader,
isAlive: true,
internalTcpEndPoint: default,
internalSecureTcpEndPoint: new DnsEndPoint("myhost", random()),
externalTcpEndPoint: default,
externalSecureTcpEndPoint: new DnsEndPoint("myhost", random()),
httpEndPoint: new DnsEndPoint("myhost", random()),
advertiseHostToClientAs: "advertiseHostToClientAs",
advertiseHttpPortToClientAs: random(),
advertiseTcpPortToClientAs: random(),
lastCommitPosition: random(),
writerCheckpoint: random(),
chaserCheckpoint: random(),
epochPosition: random(),
epochNumber: random(),
epochId: Guid.NewGuid(),
nodePriority: random(),
isReadOnlyReplica: true);

// when
_sut.Handle(new GossipMessage.GossipUpdated(new ClusterInfo(member)));

// then
var @event = Assert.IsType<StorageMessage.InMemoryEventCommitted>(await _channelReader.ReadAsync());

Assert.Equal(SystemStreams.GossipStream, @event.Event.EventStreamId);
Assert.Equal(GossipListenerService.EventType, @event.Event.EventType);
Assert.Equal(0, @event.Event.EventNumber);

var expectedBytes = JsonSerializer.SerializeToUtf8Bytes(
new {
NodeId = _nodeId,
Members = new[] { new ClientClusterInfo.ClientMemberInfo(member) },
},
_options);

var actualBytes = @event.Event.Data;

Assert.Equal(
Encoding.UTF8.GetString(expectedBytes),
Encoding.UTF8.GetString(actualBytes.Span));

Assert.Equal(expectedBytes, actualBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services;
using EventStore.Core.Services.Storage;
using EventStore.Core.Services.Storage.InMemory;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Services.Storage;
namespace EventStore.Core.XUnit.Tests.Services.Storage.InMemory;

public class InMemoryStreamReaderTests {
private readonly InMemoryStreamReader _sut;
private readonly NodeStateListenerService _listener;

public InMemoryStreamReaderTests() {
var channel = Channel.CreateUnbounded<Message>();
_listener = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel)));
_listener = new NodeStateListenerService(
new EnvelopePublisher(new ChannelEnvelope(channel)),
new InMemoryLog());
_sut = new InMemoryStreamReader(new Dictionary<string, IInMemoryStreamReader> {
[SystemStreams.NodeStateStream] = _listener,
});
Expand Down Expand Up @@ -145,7 +147,7 @@ public class ReadForwardTests : InMemoryStreamReaderTests {
// - not find event 49 (like we do for regular maxCount reads, even if old events have been scavenged)
// and not reach the end of the stream (nextEventNumber <= 49 so that we can read it in subsequent pages)
// current implementation finds the event.
for (int i = 0; i < 50; i++)
for (var i = 0; i < 50; i++)
_listener.Handle(new SystemMessage.BecomeLeader(Guid.NewGuid()));
var correlation = Guid.NewGuid();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
using System;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services;
using EventStore.Core.Services.Storage.InMemory;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Services.VNode;
namespace EventStore.Core.XUnit.Tests.Services.Storage.InMemory;

public class NodeStateListenerServiceTests {
private readonly NodeStateListenerService _sut;
Expand All @@ -18,7 +18,9 @@ public class NodeStateListenerServiceTests {
public NodeStateListenerServiceTests() {
var channel = Channel.CreateUnbounded<Message>();
_channelReader = channel.Reader;
_sut = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel)));
_sut = new NodeStateListenerService(
new EnvelopePublisher(new ChannelEnvelope(channel)),
new InMemoryLog());
}

[Fact]
Expand All @@ -30,7 +32,7 @@ public class NodeStateListenerServiceTests {
Assert.Equal(NodeStateListenerService.EventType, @event.Event.EventType);
Assert.Equal(0, @event.Event.EventNumber);
Assert.Equal(JsonSerializer.SerializeToUtf8Bytes(new {
state = VNodeState.Leader.ToString(),
State = VNodeState.Leader.ToString(),
}), @event.Event.Data.ToArray());
}
}
12 changes: 11 additions & 1 deletion src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
using EventStore.Core.Caching;
using EventStore.Core.Certificates;
using EventStore.Core.Cluster;
using EventStore.Core.Services.Storage.InMemory;
using EventStore.Core.Services.PeriodicLogs;
using EventStore.Core.Synchronization;
using EventStore.Core.Telemetry;
Expand Down Expand Up @@ -747,14 +748,23 @@ public class ClusterVNode<TStreamId> :

monitoringRequestBus.Subscribe<MonitoringMessage.InternalStatsRequest>(storageWriter);

// Mem streams
var memLog = new InMemoryLog();

// Gossip listener
var gossipListener = new GossipListenerService(NodeInfo.InstanceId, _mainQueue, memLog);
_mainBus.Subscribe<GossipMessage.GossipUpdated>(gossipListener);

// Node state listener
var nodeStatusListener = new NodeStateListenerService(_mainQueue);
var nodeStatusListener = new NodeStateListenerService(_mainQueue, memLog);
_mainBus.Subscribe<SystemMessage.StateChangeMessage>(nodeStatusListener);

var inMemReader = new InMemoryStreamReader(new Dictionary<string, IInMemoryStreamReader> {
[SystemStreams.GossipStream] = gossipListener,
[SystemStreams.NodeStateStream] = nodeStatusListener,
});

// Storage Reader
var storageReader = new StorageReaderService<TStreamId>(_mainQueue, _mainBus, readIndex,
logFormat.SystemStreams,
readerThreadsCount, Db.Config.WriterCheckpoint.AsReadOnly(), inMemReader, _queueStatsManager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using EventStore.Core.Bus;
using EventStore.Core.Messages;

namespace EventStore.Core.Services.Storage.InMemory;

public class GossipListenerService :
IInMemoryStreamReader,
IHandle<GossipMessage.GossipUpdated> {

private readonly SingleEventInMemoryStream _stream;
private readonly Guid _nodeId;
public const string EventType = "$GossipUpdated";

private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

public GossipListenerService(Guid nodeId, IPublisher publisher, InMemoryLog memLog) {
_stream = new(publisher, memLog, SystemStreams.GossipStream);
_nodeId = nodeId;
}

public void Handle(GossipMessage.GossipUpdated message) {
// SystemStreams.GossipStream is a system stream so only readable by admins
// we use ClientMemberInfo because plugins will consume this stream and
// it is less likely to change than the internal gossip.
var payload = new {
NodeId = _nodeId,
Members = message.ClusterInfo.Members.Select(static x =>
new Cluster.ClientClusterInfo.ClientMemberInfo(x)),
};

var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options);
_stream.Write(EventType, data);
}

public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(
ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg);

public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(
ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using EventStore.Core.Messages;

namespace EventStore.Core.Services.Storage;
namespace EventStore.Core.Services.Storage.InMemory;

public interface IInMemoryStreamReader {
ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(ClientMessage.ReadStreamEventsForward msg);
Expand Down
14 changes: 14 additions & 0 deletions src/EventStore.Core/Services/Storage/InMemory/InMemoryLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Threading;

namespace EventStore.Core.Services.Storage.InMemory;

// does not support $all style reads, but tracks the LastCommitPosition so that
// the long poll mechanism works in the SubscriptionsService.
// note that the SubscriptionsService currently only supports one physical log
// and one inmemory log so we can't have separate inmemory logs per inmemory stream.
public class InMemoryLog {
long _lastCommitPosition = 0L;

public long GetLastCommitPosition() => Interlocked.Read(ref _lastCommitPosition);
public long GetNextCommitPosition() => Interlocked.Increment(ref _lastCommitPosition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using EventStore.Core.Data;
using EventStore.Core.Messages;

namespace EventStore.Core.Services.Storage;
namespace EventStore.Core.Services.Storage.InMemory;

public class InMemoryStreamReader : IInMemoryStreamReader {
private readonly Dictionary<string, IInMemoryStreamReader> _readers;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using EventStore.Core.Bus;
using EventStore.Core.Messages;

namespace EventStore.Core.Services.Storage.InMemory;

// threading: we expect to handle one StateChangeMessage at a time, but Reads can happen concurrently
// with those handlings and with other reads.
public class NodeStateListenerService :
IInMemoryStreamReader,
IHandle<SystemMessage.StateChangeMessage> {

private readonly SingleEventInMemoryStream _stream;

public const string EventType = "$NodeStateChanged";

private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

public NodeStateListenerService(IPublisher publisher, InMemoryLog memLog) {
_stream = new(publisher, memLog, SystemStreams.NodeStateStream);
}

public void Handle(SystemMessage.StateChangeMessage message) {
var payload = new { message.State };
var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options);
_stream.Write(EventType, data);
}

public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(
ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg);

public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(
ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg);
}
Loading

0 comments on commit 8023d6a

Please sign in to comment.