Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/Elastic.Documentation/Diagnostics/DiagnosticsChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public sealed class DiagnosticsChannel : IDisposable
{
private readonly Channel<Diagnostic> _channel;
private readonly CancellationTokenSource _ctxSource;

public ChannelReader<Diagnostic> Reader => _channel.Reader;

public Cancel CancellationToken => _ctxSource.Token;
Expand All @@ -33,14 +34,10 @@ public void TryComplete(Exception? exception = null)

public ValueTask<bool> WaitToWrite(Cancel ctx) => _channel.Writer.WaitToWriteAsync(ctx);

public void Write(Diagnostic diagnostic)
{
var written = _channel.Writer.TryWrite(diagnostic);
if (!written)
{
//TODO
}
}
// Unbounded channel: TryWrite only fails if the writer is completed, which
// means a producer raced past StopAsync/DisposeAsync. Drop silently —
// diagnostics must never throw back into the caller.
public void Write(Diagnostic diagnostic) => _ = _channel.Writer.TryWrite(diagnostic);

public void Dispose() => _ctxSource.Dispose();
}
Expand Down
49 changes: 36 additions & 13 deletions src/Elastic.Documentation/Diagnostics/DiagnosticsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information

using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO.Abstractions;

namespace Elastic.Documentation.Diagnostics;
Expand All @@ -21,6 +20,12 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output
public int Hints => _hints;

private Task? _started;
// True once the background reader delegate has actually begun executing.
// _started becoming non-null is not enough: Task.Run short-circuits to a
// canceled Task when given an already-canceled token, and the delegate
// never runs. StopAsync uses this to decide whether awaiting _started is
// meaningful or whether the channel is guaranteed to have no drainer.
private volatile bool _readerStarted;

public HashSet<string> OffendingFiles { get; } = [];

