Skip to content

Commit

Permalink
Make gossip available as a mem stream: $mem-gossip
Browse files Browse the repository at this point in the history
Works very similarly to the $mem-node-state stream. Factored out a helper class.
  • Loading branch information
timothycoleman committed Jan 22, 2024
1 parent 139fb53 commit 36fd072
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Core.Cluster;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Services;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Services.VNode;

public class GossipListenerServiceTests {
private readonly GossipListenerService _sut;
private readonly ChannelReader<Message> _channelReader;
private readonly Guid _nodeId = Guid.NewGuid();

private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

public GossipListenerServiceTests() {
var channel = Channel.CreateUnbounded<Message>();
_channelReader = channel.Reader;
var commitPosition = 0;
_sut = new GossipListenerService(
nodeId: _nodeId,
publisher: new EnvelopePublisher(new ChannelEnvelope(channel)),
getNextCommitPosition: () => ++commitPosition);
}

[Fact]
public async Task notify_state_change() {
static int random() => Random.Shared.Next(65000);

var member = MemberInfo.ForVNode(
instanceId: Guid.NewGuid(),
timeStamp: DateTime.Now,
state: Data.VNodeState.DiscoverLeader,
isAlive: true,
internalTcpEndPoint: default,
internalSecureTcpEndPoint: new DnsEndPoint("myhost", random()),
externalTcpEndPoint: default,
externalSecureTcpEndPoint: new DnsEndPoint("myhost", random()),
httpEndPoint: new DnsEndPoint("myhost", random()),
advertiseHostToClientAs: "advertiseHostToClientAs",
advertiseHttpPortToClientAs: random(),
advertiseTcpPortToClientAs: random(),
lastCommitPosition: random(),
writerCheckpoint: random(),
chaserCheckpoint: random(),
epochPosition: random(),
epochNumber: random(),
epochId: Guid.NewGuid(),
nodePriority: random(),
isReadOnlyReplica: true);

// when
_sut.Handle(new GossipMessage.GossipUpdated(new ClusterInfo(member)));

// then
var @event = Assert.IsType<StorageMessage.InMemoryEventCommitted>(await _channelReader.ReadAsync());

Assert.Equal(SystemStreams.GossipStream, @event.Event.EventStreamId);
Assert.Equal(GossipListenerService.EventType, @event.Event.EventType);
Assert.Equal(0, @event.Event.EventNumber);

var expectedBytes = JsonSerializer.SerializeToUtf8Bytes(
new {
NodeId = _nodeId,
Members = new[] { new ClientClusterInfo.ClientMemberInfo(member) },
},
_options);

var actualBytes = @event.Event.Data;

Assert.Equal(
Encoding.UTF8.GetString(expectedBytes),
Encoding.UTF8.GetString(actualBytes.Span));

Assert.Equal(expectedBytes, actualBytes);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Core.Data;
Expand All @@ -18,7 +17,10 @@ public class NodeStateListenerServiceTests {
public NodeStateListenerServiceTests() {
var channel = Channel.CreateUnbounded<Message>();
_channelReader = channel.Reader;
_sut = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel)));
var commitPosition = 0;
_sut = new NodeStateListenerService(
new EnvelopePublisher(new ChannelEnvelope(channel)),
() => ++commitPosition);
}

