diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs index a62b81836..79d24b17b 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Reactive.Concurrency; using System.Threading.Tasks; using FluentAssertions; using Microsoft.DotNet.Interactive.Jupyter.Protocol; diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs index 138bd6d51..e25aa3a08 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs @@ -7,6 +7,10 @@ using Microsoft.DotNet.Interactive.Jupyter.Protocol; using WorkspaceServer.Kernel; using Xunit; +using System.Reactive.Linq; +using Microsoft.DotNet.Interactive.Events; +using FluentAssertions.Extensions; +using System.Reactive.Concurrency; namespace Microsoft.DotNet.Interactive.Jupyter.Tests { diff --git a/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs index c49796654..26bbf7918 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; using System.Linq; +using System.Reactive.Concurrency; using System.Text.RegularExpressions; using System.Threading.Tasks; using Microsoft.DotNet.Interactive.Commands; @@ -12,14 +13,15 @@ namespace Microsoft.DotNet.Interactive.Jupyter { - public class CompleteRequestHandler: RequestHandlerBase + public class CompleteRequestHandler : RequestHandlerBase { private static readonly Regex _lastToken = new Regex(@"(?\S+)$", RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant | RegexOptions.Multiline); - public CompleteRequestHandler(IKernel kernel) : base(kernel) + public CompleteRequestHandler(IKernel kernel, IScheduler scheduler = null) + : base(kernel, scheduler ?? CurrentThreadScheduler.Instance) { - + } public async Task Handle(JupyterRequestContext context) @@ -34,22 +36,18 @@ public async Task Handle(JupyterRequestContext context) InFlightRequests[command] = openRequest; - var kernelResult = await Kernel.SendAsync(command); - openRequest.AddDisposable(kernelResult.KernelEvents.Subscribe(OnKernelResultEvent)); - + await Kernel.SendAsync(command); } - void OnKernelResultEvent(IKernelEvent value) + protected override void OnKernelEvent(IKernelEvent @event) { - switch (value) + switch (@event) { case CompletionRequestCompleted completionRequestCompleted: OnCompletionRequestCompleted(completionRequestCompleted, InFlightRequests); break; case CompletionRequestReceived _: break; - default: - throw new NotSupportedException(); } } @@ -68,7 +66,7 @@ private static void OnCompletionRequestCompleted(CompletionRequestCompleted comp openRequest.Context.ServerChannel.Send(completeReply); openRequest.Context.RequestHandlerStatus.SetAsIdle(); openRequest.Dispose(); - + } private static int ComputeReplacementStartPosition(string code, int cursorPosition) diff --git a/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs index 9440d8215..f8f8c43f9 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Reactive.Concurrency; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Interactive.Commands; @@ -17,7 +18,8 @@ public class ExecuteRequestHandler : RequestHandlerBase { private int _executionCount; - public ExecuteRequestHandler(IKernel kernel) : base(kernel) + public ExecuteRequestHandler(IKernel kernel, IScheduler scheduler = null) + : base(kernel, scheduler ?? CurrentThreadScheduler.Instance) { } @@ -36,8 +38,7 @@ public async Task Handle(JupyterRequestContext context) try { - var kernelResult = await Kernel.SendAsync(command); - openRequest.AddDisposable(kernelResult.KernelEvents.Subscribe(OnKernelResultEvent)); + await Kernel.SendAsync(command); } catch (Exception e) { @@ -84,9 +85,9 @@ public async Task Handle(JupyterRequestContext context) return transient; } - void OnKernelResultEvent(IKernelEvent value) + protected override void OnKernelEvent(IKernelEvent @event) { - switch (value) + switch (@event) { case ValueProduced valueProduced: OnValueProduced(valueProduced, InFlightRequests); @@ -101,8 +102,6 @@ void OnKernelResultEvent(IKernelEvent value) case IncompleteCodeSubmissionReceived _: case CompleteCodeSubmissionReceived _: break; - default: - throw new NotSupportedException(); } } diff --git a/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs index 70217f913..c82b1bcd4 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Reactive.Concurrency; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using Clockwise; using Microsoft.DotNet.Interactive.Jupyter.Protocol; @@ -24,8 +26,16 @@ public class JupyterRequestContextHandler : ICommandHandler + { + var thread = new Thread(t); + thread.IsBackground = true; + thread.Name = "MessagePump"; + return thread; + }); + + _executeHandler = new ExecuteRequestHandler(kernel, scheduler); + _completeHandler = new CompleteRequestHandler(kernel, scheduler); if (packageRegistry == null) { diff --git a/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs b/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs index bd43725e5..9717f7778 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs @@ -3,9 +3,11 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; +using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Linq; using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; using Microsoft.DotNet.Interactive.Jupyter.Protocol; namespace Microsoft.DotNet.Interactive.Jupyter @@ -13,13 +15,20 @@ namespace Microsoft.DotNet.Interactive.Jupyter public abstract class RequestHandlerBase : IDisposable where T : JupyterMessageContent { + private readonly CompositeDisposable _disposables = new CompositeDisposable(); + protected IObservable _kernelEventSource { get; } - protected RequestHandlerBase(IKernel kernel) + protected RequestHandlerBase(IKernel kernel, IScheduler scheduler) { Kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); + + _kernelEventSource = Kernel.KernelEvents.ObserveOn(scheduler ?? throw new ArgumentNullException(nameof(scheduler))); + _disposables.Add(_kernelEventSource.Subscribe(OnKernelEvent)); } + protected abstract void OnKernelEvent(IKernelEvent @event); + protected static T GetJupyterRequest(JupyterRequestContext context) { var request = context.GetRequestContent() ?? diff --git a/Microsoft.DotNet.Interactive.Jupyter/Shell.cs b/Microsoft.DotNet.Interactive.Jupyter/Shell.cs index 595a25738..f1bb78fbb 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/Shell.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/Shell.cs @@ -81,7 +81,7 @@ public async Task StartAsync(CancellationToken cancellationToken) activity.Info("Received: {message}", message.ToJson()); var status = new RequestHandlerStatus(message.Header, new MessageSender(_ioPubSocket, _signatureValidator)); - + switch (message.Header.MessageType) { @@ -107,8 +107,6 @@ public async Task StartAsync(CancellationToken cancellationToken) break; } - - } } } diff --git a/Microsoft.DotNet.Interactive/Events/ValueProduced.cs b/Microsoft.DotNet.Interactive/Events/ValueProduced.cs index c50ea337d..4cc06d69b 100644 --- a/Microsoft.DotNet.Interactive/Events/ValueProduced.cs +++ b/Microsoft.DotNet.Interactive/Events/ValueProduced.cs @@ -9,9 +9,9 @@ namespace Microsoft.DotNet.Interactive.Events public class ValueProduced : KernelEventBase { public ValueProduced(object value, - SubmitCode submitCode, + IKernelCommand command, bool isLastValue = false, - IReadOnlyCollection formattedValues = null) : base(submitCode) + IReadOnlyCollection formattedValues = null) : base(command) { Value = value; IsLastValue = isLastValue; diff --git a/WorkspaceServer.Tests/Kernel/CSharpKernelTestBase.cs b/WorkspaceServer.Tests/Kernel/CSharpKernelTestBase.cs index 49d66c1c8..a01549113 100644 --- a/WorkspaceServer.Tests/Kernel/CSharpKernelTestBase.cs +++ b/WorkspaceServer.Tests/Kernel/CSharpKernelTestBase.cs @@ -8,6 +8,8 @@ using Pocket; using WorkspaceServer.Kernel; using Xunit.Abstractions; +using System.Reactive.Linq; +using System.Reactive; namespace WorkspaceServer.Tests.Kernel { @@ -24,14 +26,14 @@ protected CSharpKernel CreateKernel() .LogEventsToPocketLogger(); DisposeAfterTest( - kernel.KernelEvents.Subscribe(KernelEvents.Add)); + kernel.KernelEvents.Timestamp().Subscribe(KernelEvents.Add)); return kernel; } private readonly CompositeDisposable _disposables = new CompositeDisposable(); - protected IList KernelEvents { get; } = new List(); + protected IList> KernelEvents { get; } = new List>(); protected void DisposeAfterTest(IDisposable disposable) { diff --git a/WorkspaceServer.Tests/Kernel/CSharpKernelTests.cs b/WorkspaceServer.Tests/Kernel/CSharpKernelTests.cs index bd96b79c9..6389ea0db 100644 --- a/WorkspaceServer.Tests/Kernel/CSharpKernelTests.cs +++ b/WorkspaceServer.Tests/Kernel/CSharpKernelTests.cs @@ -16,6 +16,9 @@ using WorkspaceServer.Kernel; using Xunit; using Xunit.Abstractions; +using System.Reactive.Linq; +using FluentAssertions.Extensions; +using System.Reactive; namespace WorkspaceServer.Tests.Kernel { @@ -32,7 +35,8 @@ public async Task it_returns_the_result_of_a_non_null_expression() await kernel.SendAsync(new SubmitCode("123")); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Last() .Value .Should() @@ -49,18 +53,18 @@ public async Task when_it_throws_exception_after_a_value_was_produced_then_only_ await kernel.SendAsync(new SubmitCode("adddddddddd")); var (failure, lastCodeSubmissionEvaluationFailedPosition) = KernelEvents - .Select((error, pos) => (error, pos)) - .Single(t => t.error is CodeSubmissionEvaluationFailed); + .Select((t, pos) => (t.Value, pos)) + .Single(t => t.Value is CodeSubmissionEvaluationFailed); ((CodeSubmissionEvaluationFailed)failure).Exception.Should().BeOfType(); var lastCodeSubmissionPosition = KernelEvents - .Select((e, pos) => (e, pos)) - .Last(t => t.e is CodeSubmissionReceived).pos; + .Select((e, pos) => (e.Value, pos)) + .Last(t => t.Value is CodeSubmissionReceived).pos; var lastValueProducedPosition = KernelEvents - .Select((e, pos) => (e, pos)) - .Last(t => t.e is ValueProduced).pos; + .Select((e, pos) => (e.Value, pos)) + .Last(t => t.Value is ValueProduced).pos; lastValueProducedPosition .Should() @@ -78,7 +82,8 @@ public async Task it_returns_exceptions_thrown_in_user_code() await kernel.SendAsync(new SubmitCode("using System;")); await kernel.SendAsync(new SubmitCode("throw new NotImplementedException();")); - KernelEvents.Last() + KernelEvents.ValuesOnly() + .Last() .Should() .BeOfType() .Which @@ -95,7 +100,8 @@ public async Task it_returns_diagnostics() await kernel.SendAsync(new SubmitCode("using System;")); await kernel.SendAsync(new SubmitCode("aaaadd")); - KernelEvents.Last() + KernelEvents.ValuesOnly() + .Last() .Should() .BeOfType() .Which @@ -114,11 +120,11 @@ public async Task it_notifies_when_submission_is_complete() await kernel.SendAsync(new SubmitCode("12;")); KernelEvents.Should() - .NotContain(e => e is ValueProduced); + .NotContain(e => e.Value is ValueProduced); KernelEvents .Should() - .Contain(e => e is CodeSubmissionEvaluated); + .Contain(e => e.Value is CodeSubmissionEvaluated); } [Fact] @@ -129,21 +135,23 @@ public async Task it_notifies_when_submission_is_incomplete() await kernel.SendAsync(new SubmitCode("var a =")); KernelEvents.Should() - .NotContain(e => e is ValueProduced); + .NotContain(e => e.Value is ValueProduced); - KernelEvents.Last() + KernelEvents.ValuesOnly() + .Last() .Should() .BeOfType(); } [Fact] - public async Task it_returns_the_result_of_a_null_expression() + public async Task expression_evaluated_to_null_has_result_with_null_value() { var kernel = CreateKernel(); await kernel.SendAsync(new SubmitCode("null")); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Last() .Value .Should() @@ -159,7 +167,7 @@ public async Task it_does_not_return_a_result_for_a_statement() KernelEvents .Should() - .NotContain(e => e is ValueProduced); + .NotContain(e => e.Value is ValueProduced); } [Fact] @@ -171,7 +179,8 @@ public async Task it_aggregates_multiple_submissions() await kernel.SendAsync(new SubmitCode("x.Add(3);")); await kernel.SendAsync(new SubmitCode("x.Max()")); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Last() .Value .Should() @@ -189,7 +198,8 @@ public async Task it_produces_values_when_executing_Console_output() Console.Write(""value three"");"); await kernel.SendAsync(kernelCommand); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Should() .BeEquivalentTo( new ValueProduced("value one", kernelCommand, false, new[] { new FormattedValue("text/plain", "value one"), }), @@ -209,12 +219,31 @@ public async Task it_produces_a_final_value_if_the_code_expression_evaluates() 5", "csharp"); await kernel.SendAsync(kernelCommand); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Should() .HaveCount(4) .And .ContainSingle(e => e.IsLastValue); - + + } + + [Fact] + public async Task the_output_is_asynchronous() + { + var kernel = CreateKernel(); + + var kernelCommand = new SubmitCode(@" +Console.Write(DateTime.Now); +System.Threading.Thread.Sleep(1000); +Console.Write(DateTime.Now); +5", "csharp"); + await kernel.SendAsync(kernelCommand); + var events = KernelEvents + .Where(e => e.Value is ValueProduced).ToArray(); + var diff = events[1].Timestamp - events[0].Timestamp; + diff.Should().BeCloseTo(1.Seconds(), precision: 200); + } [Fact(Skip = "requires support for cs8 in roslyn scripting")] @@ -225,7 +254,8 @@ public async Task it_supports_csharp_8() await kernel.SendAsync(new SubmitCode("var text = \"meow? meow!\";")); await kernel.SendAsync(new SubmitCode("text[^5..^0]")); - KernelEvents.OfType() + KernelEvents.ValuesOnly() + .OfType() .Last() .Value .Should() @@ -250,9 +280,12 @@ public async Task it_can_load_assembly_references_using_r_directive() json ")); - KernelEvents.Should() + KernelEvents.ValuesOnly() + .Should() .ContainSingle(e => e is ValueProduced); - KernelEvents.OfType() + + KernelEvents.ValuesOnly() + .OfType() .Single() .Value .Should() @@ -267,11 +300,13 @@ public async Task it_returns_completion_list_for_types() await kernel.SendAsync(new RequestCompletion("System.Console.", 15)); - KernelEvents.Should() - .ContainSingle(e => e is CompletionRequestReceived); + KernelEvents.ValuesOnly() + .Should() + .ContainSingle(e => e is CompletionRequestReceived); - KernelEvents.Single(e => e is CompletionRequestCompleted) - .As() + KernelEvents.ValuesOnly() + .OfType() + .Single() .CompletionList .Should() .Contain(i => i.DisplayText == "ReadLine"); @@ -287,14 +322,16 @@ public async Task it_returns_completion_list_for_previously_declared_variables() new SubmitCode("var alpha = new Random();")); await kernel.SendAsync(new RequestCompletion("al", 2)); - KernelEvents.Should() - .ContainSingle(e => e is CompletionRequestReceived); + KernelEvents.ValuesOnly() + .Should() + .ContainSingle(e => e is CompletionRequestReceived); - KernelEvents.Single(e => e is CompletionRequestCompleted) - .As() - .CompletionList - .Should() - .Contain(i => i.DisplayText == "alpha"); + KernelEvents.ValuesOnly() + .OfType() + .Single() + .CompletionList + .Should() + .Contain(i => i.DisplayText == "alpha"); } [Fact] @@ -311,9 +348,10 @@ public async Task it_returns_completion_list_for_types_imported_at_runtime() await kernel.SendAsync(new RequestCompletion("Newtonsoft.Json.JsonConvert.", 28)); KernelEvents.Should() - .ContainSingle(e => e is CompletionRequestReceived); + .ContainSingle(e => e.Value is CompletionRequestReceived); - KernelEvents.Single(e => e is CompletionRequestCompleted) + KernelEvents.Single(e => e.Value is CompletionRequestCompleted) + .Value .As() .CompletionList .Should() @@ -381,8 +419,8 @@ public async Task OnLoadAsync(IKernel kernel) await kernel.SendAsync(new SubmitCode($"#extend \"{extensionDllPath}\"")); KernelEvents.Should() - .ContainSingle(e => e is CodeSubmissionEvaluated && - e.As().Code.Contains("using System.Reflection;")); + .ContainSingle(e => e.Value is CodeSubmissionEvaluated && + e.Value.As().Code.Contains("using System.Reflection;")); } } } \ No newline at end of file diff --git a/WorkspaceServer.Tests/Kernel/TimestampedExtensions.cs b/WorkspaceServer.Tests/Kernel/TimestampedExtensions.cs new file mode 100644 index 000000000..f0be45c9f --- /dev/null +++ b/WorkspaceServer.Tests/Kernel/TimestampedExtensions.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +namespace WorkspaceServer.Tests.Kernel +{ + internal static class TimestampedExtensions + { + public static IObservable ValuesOnly(this IObservable> source) + { + return source.Select(t => t.Value); + } + + public static IEnumerable ValuesOnly(this IEnumerable> source) + { + return source.Select(t => t.Value); + } + } +} diff --git a/WorkspaceServer/Kernel/CSharpKernel.cs b/WorkspaceServer/Kernel/CSharpKernel.cs index d6b049e8c..ca22e9434 100644 --- a/WorkspaceServer/Kernel/CSharpKernel.cs +++ b/WorkspaceServer/Kernel/CSharpKernel.cs @@ -121,9 +121,9 @@ private void SetupScriptOptions() { context.OnNext(new CompleteCodeSubmissionReceived(codeSubmission)); Exception exception = null; - var output = Array.Empty(); using (var console = await ConsoleOutput.Capture()) { + console.SubscribeToStandardOutput((std) => PublishOutput(std, context, codeSubmission)); try { @@ -149,10 +149,6 @@ private void SetupScriptOptions() { exception = e; } - output = - console.WriteOccurredOnStandardOutput - ? console.GetStandardOutputWrites().ToArray() - : Array.Empty(); } if (exception != null) @@ -165,21 +161,6 @@ private void SetupScriptOptions() } else { - foreach (var std in output) - { - var formattedValues = new List - { - new FormattedValue( - Formatter.MimeTypeFor(std?.GetType() ?? typeof(object)), std) - }; - - context.OnNext( - new ValueProduced( - std, - codeSubmission, - false, - formattedValues)); - } if (HasReturnValue) { var writer = new StringWriter(); @@ -210,6 +191,22 @@ private void SetupScriptOptions() } } + private void PublishOutput(string output, KernelInvocationContext context, IKernelCommand command) + { + var formattedValues = new List + { + new FormattedValue( + Formatter.MimeTypeFor(output?.GetType() ?? typeof(object)), output) + }; + + context.OnNext( + new ValueProduced( + output, + command, + false, + formattedValues)); + } + private async Task HandleRequestCompletion( RequestCompletion requestCompletion, KernelInvocationContext context, diff --git a/WorkspaceServer/Servers/Roslyn/ConsoleOutput.cs b/WorkspaceServer/Servers/Roslyn/ConsoleOutput.cs index eedde401b..510c7fdeb 100644 --- a/WorkspaceServer/Servers/Roslyn/ConsoleOutput.cs +++ b/WorkspaceServer/Servers/Roslyn/ConsoleOutput.cs @@ -50,6 +50,11 @@ public static async Task Capture() return redirector; } + public IDisposable SubscribeToStandardOutput(Action action) + { + return outputWriter.Subscribe(action); + } + public void Dispose() { if (Interlocked.CompareExchange(ref alreadyDisposed, DISPOSED, NOT_DISPOSED) == NOT_DISPOSED) diff --git a/WorkspaceServer/Servers/Roslyn/TrackingStringWriter.cs b/WorkspaceServer/Servers/Roslyn/TrackingStringWriter.cs index 84ac20015..05334c457 100644 --- a/WorkspaceServer/Servers/Roslyn/TrackingStringWriter.cs +++ b/WorkspaceServer/Servers/Roslyn/TrackingStringWriter.cs @@ -5,11 +5,16 @@ using System; using System.Collections.Generic; using System.IO; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; using System.Threading.Tasks; namespace WorkspaceServer.Servers.Roslyn { - internal class TrackingStringWriter : StringWriter + internal class TrackingStringWriter : StringWriter, IObservable { private class Region { @@ -17,9 +22,42 @@ private class Region public int Length { get; set; } } - readonly List _regions = new List(); - + private Subject WriteEvents = new Subject(); + private readonly List _regions = new List(); private bool _trackingWriteOperation; + private int _observerCount; + + private readonly CompositeDisposable _disposable; + private readonly IObservable _scheduleEvents; + public TrackingStringWriter() + { + var scheduler = new EventLoopScheduler(t => + { + var thread = new Thread(t); + thread.Name = "Diego"; + thread.IsBackground = true; + return thread; + }); + + _scheduleEvents = WriteEvents.ObserveOn(scheduler); + + + _disposable = new CompositeDisposable() + { + scheduler, + WriteEvents + }; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _disposable.Dispose(); + } + + base.Dispose(disposing); + } public bool WriteOccurred { get; set; } @@ -51,6 +89,15 @@ private void TrackWriteOperation(Action action) region.Length = sb.Length - region.Start; _trackingWriteOperation = false; + PumpStringIfObserved(sb, region); + } + + private void PumpStringIfObserved(System.Text.StringBuilder sb, Region region) + { + if (_observerCount > 0) + { + WriteEvents.OnNext(sb.ToString(region.Start, region.Length)); + } } private async Task TrackWriteOperationAsync(Func action) @@ -74,7 +121,10 @@ private async Task TrackWriteOperationAsync(Func action) await action(); region.Length = sb.Length - region.Start; + _trackingWriteOperation = false; + + PumpStringIfObserved(sb, region); } public override void Write(char[] buffer, int index, int count) @@ -290,5 +340,16 @@ public IEnumerable Writes() yield return src.Substring(region.Start, region.Length); } } + + public IDisposable Subscribe(IObserver observer) + { + Interlocked.Increment(ref _observerCount); + return new CompositeDisposable() + { + Disposable.Create( + () => Interlocked.Decrement(ref _observerCount)), + _scheduleEvents.Subscribe(observer) + }; + } } } \ No newline at end of file