Skip to content

Commit

Permalink
Add storage writer flush metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
timothycoleman committed Apr 26, 2023
1 parent 378d917 commit f83d0ba
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 19 deletions.
5 changes: 5 additions & 0 deletions src/EventStore.ClusterNode/telemetryconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
// "ProcessingRequestFromHttpClient"
],

"Writer": {
"FlushSize": true,
"FlushDuration": true
},

// this specifies what name to track each queue under, according to regular expressions to
// match the queue names against
"Queues": [
Expand Down
7 changes: 7 additions & 0 deletions src/EventStore.Common/Configuration/TelemetryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public enum Gossip {
ProcessingRequestFromHttpClient,
}

/// <summary>
/// Storage writer metrics that can be toggled individually; used as the key type of
/// the <c>Writer</c> dictionary in the telemetry configuration.
/// </summary>
public enum WriterTracker {
	/// <summary>Flush size metric.</summary>
	FlushSize = 1,

	/// <summary>Flush duration metric.</summary>
	FlushDuration = 2,
}

public enum EventTracker {
Read = 1,
Written,
Expand All @@ -96,6 +101,8 @@ public class LabelMappingCase {

public Gossip[] GossipTrackers { get; set; } = Array.Empty<Gossip>();

public Dictionary<WriterTracker, bool> Writer { get; set; } = new();

public Dictionary<EventTracker, bool> Events { get; set; } = new();

// must be 0, 1, 5, 10 or a multiple of 15
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,28 @@ public class DurationMaxTrackerTests : IDisposable {
});
});
}

// A tracker constructed with a null name is expected to report a single measurement
// carrying only the "range" tag.
[Fact]
public void no_name() {
	using var meter = new Meter($"{typeof(DurationMaxTrackerTests)}");
	using var listener = new TestMeterListener<double>(meter);
	var metric = new DurationMaxMetric(meter, "the-metric");

	// the instance itself is not used again by this test
	_ = new DurationMaxTracker(
		metric: metric,
		name: null,
		expectedScrapeIntervalSeconds: 15);

	listener.Observe();

	// exactly one measurement, with value 0 because nothing was recorded
	var measurement = Assert.Single(listener.RetrieveMeasurements("the-metric-seconds"));
	Assert.Equal(0, measurement.Value);

	// exactly one tag: the range
	var tag = Assert.Single(measurement.Tags.ToArray());
	Assert.Equal("range", tag.Key);
	Assert.Equal("16-20 seconds", tag.Value);
}
}
105 changes: 105 additions & 0 deletions src/EventStore.Core.XUnit.Tests/Telemetry/MaxTrackerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Diagnostics.Metrics;
using System.Linq;
using EventStore.Core.Telemetry;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Telemetry;

/// <summary>
/// Tests for <see cref="MaxTracker{T}"/> over <c>long</c> values, observed through a
/// <see cref="TestMeterListener{T}"/> and driven by a <see cref="FakeClock"/>.
/// </summary>
public class MaxTrackerTests : IDisposable {
	// Kept as a field so it can be disposed together with the listener.
	private readonly Meter _meter;
	private readonly TestMeterListener<long> _listener;
	private readonly FakeClock _clock = new();
	private readonly MaxTracker<long> _sut;

	public MaxTrackerTests() {
		_meter = new Meter($"{typeof(MaxTrackerTests)}");
		_listener = new TestMeterListener<long>(_meter);
		var metric = new MaxMetric<long>(_meter, "the-metric");
		_sut = new MaxTracker<long>(
			metric: metric,
			name: "the-tracker",
			expectedScrapeIntervalSeconds: 15,
			clock: _clock);
	}

	public void Dispose() {
		// same order a pair of `using` declarations would produce: listener first, then meter
		_listener.Dispose();
		_meter.Dispose();
	}

	// Nothing recorded: the tracker reports 0.
	[Fact]
	public void no_records() {
		AssertMeasurements(0);
	}

	// A larger later record replaces the reported maximum.
	[Fact]
	public void two_records_ascending() {
		AssertMeasurements(0);
		_sut.Record(1);
		AssertMeasurements(1);
		_sut.Record(2);
		AssertMeasurements(2);
	}

	// A smaller later record leaves the reported maximum unchanged.
	[Fact]
	public void two_records_descending() {
		AssertMeasurements(0);
		_sut.Record(2);
		AssertMeasurements(2);
		_sut.Record(1);
		AssertMeasurements(2);
	}

