diff --git a/src/EventStore.Core.XUnit.Tests/Services/GossipListenerServiceTests.cs b/src/EventStore.Core.XUnit.Tests/Services/GossipListenerServiceTests.cs new file mode 100644 index 00000000000..93f1b5e36d5 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Services/GossipListenerServiceTests.cs @@ -0,0 +1,88 @@ +using System; +using System.Net; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Channels; +using System.Threading.Tasks; +using EventStore.Core.Cluster; +using EventStore.Core.Messages; +using EventStore.Core.Messaging; +using EventStore.Core.Services; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Services.VNode; + +public class GossipListenerServiceTests { + private readonly GossipListenerService _sut; + private readonly ChannelReader _channelReader; + private readonly Guid _nodeId = Guid.NewGuid(); + + private readonly JsonSerializerOptions _options = new() { + Converters = { + new JsonStringEnumConverter(), + }, + }; + + public GossipListenerServiceTests() { + var channel = Channel.CreateUnbounded(); + _channelReader = channel.Reader; + var commitPosition = 0; + _sut = new GossipListenerService( + nodeId: _nodeId, + publisher: new EnvelopePublisher(new ChannelEnvelope(channel)), + getNextCommitPosition: () => ++commitPosition); + } + + [Fact] + public async Task notify_state_change() { + static int random() => Random.Shared.Next(65000); + + var member = MemberInfo.ForVNode( + instanceId: Guid.NewGuid(), + timeStamp: DateTime.Now, + state: Data.VNodeState.DiscoverLeader, + isAlive: true, + internalTcpEndPoint: default, + internalSecureTcpEndPoint: new DnsEndPoint("myhost", random()), + externalTcpEndPoint: default, + externalSecureTcpEndPoint: new DnsEndPoint("myhost", random()), + httpEndPoint: new DnsEndPoint("myhost", random()), + advertiseHostToClientAs: "advertiseHostToClientAs", + advertiseHttpPortToClientAs: random(), + advertiseTcpPortToClientAs: random(), + lastCommitPosition: random(), + writerCheckpoint: random(), + chaserCheckpoint: random(), + epochPosition: random(), + epochNumber: random(), + epochId: Guid.NewGuid(), + nodePriority: random(), + isReadOnlyReplica: true); + + // when + _sut.Handle(new GossipMessage.GossipUpdated(new ClusterInfo(member))); + + // then + var @event = Assert.IsType(await _channelReader.ReadAsync()); + + Assert.Equal(SystemStreams.GossipStream, @event.Event.EventStreamId); + Assert.Equal(GossipListenerService.EventType, @event.Event.EventType); + Assert.Equal(0, @event.Event.EventNumber); + + var expectedBytes = JsonSerializer.SerializeToUtf8Bytes( + new { + NodeId = _nodeId, + Members = new[] { new ClientClusterInfo.ClientMemberInfo(member) }, + }, + _options); + + var actualBytes = @event.Event.Data; + + Assert.Equal( + Encoding.UTF8.GetString(expectedBytes), + Encoding.UTF8.GetString(actualBytes.Span)); + + Assert.Equal(expectedBytes, actualBytes); + } +} diff --git a/src/EventStore.Core.XUnit.Tests/Services/VNode/NodeStateListenerServiceTests.cs b/src/EventStore.Core.XUnit.Tests/Services/NodeStateListenerServiceTests.cs similarity index 85% rename from src/EventStore.Core.XUnit.Tests/Services/VNode/NodeStateListenerServiceTests.cs rename to src/EventStore.Core.XUnit.Tests/Services/NodeStateListenerServiceTests.cs index e0f4d9186cb..c1a6719ac88 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/VNode/NodeStateListenerServiceTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/NodeStateListenerServiceTests.cs @@ -1,6 +1,5 @@ using System; using System.Text.Json; -using System.Text.Json.Nodes; using System.Threading.Channels; using System.Threading.Tasks; using EventStore.Core.Data; @@ -18,7 +17,10 @@ public class NodeStateListenerServiceTests { public NodeStateListenerServiceTests() { var channel = Channel.CreateUnbounded(); _channelReader = channel.Reader; - _sut = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel))); + var commitPosition = 0; + _sut = new NodeStateListenerService( + new EnvelopePublisher(new ChannelEnvelope(channel)), + () => ++commitPosition); } [Fact] @@ -30,7 +32,7 @@ public class NodeStateListenerServiceTests { Assert.Equal(NodeStateListenerService.EventType, @event.Event.EventType); Assert.Equal(0, @event.Event.EventNumber); Assert.Equal(JsonSerializer.SerializeToUtf8Bytes(new { - state = VNodeState.Leader.ToString(), + State = VNodeState.Leader.ToString(), }), @event.Event.Data.ToArray()); } } diff --git a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemoryStreamReaderTests.cs b/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemoryStreamReaderTests.cs index 91930dc4e37..445a42353b0 100644 --- a/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemoryStreamReaderTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Services/Storage/InMemoryStreamReaderTests.cs @@ -17,7 +17,10 @@ public class InMemoryStreamReaderTests { public InMemoryStreamReaderTests() { var channel = Channel.CreateUnbounded(); - _listener = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel))); + var commitPosition = 0; + _listener = new NodeStateListenerService( + new EnvelopePublisher(new ChannelEnvelope(channel)), + () => ++commitPosition); _sut = new InMemoryStreamReader(new Dictionary { [SystemStreams.NodeStateStream] = _listener, }); diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index 263e8b7dfc7..06846f5093a 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -727,14 +727,27 @@ public class ClusterVNode : monitoringRequestBus.Subscribe(storageWriter); + // Mem streams + var memLogCommitPosition = 0; + long GetNextCommitPosition() => Interlocked.Increment(ref memLogCommitPosition); + + // Gossip listener + var gossipListener = new GossipListenerService( + NodeInfo.InstanceId, + _mainQueue, + GetNextCommitPosition); + _mainBus.Subscribe(gossipListener); + // Node state listener - var nodeStatusListener = new NodeStateListenerService(_mainQueue); + var nodeStatusListener = new NodeStateListenerService(_mainQueue, GetNextCommitPosition); _mainBus.Subscribe(nodeStatusListener); var inMemReader = new InMemoryStreamReader(new Dictionary { + [SystemStreams.GossipStream] = gossipListener, [SystemStreams.NodeStateStream] = nodeStatusListener, }); + // Storage Reader var storageReader = new StorageReaderService(_mainQueue, _mainBus, readIndex, logFormat.SystemStreams, readerThreadsCount, Db.Config.WriterCheckpoint.AsReadOnly(), inMemReader, _queueStatsManager, diff --git a/src/EventStore.Core/Services/GossipListenerService.cs b/src/EventStore.Core/Services/GossipListenerService.cs new file mode 100644 index 00000000000..4f32e4116c1 --- /dev/null +++ b/src/EventStore.Core/Services/GossipListenerService.cs @@ -0,0 +1,49 @@ +using System; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; +using EventStore.Core.Bus; +using EventStore.Core.Messages; +using EventStore.Core.Services.Storage; + +namespace EventStore.Core.Services; + +public class GossipListenerService : + IInMemoryStreamReader, + IHandle { + + private readonly SingleEventInMemoryStream _stream; + private readonly Guid _nodeId; + public const string EventType = "$GossipUpdated"; + + private readonly JsonSerializerOptions _options = new() { + Converters = { + new JsonStringEnumConverter(), + }, + }; + + public GossipListenerService(Guid nodeId, IPublisher publisher, Func getNextCommitPosition) { + _stream = new(publisher, getNextCommitPosition, SystemStreams.GossipStream); + _nodeId = nodeId; + } + + public void Handle(GossipMessage.GossipUpdated message) { + // SystemStreams.GossipStream is a system stream so only readable by admins + // we use ClientMemberInfo because plugins will consume this stream and + // it is less likely to change than the internal gossip. + var payload = new { + NodeId = _nodeId, + Members = message.ClusterInfo.Members.Select(static x => + new Cluster.ClientClusterInfo.ClientMemberInfo(x)), + }; + + var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options); + _stream.Write(EventType, data); + } + + public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards( + ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg); + + public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards( + ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg); +} diff --git a/src/EventStore.Core/Services/NodeStateListenerService.cs b/src/EventStore.Core/Services/NodeStateListenerService.cs index 379c4d2893b..f3375df9c12 100644 --- a/src/EventStore.Core/Services/NodeStateListenerService.cs +++ b/src/EventStore.Core/Services/NodeStateListenerService.cs @@ -1,10 +1,9 @@ using System; using System.Text.Json; +using System.Text.Json.Serialization; using EventStore.Core.Bus; -using EventStore.Core.Data; using EventStore.Core.Messages; using EventStore.Core.Services.Storage; -using EventStore.Core.TransactionLog.LogRecords; namespace EventStore.Core.Services; @@ -13,127 +12,30 @@ namespace EventStore.Core.Services; public class NodeStateListenerService : IInMemoryStreamReader, IHandle { - private readonly IPublisher _publisher; - public const string EventType = "$NodeStateChanged"; - private const PrepareFlags Flags = PrepareFlags.Data | PrepareFlags.IsCommitted | PrepareFlags.IsJson; - private long _eventNumber; - private EventRecord _lastEvent; - - public NodeStateListenerService(IPublisher publisher) { - _publisher = publisher; - _eventNumber = 0; - } - - public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(ClientMessage.ReadStreamEventsForward msg) { - ReadStreamResult result; - ResolvedEvent[] events; - long nextEventNumber, lastEventNumber, tfLastCommitPosition; - - var lastEvent = _lastEvent; - if (lastEvent == null) { - // no stream - result = ReadStreamResult.NoStream; - events = Array.Empty(); - nextEventNumber = -1; - lastEventNumber = ExpectedVersion.NoStream; - tfLastCommitPosition = -1; - } else { - result = ReadStreamResult.Success; - nextEventNumber = lastEvent.EventNumber + 1; - lastEventNumber = lastEvent.EventNumber; - tfLastCommitPosition = lastEvent.EventNumber; - - if (msg.FromEventNumber > lastEvent.EventNumber) { - // from too high. empty read - events = Array.Empty(); - } else { - // read containing the event - events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) }; - } - } - return new ClientMessage.ReadStreamEventsForwardCompleted( - msg.CorrelationId, - msg.EventStreamId, - msg.FromEventNumber, - msg.MaxCount, - result, - events, - StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: nextEventNumber, - lastEventNumber: lastEventNumber, - isEndOfStream: true, - tfLastCommitPosition: tfLastCommitPosition); - } - - public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(ClientMessage.ReadStreamEventsBackward msg) { - ReadStreamResult result; - ResolvedEvent[] events; - long adjustedFromEventNumber, lastEventNumber, tfLastCommitPosition; - - var lastEvent = _lastEvent; - if (lastEvent == null) { - // no stream - adjustedFromEventNumber = msg.FromEventNumber; - result = ReadStreamResult.NoStream; - events = Array.Empty(); - lastEventNumber = ExpectedVersion.NoStream; - tfLastCommitPosition = -1L; - } else { - result = ReadStreamResult.Success; - lastEventNumber = lastEvent.EventNumber; - tfLastCommitPosition = lastEvent.EventNumber; + private readonly SingleEventInMemoryStream _stream; - var readFromEnd = msg.FromEventNumber < 0; - adjustedFromEventNumber = readFromEnd ? lastEvent.EventNumber : msg.FromEventNumber; + public const string EventType = "$NodeStateChanged"; - if (adjustedFromEventNumber < lastEvent.EventNumber) { - // from too low. empty read - events = Array.Empty(); - } else { - // read containing the event - events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) }; - } - } + private readonly JsonSerializerOptions _options = new() { + Converters = { + new JsonStringEnumConverter(), + }, + }; - return new ClientMessage.ReadStreamEventsBackwardCompleted( - correlationId: msg.CorrelationId, - eventStreamId: msg.EventStreamId, - fromEventNumber: adjustedFromEventNumber, - maxCount: msg.MaxCount, - result: result, - events: events, - streamMetadata: StreamMetadata.Empty, - isCachePublic: false, - error: string.Empty, - nextEventNumber: -1, - lastEventNumber: lastEventNumber, - isEndOfStream: true, - tfLastCommitPosition: tfLastCommitPosition); + public NodeStateListenerService(IPublisher publisher, Func getNextCommitPosition) { + _stream = new(publisher, getNextCommitPosition, SystemStreams.NodeStateStream); } public void Handle(SystemMessage.StateChangeMessage message) { - var payload = new { state = message.State.ToString() }; - var data = JsonSerializer.SerializeToUtf8Bytes(payload); - var prepare = new PrepareLogRecord( - logPosition: _eventNumber, - correlationId: message.CorrelationId, - eventId: Guid.NewGuid(), - transactionPosition: _eventNumber, - transactionOffset: 0, - eventStreamId: SystemStreams.NodeStateStream, - eventStreamIdSize: null, - expectedVersion: _eventNumber - 1, - timeStamp: DateTime.Now, - flags: Flags, - eventType: EventType, - eventTypeSize: null, - data: data, - metadata: Array.Empty()); - _lastEvent = new EventRecord(_eventNumber, prepare, SystemStreams.NodeStateStream, EventType); - _publisher.Publish(new StorageMessage.InMemoryEventCommitted(_eventNumber, _lastEvent)); - _eventNumber++; + var payload = new { State = message.State }; + var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options); + _stream.Write(EventType, data); } + + public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards( + ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg); + + public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards( + ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg); } diff --git a/src/EventStore.Core/Services/SingleEventInMemoryStream.cs b/src/EventStore.Core/Services/SingleEventInMemoryStream.cs new file mode 100644 index 00000000000..5838351cff8 --- /dev/null +++ b/src/EventStore.Core/Services/SingleEventInMemoryStream.cs @@ -0,0 +1,146 @@ +using System; +using EventStore.Core.Bus; +using EventStore.Core.Data; +using EventStore.Core.Messages; +using EventStore.Core.Services.Storage; +using EventStore.Core.TransactionLog.LogRecords; + +namespace EventStore.Core.Services; + +// threading: we expect to handle one Write at a time, but Reads can happen concurrently +// with the write and with other reads. +public class SingleEventInMemoryStream : IInMemoryStreamReader { + private readonly IPublisher _publisher; + private readonly Func _getNextCommitPosition; + private readonly string _streamName; + private const PrepareFlags Flags = PrepareFlags.Data | PrepareFlags.IsCommitted | PrepareFlags.IsJson; + private long _eventNumber; + private EventRecord _lastEvent; + + public SingleEventInMemoryStream( + IPublisher publisher, + Func getNextCommitPosition, + string streamName) { + + _publisher = publisher; + _eventNumber = 0; + _getNextCommitPosition = getNextCommitPosition; + _streamName = streamName; + } + + public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards( + ClientMessage.ReadStreamEventsForward msg) { + + ReadStreamResult result; + ResolvedEvent[] events; + long nextEventNumber, lastEventNumber, tfLastCommitPosition; + + var lastEvent = _lastEvent; + if (lastEvent == null) { + // no stream + result = ReadStreamResult.NoStream; + events = Array.Empty(); + nextEventNumber = -1; + lastEventNumber = ExpectedVersion.NoStream; + tfLastCommitPosition = -1; + } else { + result = ReadStreamResult.Success; + nextEventNumber = lastEvent.EventNumber + 1; + lastEventNumber = lastEvent.EventNumber; + tfLastCommitPosition = lastEvent.EventNumber; + + if (msg.FromEventNumber > lastEvent.EventNumber) { + // from too high. empty read + events = Array.Empty(); + } else { + // read containing the event + events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) }; + } + } + + return new ClientMessage.ReadStreamEventsForwardCompleted( + msg.CorrelationId, + msg.EventStreamId, + msg.FromEventNumber, + msg.MaxCount, + result, + events, + StreamMetadata.Empty, + isCachePublic: false, + error: string.Empty, + nextEventNumber: nextEventNumber, + lastEventNumber: lastEventNumber, + isEndOfStream: true, + tfLastCommitPosition: tfLastCommitPosition); + } + + public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards( + ClientMessage.ReadStreamEventsBackward msg) { + + ReadStreamResult result; + ResolvedEvent[] events; + long adjustedFromEventNumber, lastEventNumber, tfLastCommitPosition; + + var lastEvent = _lastEvent; + if (lastEvent == null) { + // no stream + adjustedFromEventNumber = msg.FromEventNumber; + result = ReadStreamResult.NoStream; + events = Array.Empty(); + lastEventNumber = ExpectedVersion.NoStream; + tfLastCommitPosition = -1L; + } else { + result = ReadStreamResult.Success; + lastEventNumber = lastEvent.EventNumber; + tfLastCommitPosition = lastEvent.EventNumber; + + var readFromEnd = msg.FromEventNumber < 0; + adjustedFromEventNumber = readFromEnd ? lastEvent.EventNumber : msg.FromEventNumber; + + if (adjustedFromEventNumber < lastEvent.EventNumber) { + // from too low. empty read + events = Array.Empty(); + } else { + // read containing the event + events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) }; + } + } + + return new ClientMessage.ReadStreamEventsBackwardCompleted( + correlationId: msg.CorrelationId, + eventStreamId: msg.EventStreamId, + fromEventNumber: adjustedFromEventNumber, + maxCount: msg.MaxCount, + result: result, + events: events, + streamMetadata: StreamMetadata.Empty, + isCachePublic: false, + error: string.Empty, + nextEventNumber: -1, + lastEventNumber: lastEventNumber, + isEndOfStream: true, + tfLastCommitPosition: tfLastCommitPosition); + } + + public void Write(string eventType, ReadOnlyMemory data) { + var commitPosition = _getNextCommitPosition(); + var prepare = new PrepareLogRecord( + logPosition: commitPosition, + correlationId: Guid.NewGuid(), + eventId: Guid.NewGuid(), + transactionPosition: commitPosition, + transactionOffset: 0, + eventStreamId: _streamName, + eventStreamIdSize: null, + expectedVersion: _eventNumber - 1, + timeStamp: DateTime.Now, + flags: Flags, + eventType: eventType, + eventTypeSize: null, + data: data, + metadata: Array.Empty()); + _lastEvent = new EventRecord(_eventNumber, prepare, _streamName, eventType); + _publisher.Publish(new StorageMessage.InMemoryEventCommitted(commitPosition, _lastEvent)); + _eventNumber++; + } +} diff --git a/src/EventStore.Core/Services/SystemNames.cs b/src/EventStore.Core/Services/SystemNames.cs index b2e6f42b1c5..ea81ddcda8e 100644 --- a/src/EventStore.Core/Services/SystemNames.cs +++ b/src/EventStore.Core/Services/SystemNames.cs @@ -27,7 +27,10 @@ public static class SystemStreams { public const string ScavengesStream = "$scavenges"; public const string EpochInformationStream = "$epoch-information"; public const string ScavengePointsStream = "$scavengePoints"; + + // mem streams public const string NodeStateStream = "$mem-node-state"; + public const string GossipStream = "$mem-gossip"; public static bool IsSystemStream(string streamId) { return streamId.Length != 0 && streamId[0] == '$';