Skip to content

Commit

Permalink
migrated io metrics from Counter to ObservableCounter instrument
Browse files Browse the repository at this point in the history
  • Loading branch information
hunter1703 committed May 11, 2023
1 parent e6d52cd commit 2a1f904
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 40 deletions.
9 changes: 2 additions & 7 deletions src/EventStore.Core.XUnit.Tests/Index/IndexTrackerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ public class IndexTrackerTests : IDisposable {
var meter = new Meter($"{typeof(IndexTrackerTests)}");
_listener = new TestMeterListener<long>(meter);

var eventMetric = meter.CreateCounter<long>("eventstore-io", unit: "events");
_sut = new IndexTracker(
new CounterSubMetric<long>(
eventMetric,
new KeyValuePair<string, object>("activity", "written")));
var eventMetric = new CounterMetric(meter, "eventstore-io", "events");
_sut = new IndexTracker(new CounterSubMetric(eventMetric, new[] {new KeyValuePair<string, object>("activity", "written")}));
}

public void Dispose() {
Expand All @@ -49,8 +46,6 @@ public class IndexTrackerTests : IDisposable {
}

private void AssertMeasurements(long expectedEventsWritten) {
_listener.Observe();

Assert.Collection(
_listener.RetrieveMeasurements("eventstore-io-events"),
m => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ public class TFChunkTrackerTests : IDisposable {
public TFChunkTrackerTests() {
var meter = new Meter($"{typeof(TFChunkTrackerTests)}");
_listener = new TestMeterListener<long>(meter);
var byteMetric = meter.CreateCounter<long>("eventstore-io", unit: "bytes");
var eventMetric = meter.CreateCounter<long>("eventstore-io", unit: "events");
var byteMetric = new CounterMetric(meter, "eventstore-io", unit: "bytes");
var eventMetric = new CounterMetric(meter, "eventstore-io", unit: "events");

var readTag = new KeyValuePair<string, object>("activity", "read");
_sut = new TFChunkTracker(
readBytes: new CounterSubMetric<long>(byteMetric, readTag),
readEvents: new CounterSubMetric<long>(eventMetric, readTag));
readBytes: new CounterSubMetric(byteMetric, new[] {readTag}),
readEvents: new CounterSubMetric(eventMetric, new[] {readTag}));
}

public void Dispose() {
Expand All @@ -49,8 +49,8 @@ public class TFChunkTrackerTests : IDisposable {
_sut.OnRead(system);
_listener.Observe();

AssertEventsRead(null);
AssertBytesRead(null);
AssertEventsRead(0);
AssertBytesRead(0);
}

[Fact]
Expand All @@ -59,8 +59,8 @@ public class TFChunkTrackerTests : IDisposable {
_sut.OnRead(system);
_listener.Observe();

AssertEventsRead(null);
AssertBytesRead(null);
AssertEventsRead(0);
AssertBytesRead(0);
}

private void AssertEventsRead(long? expectedEventsRead) =>
Expand All @@ -70,7 +70,6 @@ public class TFChunkTrackerTests : IDisposable {
AssertMeasurements("eventstore-io-bytes", expectedBytesRead);

private void AssertMeasurements(string instrumentName, long? expectedValue) {
_listener.Observe();
var actual = _listener.RetrieveMeasurements(instrumentName);

if (expectedValue is null) {
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Core/Index/IndexTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ public interface IIndexTracker {
}

public class IndexTracker : IIndexTracker {
private readonly CounterSubMetric<long> _indexedEvents;
private readonly CounterSubMetric _indexedEvents;

public IndexTracker(CounterSubMetric<long> indexedEvents) {
public IndexTracker(CounterSubMetric indexedEvents) {
_indexedEvents = indexedEvents;
}

Expand Down
12 changes: 6 additions & 6 deletions src/EventStore.Core/MetricsBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public static class MetricsBootstrapper {
var gossipProcessingMetric = new DurationMetric(coreMeter, "eventstore-gossip-processing-duration");
var queueQueueingDurationMaxMetric = new DurationMaxMetric(coreMeter, "eventstore-queue-queueing-duration-max");
var queueProcessingDurationMetric = new DurationMetric(coreMeter, "eventstore-queue-processing-duration");
var byteMetric = coreMeter.CreateCounter<long>("eventstore-io", unit: "bytes");
var eventMetric = coreMeter.CreateCounter<long>("eventstore-io", unit: "events");
var byteMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "bytes");
var eventMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "events");

// incoming grpc calls
var enabledCalls = conf.IncomingGrpcCalls.Where(kvp => kvp.Value).Select(kvp => kvp.Key).ToArray();
Expand All @@ -104,15 +104,15 @@ public static class MetricsBootstrapper {
if (conf.Events.TryGetValue(Conf.EventTracker.Read, out var readEnabled) && readEnabled) {
var readTag = new KeyValuePair<string, object>("activity", "read");
trackers.TransactionFileTracker = new TFChunkTracker(
readBytes: new CounterSubMetric<long>(byteMetric, readTag),
readEvents: new CounterSubMetric<long>(eventMetric, readTag));
readBytes: new CounterSubMetric(byteMetric, new[] {readTag}),
readEvents: new CounterSubMetric(eventMetric, new[] {readTag}));
}

// from a users perspective an event is written when it is indexed: thats when it can be read.
if (conf.Events.TryGetValue(Conf.EventTracker.Written, out var writtenEnabled) && writtenEnabled) {
trackers.IndexTracker = new IndexTracker(new CounterSubMetric<long>(
trackers.IndexTracker = new IndexTracker(new CounterSubMetric(
eventMetric,
new KeyValuePair<string, object>("activity", "written")));
new[] {new KeyValuePair<string, object>("activity", "written")}));
}

// gossip
Expand Down
27 changes: 27 additions & 0 deletions src/EventStore.Core/Telemetry/CounterMetric.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Collections.Generic;
using System.Diagnostics.Metrics;

namespace EventStore.Core.Telemetry {
public class CounterMetric {
private readonly List<CounterSubMetric> _subMetrics = new();
private readonly object _lock = new();

public CounterMetric(Meter meter, string name, string unit = null) {
meter.CreateObservableCounter(name, Observe, unit);
}

public void Add(CounterSubMetric subMetric) {
lock (_lock) {
_subMetrics.Add(subMetric);
}
}

private IEnumerable<Measurement<long>> Observe() {
lock (_lock) {
foreach (CounterSubMetric subMetric in _subMetrics) {
yield return subMetric.Observe();
}
}
}
}
}
30 changes: 18 additions & 12 deletions src/EventStore.Core/Telemetry/CounterSubMetric.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Threading;

namespace EventStore.Core.Telemetry {
public class CounterSubMetric<T> where T : struct {
private readonly Counter<T> _metric;
private readonly KeyValuePair<string, object> _tag;
namespace EventStore.Core.Telemetry;

public CounterSubMetric(Counter<T> metric, KeyValuePair<string, object> tag) {
_metric = metric;
_tag = tag;
}
public class CounterSubMetric {
private readonly KeyValuePair<string, object>[] _tags;
private long _counter;

public void Add(T delta) {
_metric.Add(delta, _tag);
}
public CounterSubMetric(CounterMetric metric, KeyValuePair<string, object>[] tags) {
_tags = tags;
metric.Add(this);
}

public void Add(long delta) {
Interlocked.Add(ref _counter, delta);
}

public Measurement<long> Observe() {
return new Measurement<long>(Interlocked.Read(ref _counter), _tags);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace EventStore.Core.TransactionLog.Chunks;

public class TFChunkTracker : ITransactionFileTracker {
private readonly CounterSubMetric<long> _readBytes;
private readonly CounterSubMetric<long> _readEvents;
private readonly CounterSubMetric _readBytes;
private readonly CounterSubMetric _readEvents;

public TFChunkTracker(
CounterSubMetric<long> readBytes,
CounterSubMetric<long> readEvents) {
CounterSubMetric readBytes,
CounterSubMetric readEvents) {

_readBytes = readBytes;
_readEvents = readEvents;
Expand Down

0 comments on commit 2a1f904

Please sign in to comment.