Expand All @@ -30,6 +35,8 @@ public class DiagnosticsCollector(IReadOnlyCollection<IDiagnosticsOutput> output

public bool NoHints { get; set; }

public bool IsStarted => _readerStarted;

public virtual DiagnosticsCollector StartAsync(Cancel ctx)
{
_ = EnsureStarted(ctx);
Expand All @@ -45,6 +52,7 @@ private Task EnsureStarted(Cancel cancellationToken)
_started = Task.Run(async () =>
{
_ = await Channel.WaitToWrite(cancellationToken);
_readerStarted = true;
while (!Channel.CancellationToken.IsCancellationRequested)
{
try
Expand All @@ -61,18 +69,18 @@ private Task EnsureStarted(Cancel cancellationToken)
Drain();
}, cancellationToken);
return _started;
}

void Drain()
private void Drain()
{
while (Channel.Reader.TryRead(out var item))
{
while (Channel.Reader.TryRead(out var item))
{
if (item.Severity == Severity.Hint && NoHints)
continue;
HandleItem(item);
_ = OffendingFiles.Add(item.File);
foreach (var output in outputs)
output.Write(item);
}
if (item.Severity == Severity.Hint && NoHints)
continue;
HandleItem(item);
_ = OffendingFiles.Add(item.File);
foreach (var output in outputs)
output.Write(item);
}
}

Expand All @@ -91,9 +99,24 @@ protected virtual void HandleItem(Diagnostic diagnostic) { }
public virtual async Task StopAsync(Cancel cancellationToken)
{
Channel.TryComplete();
if (_started is not null)
// StartAsync was never called. Items may sit in the channel but
// nobody is coming to drain them — awaiting Channel.Reader.Completion
// here would deadlock. Returning is the correct behaviour for
// fire-and-forget collectors (the channel dies with the instance).
if (_started is null)
return;

try
{
await _started;
await Channel.Reader.Completion;
}
catch (OperationCanceledException)
{
// Reader was canceled before its final Drain(); mop up below.
}
// Defensive: if the reader exited early via cancellation, items may
// still be queued. Drain them synchronously so they're not lost.
Drain();
}

public void EmitCrossLink(string link) => CrossLinks.Add(link);
Expand Down
10 changes: 10 additions & 0 deletions src/Elastic.Documentation/Diagnostics/IDiagnosticsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public interface IDiagnosticsCollector : IAsyncDisposable
HashSet<string> OffendingFiles { get; }
ConcurrentDictionary<string, bool> InUseSubstitutionKeys { get; }

/// True once the background reader is actively draining the channel.
bool IsStarted { get; }

Task StartAsync(Cancel cancellationToken);
Task StopAsync(Cancel cancellationToken);

Expand Down Expand Up @@ -51,6 +54,13 @@ public interface IDiagnosticsCollector : IAsyncDisposable

async Task WaitForDrain()
{
if (!IsStarted)
{
throw new InvalidOperationException(
"WaitForDrain called on a collector that was never started; no reader is draining the channel. " +
"Call StartAsync first or dispose the collector to drain synchronously.");
}

var start = DateTime.UtcNow;
while (Channel.Reader.TryPeek(out _))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using AwesomeAssertions;
using Elastic.Documentation.Diagnostics;

namespace Elastic.Changelog.Tests.Changelogs;

public class DiagnosticsCollectorDisposeTests
{
private sealed class RecordingOutput : IDiagnosticsOutput
{
public List<Diagnostic> Items { get; } = [];
public void Write(Diagnostic diagnostic) => Items.Add(diagnostic);
}

private static async Task ShouldComplete(Task task, TimeSpan timeout, string because)
{
using var cts = new CancellationTokenSource(timeout);
var completed = await Task.WhenAny(task, Task.Delay(Timeout.Infinite, cts.Token));
completed.Should().BeSameAs(task, because);
await task;
}

// Regression: the changelog-scrubber lambda used a DiagnosticsCollector without calling
// StartAsync. Emitting a diagnostic and then disposing deadlocked on
// Channel.Reader.Completion because nothing was draining the channel,
// causing the lambda to hit its 180s timeout.
[Fact]
public async Task DisposeAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
{
var output = new RecordingOutput();
var collector = new DiagnosticsCollector([output]);
collector.EmitWarning("file.yaml", "test warning that nobody is reading");

await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
"DisposeAsync must not deadlock when StartAsync was never called");

collector.Warnings.Should().Be(1, "severity counters update regardless of reader state");
collector.IsStarted.Should().BeFalse();
collector.OffendingFiles.Should().BeEmpty("OffendingFiles is only populated by the background reader");
output.Items.Should().BeEmpty("IDiagnosticsOutput sinks are only invoked by the background reader");
}

[Fact]
public async Task StopAsync_WithoutStartAsyncAfterEmit_DoesNotHang()
{
var output = new RecordingOutput();
var collector = new DiagnosticsCollector([output]);
collector.EmitError("file.yaml", "test error that nobody is reading");

await ShouldComplete(collector.StopAsync(CancellationToken.None), TimeSpan.FromSeconds(5),
"StopAsync must not deadlock when StartAsync was never called");

collector.Errors.Should().Be(1);
collector.IsStarted.Should().BeFalse();
output.Items.Should().BeEmpty();
}

[Fact]
public async Task DisposeAsync_WithoutStartAsyncAndNoEmissions_DoesNotHang()
{
var collector = new DiagnosticsCollector([]);

await ShouldComplete(collector.DisposeAsync().AsTask(), TimeSpan.FromSeconds(5),
"Instantiate-and-dispose with no emissions must be a no-op");

collector.IsStarted.Should().BeFalse();
collector.Warnings.Should().Be(0);
collector.Errors.Should().Be(0);
}

[Fact]
public async Task WaitForDrain_WithoutStartAsync_ThrowsImmediately()
{
var collector = new DiagnosticsCollector([]);
collector.EmitWarning(string.Empty, "queued");

Func<Task> act = () => ((IDiagnosticsCollector)collector).WaitForDrain();
_ = await act.Should().ThrowAsync<InvalidOperationException>();
}
}
Loading