diff --git a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs index 7cd0b3015d5f69..6dcc244f569082 100644 --- a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs +++ b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs @@ -159,6 +159,7 @@ public static void LeaveDebugMode() { } protected void OnExited() { } public (byte[] StandardOutput, byte[] StandardError) ReadAllBytes(System.TimeSpan? timeout = default(System.TimeSpan?)) { throw null; } public System.Threading.Tasks.Task<(byte[] StandardOutput, byte[] StandardError)> ReadAllBytesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Collections.Generic.IAsyncEnumerable ReadAllLinesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public (string StandardOutput, string StandardError) ReadAllText(System.TimeSpan? timeout = default(System.TimeSpan?)) { throw null; } public System.Threading.Tasks.Task<(string StandardOutput, string StandardError)> ReadAllTextAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public void Refresh() { } @@ -225,6 +226,14 @@ public sealed partial class ProcessExitStatus public int ExitCode { get { throw null; } } public System.Runtime.InteropServices.PosixSignal? Signal { get { throw null; } } } + public readonly partial struct ProcessOutputLine + { + private readonly object _dummy; + private readonly int _dummyPrimitive; + public ProcessOutputLine(string content, bool standardError) { throw null; } + public string Content { get { throw null; } } + public bool StandardError { get { throw null; } } + } public enum ProcessPriorityClass { Normal = 32, diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj index 6db2a72aeb08fa..ed9d62efe66324 100644 --- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj +++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj @@ -22,6 +22,7 @@ + @@ -423,6 +424,7 @@ + diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 71ec5a984a1096..8ff5443f4e2e16 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -2,10 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Buffers; +using System.Collections.Generic; using System.IO; -using System.Runtime.InteropServices; +using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; @@ -256,6 +258,92 @@ private static async Task> ReadPipeToBufferAsync(Stream strea } } + /// + /// Asynchronously reads all standard output and standard error of the process as lines of text, + /// interleaving them as they become available. + /// + /// + /// A token to cancel the asynchronous operation. + /// + /// + /// An async enumerable of instances representing the lines + /// read from standard output and standard error. + /// + /// + /// Lines from standard output and standard error are yielded as they become available. + /// When the consumer stops enumerating early (for example, by breaking out of + /// ), any pending read operations are canceled. + /// + /// + /// Standard output or standard error has not been redirected. + /// -or- + /// A redirected stream has already been used for synchronous or asynchronous reading. + /// + /// + /// The was canceled. + /// + /// + /// The process has been disposed. + /// + public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ValidateReadAllState(); + + StreamReader outputReader = _standardOutput!; + StreamReader errorReader = _standardError!; + + Channel channel = Channel.CreateBounded(0); + bool firstCompleted = false; + + CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + Task outputTask = ReadToChannelAsync(outputReader, standardError: false, linkedCts.Token); + Task errorTask = ReadToChannelAsync(errorReader, standardError: true, linkedCts.Token); + + try + { + while (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + while (channel.Reader.TryRead(out ProcessOutputLine line)) + { + yield return line; + } + } + } + finally + { + linkedCts.Cancel(); + + // Ensure both tasks complete before disposing the CancellationTokenSource. + // The tasks handle all exceptions internally, so they always run to completion. + await outputTask.ConfigureAwait(false); + await errorTask.ConfigureAwait(false); + + linkedCts.Dispose(); + } + + async Task ReadToChannelAsync(StreamReader reader, bool standardError, CancellationToken ct) + { + try + { + while (await reader.ReadLineAsync(ct).ConfigureAwait(false) is string line) + { + await channel.Writer.WriteAsync(new ProcessOutputLine(line, standardError), ct).ConfigureAwait(false); + } + } + catch (Exception ex) + { + channel.Writer.TryComplete(ex); + return; + } + + if (Interlocked.Exchange(ref firstCompleted, true)) + { + channel.Writer.TryComplete(); + } + } + } + /// /// Validates that the process is not disposed, both stdout and stderr are redirected, /// and neither stream has been used (mode must be Undefined). Sets both streams to sync mode. diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs new file mode 100644 index 00000000000000..c8d9d9ad39bc2f --- /dev/null +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Diagnostics +{ + /// + /// Represents a single line of text read from a process's standard output or standard error stream. + /// + public readonly struct ProcessOutputLine + { + /// + /// Initializes a new instance of the struct. + /// + /// The text content of the output line. + /// + /// if the line was read from standard error; + /// otherwise, . + /// + public ProcessOutputLine(string content, bool standardError) + { + Content = content ?? throw new ArgumentNullException(nameof(content)); + StandardError = standardError; + } + + /// + /// Gets the text content of the output line. + /// + public string Content { get; } + + /// + /// Gets a value that indicates whether the line was read from standard error. + /// + /// + /// if the line was read from standard error; + /// otherwise, . + /// + public bool StandardError { get; } + } +} diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs new file mode 100644 index 00000000000000..4b2967e1085611 --- /dev/null +++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs @@ -0,0 +1,372 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.DotNet.RemoteExecutor; +using Xunit; + +namespace System.Diagnostics.Tests +{ + public class ProcessStreamingTests : ProcessTestBase + { + private const string DontPrintAnything = "DO_NOT_PRINT"; + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsAfterDispose() + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.Start(); + Assert.True(process.WaitForExit(WaitInMS)); + + process.Dispose(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsWhenNoStreamsRedirected() + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.Start(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOnlyOutputOrErrorIsRedirected(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.StartInfo.RedirectStandardOutput = standardOutput; + process.StartInfo.RedirectStandardError = !standardOutput; + process.Start(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOutputOrErrorIsInSyncMode(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + // Access the StreamReader property to set the stream to sync mode + _ = standardOutput ? process.StandardOutput : process.StandardError; + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOutputOrErrorIsInAsyncMode(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.StreamBody); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + if (standardOutput) + { + process.BeginOutputReadLine(); + } + else + { + process.BeginErrorReadLine(); + } + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + if (standardOutput) + { + process.CancelOutputRead(); + } + else + { + process.CancelErrorRead(); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData("hello", "world")] + [InlineData("just output", "")] + [InlineData("", "just error")] + [InlineData("", "")] + public async Task ReadAllLinesAsync_ReadsBothOutputAndError(string standardOutput, string standardError) + { + using Process process = StartLinePrintingProcess( + string.IsNullOrEmpty(standardOutput) ? DontPrintAnything : standardOutput, + string.IsNullOrEmpty(standardError) ? DontPrintAnything : standardError); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + if (string.IsNullOrEmpty(standardOutput)) + { + Assert.Empty(capturedOutput); + } + else + { + Assert.Equal(new[] { standardOutput }, capturedOutput); + } + + if (string.IsNullOrEmpty(standardError)) + { + Assert.Empty(capturedError); + } + else + { + Assert.Equal(new[] { standardError }, capturedError); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ReadsInterleavedOutput() + { + const int iterations = 100; + using Process process = CreateProcess(() => + { + for (int i = 0; i < iterations; i++) + { + Console.Out.WriteLine($"out{i}"); + Console.Out.Flush(); + Console.Error.WriteLine($"err{i}"); + Console.Error.Flush(); + } + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + List expectedOutput = new(); + List expectedError = new(); + for (int i = 0; i < iterations; i++) + { + expectedOutput.Add($"out{i}"); + expectedError.Add($"err{i}"); + } + + Assert.Equal(expectedOutput, capturedOutput); + Assert.Equal(expectedError, capturedError); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ReadsLargeOutput() + { + const int lineCount = 1000; + using Process process = CreateProcess(() => + { + for (int i = 0; i < lineCount; i++) + { + Console.Out.WriteLine($"line{i}"); + } + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + for (int i = 0; i < lineCount; i++) + { + Assert.Equal($"line{i}", capturedOutput[i]); + } + + Assert.Empty(capturedError); + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsOperationCanceledOnCancellation() + { + Process process = CreateProcess(RemotelyInvokable.ReadLine); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.StartInfo.RedirectStandardInput = true; + process.Start(); + + try + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + await Assert.ThrowsAnyAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync(cts.Token)) + { + } + }); + } + finally + { + process.Kill(); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ProcessOutputLineProperties() + { + using Process process = StartLinePrintingProcess("stdout_line", "stderr_line"); + + List allLines = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + allLines.Add(line); + } + + Assert.Single(allLines, line => line.Content == "stdout_line" && !line.StandardError); + Assert.Single(allLines, line => line.Content == "stderr_line" && line.StandardError); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_StopsCleanlyWhenConsumerBreaksEarly() + { + using Process process = CreateProcess(() => + { + Console.Out.WriteLine("first"); + Console.Out.Flush(); + Console.Out.WriteLine("second"); + Console.Out.Flush(); + Console.Error.WriteLine("error1"); + Console.Error.Flush(); + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + ProcessOutputLine? firstLine = null; + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + firstLine = line; + break; // stop after first line + } + + Assert.NotNull(firstLine); + Assert.NotNull(firstLine.Value.Content); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + private Process StartLinePrintingProcess(string stdOutText, string stdErrText) + { + Process process = CreateProcess((stdOut, stdErr) => + { + if (stdOut != DontPrintAnything) + { + Console.Out.WriteLine(stdOut); + } + + if (stdErr != DontPrintAnything) + { + Console.Error.WriteLine(stdErr); + } + + return RemoteExecutor.SuccessExitCode; + }, stdOutText, stdErrText); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + return process; + } + } +} diff --git a/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj b/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj index dd73a7baa7f533..3cb2d575046965 100644 --- a/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj +++ b/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj @@ -32,6 +32,7 @@ +