	// A record is still reported 19 seconds later but no longer at 20 seconds.
	[Fact]
	public void removes_stale_data() {
		_sut.Record(1);
		_clock.AdvanceSeconds(19);
		AssertMeasurements(1);

		_clock.AdvanceSeconds(1);
		AssertMeasurements(0);
	}

	// Observes the metric and asserts that exactly one measurement is reported, with the
	// expected value and the "name" and "range" tags, in that order.
	// The parameter is long so the comparison uses the same type the tracker reports.
	void AssertMeasurements(long expectedValue) {
		_listener.Observe();

		Assert.Collection(
			_listener.RetrieveMeasurements("the-metric"),
			m => {
				Assert.Equal(expectedValue, m.Value);
				Assert.Collection(
					m.Tags.ToArray(),
					t => {
						Assert.Equal("name", t.Key);
						Assert.Equal("the-tracker", t.Value);
					},
					t => {
						Assert.Equal("range", t.Key);
						Assert.Equal("16-20 seconds", t.Value);
					});
			});
	}

	// A tracker constructed with a null name reports only the "range" tag.
	[Fact]
	public void no_name() {
		using var meter = new Meter($"{typeof(MaxTrackerTests)}");
		using var listener = new TestMeterListener<long>(meter);
		var sut = new MaxTracker<long>(
			metric: new MaxMetric<long>(meter, "the-metric"),
			name: null,
			expectedScrapeIntervalSeconds: 15);

		listener.Observe();

		Assert.Collection(
			listener.RetrieveMeasurements("the-metric"),
			m => {
				Assert.Equal(0, m.Value);
				Assert.Collection(
					m.Tags.ToArray(),
					t => {
						Assert.Equal("range", t.Key);
						Assert.Equal("16-20 seconds", t.Value);
					});
			});
	}
}
2 changes: 2 additions & 0 deletions src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ public class ClusterVNode<TStreamId> :
logFormat.SystemStreams,
epochManager, _queueStatsManager,
trackers.QueueTrackers,
trackers.WriterFlushSizeTracker,
trackers.WriterFlushDurationTracker,
() => readIndex.LastIndexedPosition);
// subscribes internally
AddTasks(storageWriter.Tasks);
Expand Down
21 changes: 21 additions & 0 deletions src/EventStore.Core/MetricsBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class Trackers {
public GossipTrackers GossipTrackers { get; set; } = new ();
public ITransactionFileTracker TransactionFileTracker { get; set; } = new TFChunkTracker.NoOp();
public IIndexTracker IndexTracker { get; set; } = new IndexTracker.NoOp();
public IMaxTracker<long> WriterFlushSizeTracker { get; set; } = new MaxTracker<long>.NoOp();
public IDurationMaxTracker WriterFlushDurationTracker { get; set; } = new DurationMaxTracker.NoOp();
}