[Fact]
Expand All @@ -30,7 +32,7 @@ public class NodeStateListenerServiceTests {
Assert.Equal(NodeStateListenerService.EventType, @event.Event.EventType);
Assert.Equal(0, @event.Event.EventNumber);
Assert.Equal(JsonSerializer.SerializeToUtf8Bytes(new {
state = VNodeState.Leader.ToString(),
State = VNodeState.Leader.ToString(),
}), @event.Event.Data.ToArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ public class InMemoryStreamReaderTests {

public InMemoryStreamReaderTests() {
var channel = Channel.CreateUnbounded<Message>();
_listener = new NodeStateListenerService(new EnvelopePublisher(new ChannelEnvelope(channel)));
var commitPosition = 0;
_listener = new NodeStateListenerService(
new EnvelopePublisher(new ChannelEnvelope(channel)),
() => ++commitPosition);
_sut = new InMemoryStreamReader(new Dictionary<string, IInMemoryStreamReader> {
[SystemStreams.NodeStateStream] = _listener,
});
Expand Down
15 changes: 14 additions & 1 deletion src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -727,14 +727,27 @@ public class ClusterVNode<TStreamId> :

monitoringRequestBus.Subscribe<MonitoringMessage.InternalStatsRequest>(storageWriter);

// Mem streams
var memLogCommitPosition = 0;
long GetNextCommitPosition() => Interlocked.Increment(ref memLogCommitPosition);

// Gossip listener
var gossipListener = new GossipListenerService(
NodeInfo.InstanceId,
_mainQueue,
GetNextCommitPosition);
_mainBus.Subscribe<GossipMessage.GossipUpdated>(gossipListener);

// Node state listener
var nodeStatusListener = new NodeStateListenerService(_mainQueue);
var nodeStatusListener = new NodeStateListenerService(_mainQueue, GetNextCommitPosition);
_mainBus.Subscribe<SystemMessage.StateChangeMessage>(nodeStatusListener);

var inMemReader = new InMemoryStreamReader(new Dictionary<string, IInMemoryStreamReader> {
[SystemStreams.GossipStream] = gossipListener,
[SystemStreams.NodeStateStream] = nodeStatusListener,
});

// Storage Reader
var storageReader = new StorageReaderService<TStreamId>(_mainQueue, _mainBus, readIndex,
logFormat.SystemStreams,
readerThreadsCount, Db.Config.WriterCheckpoint.AsReadOnly(), inMemReader, _queueStatsManager,
Expand Down
49 changes: 49 additions & 0 deletions src/EventStore.Core/Services/GossipListenerService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
using EventStore.Core.Services.Storage;

namespace EventStore.Core.Services;

public class GossipListenerService :
IInMemoryStreamReader,
IHandle<GossipMessage.GossipUpdated> {

private readonly SingleEventInMemoryStream _stream;
private readonly Guid _nodeId;
public const string EventType = "$GossipUpdated";

private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

public GossipListenerService(Guid nodeId, IPublisher publisher, Func<long> getNextCommitPosition) {
_stream = new(publisher, getNextCommitPosition, SystemStreams.GossipStream);
_nodeId = nodeId;
}

public void Handle(GossipMessage.GossipUpdated message) {
// SystemStreams.GossipStream is a system stream so only readable by admins
// we use ClientMemberInfo because plugins will consume this stream and
// it is less likely to change than the internal gossip.
var payload = new {
NodeId = _nodeId,
Members = message.ClusterInfo.Members.Select(static x =>
new Cluster.ClientClusterInfo.ClientMemberInfo(x)),
};

var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options);
_stream.Write(EventType, data);
}

public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(
ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg);

public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(
ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg);
}
136 changes: 19 additions & 117 deletions src/EventStore.Core/Services/NodeStateListenerService.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
using System;
using System.Text.Json;
using System.Text.Json.Serialization;
using EventStore.Core.Bus;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Services.Storage;
using EventStore.Core.TransactionLog.LogRecords;

namespace EventStore.Core.Services;

Expand All @@ -13,127 +12,30 @@ namespace EventStore.Core.Services;
public class NodeStateListenerService :
IInMemoryStreamReader,
IHandle<SystemMessage.StateChangeMessage> {
private readonly IPublisher _publisher;
public const string EventType = "$NodeStateChanged";
private const PrepareFlags Flags = PrepareFlags.Data | PrepareFlags.IsCommitted | PrepareFlags.IsJson;
private long _eventNumber;
private EventRecord _lastEvent;

public NodeStateListenerService(IPublisher publisher) {
_publisher = publisher;
_eventNumber = 0;
}

public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(ClientMessage.ReadStreamEventsForward msg) {
ReadStreamResult result;
ResolvedEvent[] events;
long nextEventNumber, lastEventNumber, tfLastCommitPosition;

var lastEvent = _lastEvent;
if (lastEvent == null) {
// no stream
result = ReadStreamResult.NoStream;
events = Array.Empty<ResolvedEvent>();
nextEventNumber = -1;
lastEventNumber = ExpectedVersion.NoStream;
tfLastCommitPosition = -1;
} else {
result = ReadStreamResult.Success;
nextEventNumber = lastEvent.EventNumber + 1;
lastEventNumber = lastEvent.EventNumber;
tfLastCommitPosition = lastEvent.EventNumber;

if (msg.FromEventNumber > lastEvent.EventNumber) {
// from too high. empty read
events = Array.Empty<ResolvedEvent>();
} else {
// read containing the event
events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) };
}
}

return new ClientMessage.ReadStreamEventsForwardCompleted(
msg.CorrelationId,
msg.EventStreamId,
msg.FromEventNumber,
msg.MaxCount,
result,
events,
StreamMetadata.Empty,
isCachePublic: false,
error: string.Empty,
nextEventNumber: nextEventNumber,
lastEventNumber: lastEventNumber,
isEndOfStream: true,
tfLastCommitPosition: tfLastCommitPosition);
}

public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(ClientMessage.ReadStreamEventsBackward msg) {
ReadStreamResult result;
ResolvedEvent[] events;
long adjustedFromEventNumber, lastEventNumber, tfLastCommitPosition;

var lastEvent = _lastEvent;
if (lastEvent == null) {
// no stream
adjustedFromEventNumber = msg.FromEventNumber;
result = ReadStreamResult.NoStream;
events = Array.Empty<ResolvedEvent>();
lastEventNumber = ExpectedVersion.NoStream;
tfLastCommitPosition = -1L;
} else {
result = ReadStreamResult.Success;
lastEventNumber = lastEvent.EventNumber;
tfLastCommitPosition = lastEvent.EventNumber;
private readonly SingleEventInMemoryStream _stream;

var readFromEnd = msg.FromEventNumber < 0;
adjustedFromEventNumber = readFromEnd ? lastEvent.EventNumber : msg.FromEventNumber;
public const string EventType = "$NodeStateChanged";

if (adjustedFromEventNumber < lastEvent.EventNumber) {
// from too low. empty read
events = Array.Empty<ResolvedEvent>();
} else {
// read containing the event
events = new[] { ResolvedEvent.ForUnresolvedEvent(lastEvent) };
}
}
private readonly JsonSerializerOptions _options = new() {
Converters = {
new JsonStringEnumConverter(),
},
};

return new ClientMessage.ReadStreamEventsBackwardCompleted(
correlationId: msg.CorrelationId,
eventStreamId: msg.EventStreamId,
fromEventNumber: adjustedFromEventNumber,
maxCount: msg.MaxCount,
result: result,
events: events,
streamMetadata: StreamMetadata.Empty,
isCachePublic: false,
error: string.Empty,
nextEventNumber: -1,
lastEventNumber: lastEventNumber,
isEndOfStream: true,
tfLastCommitPosition: tfLastCommitPosition);
public NodeStateListenerService(IPublisher publisher, Func<long> getNextCommitPosition) {
_stream = new(publisher, getNextCommitPosition, SystemStreams.NodeStateStream);
}

public void Handle(SystemMessage.StateChangeMessage message) {
var payload = new { state = message.State.ToString() };
var data = JsonSerializer.SerializeToUtf8Bytes(payload);
var prepare = new PrepareLogRecord(
logPosition: _eventNumber,
correlationId: message.CorrelationId,
eventId: Guid.NewGuid(),
transactionPosition: _eventNumber,
transactionOffset: 0,
eventStreamId: SystemStreams.NodeStateStream,
eventStreamIdSize: null,
expectedVersion: _eventNumber - 1,
timeStamp: DateTime.Now,
flags: Flags,
eventType: EventType,
eventTypeSize: null,
data: data,
metadata: Array.Empty<byte>());
_lastEvent = new EventRecord(_eventNumber, prepare, SystemStreams.NodeStateStream, EventType);
_publisher.Publish(new StorageMessage.InMemoryEventCommitted(_eventNumber, _lastEvent));
_eventNumber++;
var payload = new { State = message.State };
var data = JsonSerializer.SerializeToUtf8Bytes(payload, _options);
_stream.Write(EventType, data);
}

public ClientMessage.ReadStreamEventsForwardCompleted ReadForwards(
ClientMessage.ReadStreamEventsForward msg) => _stream.ReadForwards(msg);

public ClientMessage.ReadStreamEventsBackwardCompleted ReadBackwards(
ClientMessage.ReadStreamEventsBackward msg) => _stream.ReadBackwards(msg);
}

0 comments on commit 36fd072

Please sign in to comment.