public class GrpcTrackers {
Expand Down Expand Up @@ -155,6 +157,25 @@ public static class MetricsBootstrapper {
trackers.GrpcTrackers[method] = new DurationTracker(durationMetric, label);
}

// storage writer
if (conf.Writer.Count > 0) {
if (conf.Writer.TryGetValue(Conf.WriterTracker.FlushSize, out var flushSizeEnabled) && flushSizeEnabled) {
var maxMetric = new MaxMetric<long>(coreMeter, "eventstore-writer-flush-size-max");
trackers.WriterFlushSizeTracker = new MaxTracker<long>(
metric: maxMetric,
name: null,
expectedScrapeIntervalSeconds: conf.ExpectedScrapeIntervalSeconds);
}

if (conf.Writer.TryGetValue(Conf.WriterTracker.FlushDuration, out var flushDurationEnabled) && flushDurationEnabled) {
var maxDurationmetric = new DurationMaxMetric(coreMeter, "eventstore-writer-flush-duration-max");
trackers.WriterFlushDurationTracker = new DurationMaxTracker(
maxDurationmetric,
name: null,
expectedScrapeIntervalSeconds: conf.ExpectedScrapeIntervalSeconds);
}
}

// queue length trackers
trackers.QueueTrackers = new QueueTrackers(
conf.Queues,
Expand Down
5 changes: 4 additions & 1 deletion src/EventStore.Core/Services/ClusterStorageWriterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ public class ClusterStorageWriterService<TStreamId> : StorageWriterService<TStre
IEpochManager epochManager,
QueueStatsManager queueStatsManager,
QueueTrackers trackers,
IMaxTracker<long> flushSizeTracker,
IDurationMaxTracker flushDurationTracker,
Func<long> getLastIndexedPosition)
: base(bus, subscribeToBus, minFlushDelay, db, writer, indexWriter, recordFactory, streamNameIndex,
eventTypeIndex, emptyEventTypeId, systemStreams, epochManager, queueStatsManager, trackers) {
eventTypeIndex, emptyEventTypeId, systemStreams, epochManager, queueStatsManager, trackers,
flushSizeTracker, flushDurationTracker) {
Ensure.NotNull(getLastIndexedPosition, "getLastCommitPosition");

_getLastIndexedPosition = getLastIndexedPosition;
Expand Down
36 changes: 23 additions & 13 deletions src/EventStore.Core/Services/Storage/StorageWriterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using EventStore.Core.Services.Storage.EpochManager;
using EventStore.Core.Services.Storage.ReaderIndex;
using EventStore.Core.Telemetry;
using EventStore.Core.Time;
using EventStore.Core.TransactionLog.Chunks;
using EventStore.Core.TransactionLog.LogRecords;
using Newtonsoft.Json;
Expand Down Expand Up @@ -51,18 +52,19 @@ public class StorageWriterService<TStreamId> : IHandle<SystemMessage.SystemInit>
private readonly INameIndex<TStreamId> _streamNameIndex;
private readonly INameIndex<TStreamId> _eventTypeIndex;
private readonly ISystemStreamLookup<TStreamId> _systemStreams;
private readonly IMaxTracker<long> _flushSizeTracker;
private readonly IDurationMaxTracker _flushDurationTracker;

protected readonly IEpochManager EpochManager;

protected readonly IPublisher Bus;
private readonly ISubscriber _subscribeToBus;
protected readonly IQueuedHandler StorageWriterQueue;
private readonly InMemoryBus _writerBus;

private readonly Stopwatch _watch = Stopwatch.StartNew();
private readonly Clock _clock = Clock.Instance;
private readonly double _minFlushDelay;
private long _lastFlushDelay;
private long _lastFlushTimestamp;
private Instant _lastFlushTimestamp;

protected int FlushMessagesInQueue;
private VNodeState _vnodeState = VNodeState.Initializing;
Expand Down Expand Up @@ -100,7 +102,9 @@ public class StorageWriterService<TStreamId> : IHandle<SystemMessage.SystemInit>
ISystemStreamLookup<TStreamId> systemStreams,
IEpochManager epochManager,
QueueStatsManager queueStatsManager,
QueueTrackers trackers) {
QueueTrackers queueTrackers,
IMaxTracker<long> flushSizeTracker,
IDurationMaxTracker flushDurationTracker) {

Ensure.NotNull(bus, "bus");
Ensure.NotNull(subscribeToBus, "subscribeToBus");
Expand All @@ -123,13 +127,14 @@ public class StorageWriterService<TStreamId> : IHandle<SystemMessage.SystemInit>
_systemStreams = systemStreams;
_emptyEventTypeId = emptyEventTypeId;
EpochManager = epochManager;

_flushDurationTracker = flushDurationTracker;
_flushSizeTracker = flushSizeTracker;
_scavengePointsStreamId = _streamNameIndex.GetExisting(SystemStreams.ScavengePointsStream);
_scavengePointEventTypeId = _eventTypeIndex.GetExisting(SystemEventTypes.ScavengePoint);

_minFlushDelay = minFlushDelay.TotalMilliseconds * TicksPerMs;
_lastFlushDelay = 0;
_lastFlushTimestamp = _watch.ElapsedTicks;
_lastFlushTimestamp = _clock.Now;

Writer = writer;
Writer.Open();
Expand All @@ -138,7 +143,7 @@ public class StorageWriterService<TStreamId> : IHandle<SystemMessage.SystemInit>
StorageWriterQueue = QueuedHandler.CreateQueuedHandler(new AdHocHandler<Message>(CommonHandle),
"StorageWriterQueue",
queueStatsManager,
trackers,
queueTrackers,
true,
TimeSpan.FromMilliseconds(500));
_tasks.Add(StorageWriterQueue.Start());
Expand Down Expand Up @@ -710,15 +715,20 @@ record = prepare.CopyForRetry(
}

protected bool Flush(bool force = false) {
var start = _watch.ElapsedTicks;
if (force || FlushMessagesInQueue == 0 || start - _lastFlushTimestamp >= _lastFlushDelay + _minFlushDelay) {
var start = _clock.Now;
if (force || FlushMessagesInQueue == 0 || start.ElapsedTicksSince(_lastFlushTimestamp) >= _lastFlushDelay + _minFlushDelay) {
var flushSize = Writer.Checkpoint.ReadNonFlushed() - Writer.Checkpoint.Read();

Writer.Flush();
HistogramService.SetValue(_writerFlushHistogram,
(long)((((double)_watch.ElapsedTicks - start) / Stopwatch.Frequency) * 1000000000));
var end = _watch.ElapsedTicks;
var flushDelay = end - start;

_flushSizeTracker.Record(flushSize);
var end = _flushDurationTracker.RecordNow(start);

var flushDelay = end.ElapsedTicksSince(start);
var flushDelaySeconds = ((double)flushDelay) / Stopwatch.Frequency;
var flushDelayNanoSeconds = (long)(flushDelaySeconds * 1_000_000_000);
HistogramService.SetValue(_writerFlushHistogram, flushDelayNanoSeconds);

Interlocked.Exchange(ref _lastFlushDelay, flushDelay);
Interlocked.Exchange(ref _lastFlushSize, flushSize);
_lastFlushTimestamp = end;
Expand Down
2 changes: 2 additions & 0 deletions src/EventStore.Core/Telemetry/DurationMaxMetric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class DurationMaxMetric {
private readonly List<DurationMaxTracker> _trackers = new();

// Registers an observable gauge called `name`, with unit "seconds", on the given meter;
// its values are produced by Observe.
public DurationMaxMetric(Meter meter, string name) {
// gauge rather than updowncounter because the dimensions won't make sense to sum,
// because they are maxes and not necessarily from the same moment
meter.CreateObservableGauge(name, Observe, "seconds");
}

Expand Down
9 changes: 5 additions & 4 deletions src/EventStore.Core/Telemetry/DurationMaxTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public class DurationMaxTracker : IDurationMaxTracker {
_clock = clock ?? Clock.Instance;
_recentMax = new RecentMax<double>(expectedScrapeIntervalSeconds);

_maxTags = new KeyValuePair<string, object>[] {
new("name", name),
new("range", $"{_recentMax.MinPeriodSeconds}-{_recentMax.MaxPeriodSeconds} seconds"),
};
var maxTags = new List<KeyValuePair<string, object>>();
if (!string.IsNullOrWhiteSpace(name))
maxTags.Add(new("name", name));
maxTags.Add(new("range", $"{_recentMax.MinPeriodSeconds}-{_recentMax.MaxPeriodSeconds} seconds"));
_maxTags = maxTags.ToArray();

metric.Add(this);
}
Expand Down
28 changes: 28 additions & 0 deletions src/EventStore.Core/Telemetry/MaxMetric.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Collections.Generic;
using System.Diagnostics.Metrics;

namespace EventStore.Core.Telemetry;

/// <summary>
/// An observable gauge that reports one measurement per registered
/// <see cref="MaxTracker{T}"/>.
/// </summary>
/// <typeparam name="T">The value type of the measurements.</typeparam>
public class MaxMetric<T> where T : struct {
	// Also used as the lock object guarding registration and observation.
	private readonly List<MaxTracker<T>> _trackers = new();

	/// <summary>
	/// Registers an observable gauge called <paramref name="name"/> on <paramref name="meter"/>.
	/// </summary>
	public MaxMetric(Meter meter, string name) {
		// gauge rather than updowncounter because the dimensions won't make sense to sum,
		// because they are maxes and not necessarily from the same moment
		meter.CreateObservableGauge(name, Observe);
	}

	/// <summary>
	/// Adds <paramref name="tracker"/> to the set of trackers observed by this metric.
	/// </summary>
	public void Add(MaxTracker<T> tracker) {
		lock (_trackers) {
			_trackers.Add(tracker);
		}
	}

	// Gauge callback: collects the current measurement of every registered tracker.
	// The results are materialised while the lock is held and returned as an array, so
	// the lock is released before the caller starts enumerating. A lazy iterator would
	// keep the lock held across each yield for as long as the caller enumerates.
	private IEnumerable<Measurement<T>> Observe() {
		lock (_trackers) {
			var measurements = new Measurement<T>[_trackers.Count];
			for (var i = 0; i < measurements.Length; i++) {
				measurements[i] = _trackers[i].Observe();
			}

			return measurements;
		}
	}
}
Loading

0 comments on commit f83d0ba

Please sign in to comment.